-- TFileTransport (Thrift Logfile)

Summary:
-- TBufferedFileWriter.h/cpp will be renamed to TFileTransport.h/cpp in the next commit.
-- TFileTransport essentially reads and writes Thrift calls to/from a file instead of a
   socket.
-- The code/design is somewhat similar to pillar_logfile but there are some significant changes.

todo:
-- error detection/correction still needs to be implemented

Reviewed By: Mark Slee

Test Plan:
-- Wrote test in thrift/test/cpp/src/main.cpp that appends to a file and replays requests

Notes:
It's finally time to port search over to Thrift


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664889 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
index c327aab..8192332 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.h
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -1,10 +1,12 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
 
 #include "TTransport.h"
 #include "Thrift.h"
+#include "TProcessor.h"
 
 #include <string>
+#include <stdio.h>
 
 #include <boost/shared_ptr.hpp>
 
@@ -15,42 +17,154 @@
 
 // Data pertaining to a single event
 typedef struct eventInfo {
-   uint8_t* payLoad_;
-   uint32_t eventSize_;
+  uint8_t* eventBuff_;      // event payload buffer; released with delete[] in the destructor
+  uint32_t eventSize_;      // presumably the payload length in bytes -- confirm in the .cpp
+  uint32_t eventBuffPos_;   // presumably a cursor into eventBuff_ -- confirm in the .cpp
 
-  eventInfo():payLoad_(NULL), eventSize_(0){};
+  eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){}; // starts out with no buffer
+  ~eventInfo() {  // frees eventBuff_, i.e. the struct owns whatever array is stored there
+    if (eventBuff_) {
+      delete[] eventBuff_;
+    }
+  }  // NOTE(review): no copy ctor/assignment declared; copying an eventInfo would delete[] twice
 } eventInfo;
 
+// information about current read state: the event in progress plus read-buffer cursors
+typedef struct readState {
+  eventInfo* event_;  // owned here: deleted by resetAllValues() and by the destructor
 
+  // keep track of event size: 4-byte staging buffer, fill position within it, and a flag
+  uint8_t   eventSizeBuff_[4];
+  uint8_t   eventSizeBuffPos_;
+  bool      readingSize_;  // set to true by resetState()
+
+  // read buffer variables (presumably position and valid length in the read buffer -- confirm)
+  int32_t  bufferPtr_;
+  int32_t  bufferLen_;
+
+  // last successful dispatch point
+  int32_t lastDispatchPtr_;
+  
+  void resetState(uint32_t lastDispatchPtr) {  // NOTE(review): uint32_t argument lands in an int32_t member
+    readingSize_ = true;
+    eventSizeBuffPos_ = 0;
+    lastDispatchPtr_ = lastDispatchPtr;
+  }
+
+  void resetAllValues() {  // full reset: size tracking, buffer cursors and the pending event
+    resetState(0);
+    bufferPtr_ = 0;
+    bufferLen_ = 0;
+    if (event_) {
+      delete(event_);
+    }
+    event_ = 0;  // cleared so a later reset or the destructor does not delete it again
+  }
+
+  readState() {
+    event_ = 0;  // must be nulled first: resetAllValues() tests event_ before deleting it
+    resetAllValues();
+  }
+
+  ~readState() {  // releases any event still held
+    if (event_) {
+      delete(event_);
+    }
+  }
+
+} readState;
+ 
 /**
- * Class that stores a circular in-memory event/message buffer and writes 
- * elements to disk when the buffer becomes full or a flush is triggered.
+ * File implementation of a transport. Reads and writes are done to a 
+ * file on disk.
  *
  * @author Aditya Agarwal <aditya@facebook.com>
  */
