-- Thrift Log File
Summary:
-- This is the thrifty version of Pillar's batch_writer
-- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards
-- Added TBufferedRouterTransport.h/cc to actually route messsages via readEnd() to the file writer. It's
not quite as easy to route the messages in Thrift as it was in Pillar
Reviewed By: Slee
Test Plan: Tested by making sure that the file was recording data
Notes:
-- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
new file mode 100644
index 0000000..c327aab
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -0,0 +1,138 @@
+#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+
+#include "TTransport.h"
+#include "Thrift.h"
+
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+namespace facebook { namespace thrift { namespace transport {
+
+using namespace boost;
+using std::string;
+
+// Data pertaining to a single event
+typedef struct eventInfo {
+ uint8_t* payLoad_;
+ uint32_t eventSize_;
+
+ eventInfo():payLoad_(NULL), eventSize_(0){};
+} eventInfo;
+
+
+/**
+ * Class that stores a circular in-memory event/message buffer and writes
+ * elements to disk when the buffer becomes full or a flush is triggered.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedFileWriter : public TTransport {
+ public:
+ void setFlushMaxUs(uint32_t flushMaxUs) {
+ flushMaxUs_ = flushMaxUs;
+ }
+ uint32_t getFlushMaxUs() {
+ return flushMaxUs_;
+ }
+
+ void setFlushMaxBytes(uint32_t flushMaxBytes) {
+ flushMaxBytes_ = flushMaxBytes;
+ }
+ uint32_t getFlushMaxBytes() {
+ return flushMaxBytes_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) {
+ chunkSize_ = chunkSize;
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setMaxEventSize(uint32_t maxEventSize) {
+ maxEventSize_ = maxEventSize;
+ }
+ uint32_t getMaxEventSize() {
+ return maxEventSize_;
+ }
+
+ TBufferedFileWriter(string filename, uint32_t sz);
+ TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
+ void init(string filename, uint32_t sz, int fd, long long offset);
+ ~TBufferedFileWriter();
+
+ void resetOutputFile(int fd, string filename, long long offset);
+
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
+ void write(const uint8_t* buf, uint32_t len) {
+ enqueueEvent(buf, len, false);
+ }
+
+ eventInfo dequeueEvent(long long deadline);
+ void flush();
+
+ // control for writer thread
+ static void* startWriterThread(void* ptr) {
+ (((TBufferedFileWriter*)ptr)->writerThread());
+ return 0;
+ }
+ void writerThread();
+
+
+ private:
+ // circular buffer to hold data in before it is flushed. This is an array of strings. Each
+ // element of the array stores a msg that needs to be written to the file
+ eventInfo* buffer_;
+
+ // size of string buffer
+ uint32_t sz_;
+
+ // size of chunks that file will be split up into
+ uint32_t chunkSize_;
+
+ // max number of microseconds that can pass without flushing
+ uint32_t flushMaxUs_;
+
+ // max number of bytes that can be written without flushing
+ uint32_t flushMaxBytes_;
+
+ // max event size
+ uint32_t maxEventSize_;
+
+ // writer thread id
+ pthread_t writer_;
+
+ // variables that determine position of head/tail of circular buffer
+ int headPos_, tailPos_;
+
+ // variables indicating whether the buffer is full or empty
+ bool isFull_, isEmpty_;
+ pthread_cond_t notFull_, notEmpty_;
+ bool closing_;
+
+ // To keep track of whether the buffer has been flushed
+ pthread_cond_t flushed_;
+ bool notFlushed_;
+
+ // Mutex that is grabbed when enqueueing, dequeueing and flushing
+ // from the circular buffer
+ pthread_mutex_t mutex_;
+
+ // File information
+ string filename_;
+ int fd_;
+
+ // Offset within the file
+ long long offset_;
+
+ void openOutputFile();
+ uint32_t getCurrentTime();
+
+};
+
+}}}
+
+#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_