-- Thrift Log File

Summary:
-- This is the thrifty version of Pillar's batch_writer
-- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards
-- Added TBufferedRouterTransport.h/cc to actually route messages via readEnd() to the file writer. It's
   not quite as easy to route the messages in Thrift as it was in Pillar

Reviewed By: Slee

Test Plan: Tested by making sure that the file was recording data

Notes:
-- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp).


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
new file mode 100644
index 0000000..c327aab
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -0,0 +1,138 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport { 
+
+using namespace boost;
+using std::string;
+
+// Data pertaining to a single event: a pointer to its payload plus the payload's size.
+typedef struct eventInfo {
+   uint8_t* payLoad_;    // event data; NULL when default-constructed (ownership not visible here)
+   uint32_t eventSize_;  // size of payLoad_ (presumably in bytes -- confirm); 0 when default-constructed
+
+  eventInfo():payLoad_(NULL), eventSize_(0){};
+} eventInfo;
+
+
+/**
+ * Class that stores a circular in-memory event/message buffer and writes 
+ * elements to disk when the buffer becomes full or a flush is triggered.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedFileWriter : public TTransport {
+ public:
+  void setFlushMaxUs(uint32_t flushMaxUs) {  // max microseconds that can pass without flushing
+    flushMaxUs_ = flushMaxUs;
+  }
+  uint32_t getFlushMaxUs() {
+    return flushMaxUs_;
+  }
+
+  void setFlushMaxBytes(uint32_t flushMaxBytes) {  // max bytes that can be written without flushing
+    flushMaxBytes_ = flushMaxBytes;
+  }
+  uint32_t getFlushMaxBytes() {
+    return flushMaxBytes_;
+  }
+
+  void setChunkSize(uint32_t chunkSize) {  // size of the chunks the file will be split up into
+    chunkSize_ = chunkSize;
+  }
+  uint32_t getChunkSize() {
+    return chunkSize_;
+  }
+
+  void setMaxEventSize(uint32_t maxEventSize) {  // max event size
+    maxEventSize_ = maxEventSize;
+  }
+  uint32_t getMaxEventSize() {
+    return maxEventSize_;
+  }
+
+  TBufferedFileWriter(string filename, uint32_t sz);  // NOTE(review): sz is presumably the event buffer size (see sz_) -- confirm in the .cc
+  TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);  // overload that also takes a file descriptor and a file offset
+  void init(string filename, uint32_t sz, int fd, long long offset);  // NOTE(review): presumably the setup shared by both constructors -- confirm
+  ~TBufferedFileWriter();
+
+  void resetOutputFile(int fd, string filename, long long offset);  // NOTE(review): presumably redirects output to the given fd/filename/offset -- confirm
+
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);  // overload taking a raw buffer and its length
+  void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);  // overload taking an already-built eventInfo
+  void write(const uint8_t* buf, uint32_t len) {  // forwards to enqueueEvent() with blockUntilFlush = false
+    enqueueEvent(buf, len, false);
+  }
+
+  eventInfo dequeueEvent(long long deadline);  // NOTE(review): units and meaning of deadline are not visible in this header
+  void flush();
+
+  // Writer-thread entry point with a void*(void*) signature: ptr must be a TBufferedFileWriter*; runs its writerThread() and returns 0
+  static void* startWriterThread(void* ptr) {
+    (((TBufferedFileWriter*)ptr)->writerThread());
+    return 0;
+  }
+  void writerThread();
+
+
+ private:
+  // circular buffer to hold data in before it is flushed. This is an array of eventInfo. Each
+  // element of the array stores a msg (payload pointer + size) that needs to be written to the file
+  eventInfo* buffer_;
+  
+  // size of buffer_ (NOTE(review): presumably the number of eventInfo slots -- confirm)
+  uint32_t sz_;
+
+  // size of chunks that file will be split up into
+  uint32_t chunkSize_;
+
+  // max number of microseconds that can pass without flushing
+  uint32_t flushMaxUs_;
+
+  // max number of bytes that can be written without flushing
+  uint32_t flushMaxBytes_;
+
+  // max event size
+  uint32_t maxEventSize_;
+  
+  // writer thread id
+  pthread_t writer_;
+
+  // variables that determine position of head/tail of circular buffer
+  int headPos_, tailPos_;
+
+  // variables indicating whether the buffer is full or empty, with a condition variable for each state change
+  bool isFull_, isEmpty_;
+  pthread_cond_t notFull_, notEmpty_;
+  bool closing_;  // NOTE(review): presumably set during shutdown to stop the writer thread -- confirm
+
+  // To keep track of whether the buffer has been flushed
+  pthread_cond_t flushed_;
+  bool notFlushed_;
+
+  // Mutex that is grabbed when enqueueing, dequeueing and flushing
+  // from the circular buffer
+  pthread_mutex_t mutex_;
+
+  // File information
+  string filename_;
+  int fd_;
+
+  // Offset within the file
+  long long offset_;
+
+  void openOutputFile();
+  uint32_t getCurrentTime();  // NOTE(review): time unit and epoch are not visible in this header
+
+};
+
+}}}
+
+#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_