-class TBufferedFileWriter : public TTransport {
+class TFileTransport : public TTransport {
  public:
+  TFileTransport(string path);
+  ~TFileTransport();
+
+  // TODO: what is the correct behaviour for this? For now it always reports open,
+  // on the grounds that the log file is generally always open
+  bool isOpen() {
+    return true;  // unconditional; no check of the underlying file is made here
+  }
+  
+  void write(const uint8_t* buf, uint32_t len) {  // hands the bytes to enqueueEvent()
+    enqueueEvent(buf, len, false);  // third argument is blockUntilFlush: passed as false
+  }
+  
+  void flush();
+
+  uint32_t readAll(uint8_t* buf, uint32_t len);
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  // log-file specific functions
+  void seekToChunk(int chunk);
+  void seekToEnd();
+  uint32_t getNumChunks();
+
+  // for changing the output file
+  void resetOutputFile(int fd, string filename, long long offset);
+
+  // Setter/Getter functions for user-controllable options
+  void setReadBuffSize(uint32_t readBuffSize) {
+    if (readBuffSize) {  // 0 is ignored; the current size is kept
+      readBuffSize_ = readBuffSize;
+    }
+  }
+  uint32_t getReadBuffSize() {
+    return readBuffSize_;  // last non-zero value stored by the setter (initial value not set in this header)
+  }
+
+  void setReadTimeout(int32_t readTimeout) {
+    readTimeout_ = readTimeout;  // stored as-is, no validation; presumably milliseconds (cf. DEFAULT_READ_TIMEOUT_MS) -- confirm
+  }
+  int32_t getReadTimeout() {
+    return readTimeout_;  // signed, so negative values round-trip unchanged
+  }
+
+  void setChunkSize(uint32_t chunkSize) {
+    if (chunkSize) {  // 0 is ignored; the current chunk size is kept
+      chunkSize_ = chunkSize;
+    }
+  }
+  uint32_t getChunkSize() {
+    return chunkSize_;  // size of the chunks the file is split up into
+  }
+
+  void setEventBufferSize(uint32_t bufferSize) {    
+    if (bufferSize) {  // 0 is ignored; the existing buffer is left alone
+      if (buffer_) {
+        delete[] buffer_;  // frees only the pointer array, not any eventInfo objects it points to
+      }
+      eventBufferSize_ = bufferSize;
+      buffer_ = new eventInfo*[eventBufferSize_];  // NOTE(review): new slots are uninitialized; headPos_/tailPos_ are not touched here
+    }
+  }
+  uint32_t getEventBufferSize() {
+    return eventBufferSize_;  // number of eventInfo* slots in buffer_
+  }
+
   void setFlushMaxUs(uint32_t flushMaxUs) {
-    flushMaxUs_ = flushMaxUs;
+    if (flushMaxUs) {  // 0 is now ignored instead of being stored
+      flushMaxUs_ = flushMaxUs;
+    }
   }
   uint32_t getFlushMaxUs() {
     return flushMaxUs_;
   }
 
   void setFlushMaxBytes(uint32_t flushMaxBytes) {
-    flushMaxBytes_ = flushMaxBytes;
+    if (flushMaxBytes) {  // 0 is now ignored instead of being stored
+      flushMaxBytes_ = flushMaxBytes;
+    }
   }
   uint32_t getFlushMaxBytes() {
     return flushMaxBytes_;
   }
 
-  void setChunkSize(uint32_t chunkSize) {
-    chunkSize_ = chunkSize;
-  }
-  uint32_t getChunkSize() {
-    return chunkSize_;
-  }
-
   void setMaxEventSize(uint32_t maxEventSize) {
     maxEventSize_ = maxEventSize;
   }
@@ -58,52 +172,88 @@
     return maxEventSize_;
   }
 
-  TBufferedFileWriter(string filename, uint32_t sz);
-  TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
-  void init(string filename, uint32_t sz, int fd, long long offset);
-  ~TBufferedFileWriter();
-
-  void resetOutputFile(int fd, string filename, long long offset);
-
-  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
-  void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
-  void write(const uint8_t* buf, uint32_t len) {
-    enqueueEvent(buf, len, false);
+  void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+    maxCorruptedEvents_ = maxCorruptedEvents;  // stored as-is; unlike the size/time setters, 0 is accepted
+  }
+  uint32_t getMaxCorruptedEvents() {
+    return maxCorruptedEvents_;  // max number of corrupted events per chunk
   }
 
-  eventInfo dequeueEvent(long long deadline);
-  void flush();
+  void setEofSleepTimeUs(uint32_t eofSleepTime) {
+    if (eofSleepTime) {  // 0 is ignored; the current sleep time is kept
+      eofSleepTime_ = eofSleepTime;
+    }
+  }
+  uint32_t getEofSleepTimeUs() {
+    return eofSleepTime_;  // sleep duration when EOF is hit (microseconds, per the accessor name)
+  }
+
+ private:
+  // helper functions for writing to a file
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
+  eventInfo* dequeueEvent(long long deadline);
 
   // control for writer thread
   static void* startWriterThread(void* ptr) {
-    (((TBufferedFileWriter*)ptr)->writerThread());
+    (((TFileTransport*)ptr)->writerThread());  // ptr must point at the TFileTransport whose writerThread() is to run
     return 0;
   }
   void writerThread();
 
+  // helper functions for reading from a file
+  bool readEvent();
 
- private:
-  // circular buffer to hold data in before it is flushed. This is an array of strings. Each
-  // element of the array stores a msg that needs to be written to the file
-  eventInfo* buffer_;
-  
-  // size of string buffer
-  uint32_t sz_;
+  // Utility functions
+  void openLogFile();
+  uint32_t getCurrentTime();
+
+  // Class variables
+  readState readState_;
+  uint8_t* readBuff_;
+
+  eventInfo* currentEvent_;
+
+  uint32_t readBuffSize_;
+  static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
+
+  int32_t readTimeout_;
+  static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
 
   // size of chunks that file will be split up into
   uint32_t chunkSize_;
+  static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
 
+  // number of eventInfo* slots allocated for the circular event buffer (buffer_)
+  uint32_t eventBufferSize_;
+  static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+
+  // circular buffer to hold events before they are flushed. This is an array of eventInfo
+  // pointers. Each element points to an event (msg) that needs to be written to the file
+  eventInfo** buffer_;
+  
   // max number of microseconds that can pass without flushing
   uint32_t flushMaxUs_;
+  static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
 
   // max number of bytes that can be written without flushing
   uint32_t flushMaxBytes_;
+  static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
 
   // max event size
   uint32_t maxEventSize_;
+  static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
+
+  // max number of corrupted events per chunk
+  uint32_t maxCorruptedEvents_;
+  static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
   
+  // sleep duration when EOF is hit
+  uint32_t eofSleepTime_;
+  static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+    
   // writer thread id
-  pthread_t writer_;
+  pthread_t writerThreadId_;
 
   // variables that determine position of head/tail of circular buffer
   int headPos_, tailPos_;
@@ -126,13 +276,61 @@
   int fd_;
 
   // Offset within the file
-  long long offset_;
-
-  void openOutputFile();
-  uint32_t getCurrentTime();
+  off_t offset_;
 
 };
 
-}}}
+// Exception thrown when EOF is hit; a TTransportException that is always built with TTX_EOF
+class TEOFException : public facebook::thrift::TTransportException {
+ public:
+  TEOFException():
+    facebook::thrift::TTransportException(TTX_EOF) {};  // takes no arguments: TTX_EOF is hard-wired
+};
 
-#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+
+// wrapper class to process thrift events from a file: pairs a TProcessor with a TFileTransport input
+class TFileProcessor {
+ public:
+  /** 
+   * Constructor that defaults output transport to null transport
+   * 
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport input file transport
+   */
+  TFileProcessor(shared_ptr<TProcessor> processor,
+                 shared_ptr<TProtocolFactory> protocolFactory,
+                 shared_ptr<TFileTransport> inputTransport);
+
+  /** 
+   * Constructor that takes an explicit output transport
+   * 
+   * @param processor processes log-file events
+   * @param protocolFactory protocol factory
+   * @param inputTransport input file transport
+   * @param outputTransport output transport
+   */    
+  TFileProcessor(shared_ptr<TProcessor> processor,
+                 shared_ptr<TProtocolFactory> protocolFactory,
+                 shared_ptr<TFileTransport> inputTransport,
+                 shared_ptr<TTransport> outputTransport);                      
+
+  /**
+   * processes events from the file
+   *
+   * @param numEvents number of events to process (0 for unlimited)
+   * @param tail tails the file if true
+   */
+  void process(uint32_t numEvents, bool tail);
+
+ private:
+  shared_ptr<TProcessor> processor_;  // processes log-file events
+  shared_ptr<TProtocolFactory> protocolFactory_;
+  shared_ptr<TFileTransport> inputTransport_;  // file transport the events come from
+  shared_ptr<TTransport> outputTransport_;  // null transport when the three-argument constructor is used
+};
+
+ 
+}}} // facebook::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_