-- TFileTransport (Thrift Logfile)
Summary:
-- TBufferedFileWriter.h/cpp will be renamed to TFileTransport.h/cpp in the next commit.
-- TFileTransport reads and writes Thrift calls from/to a file instead of a
socket.
-- The code/design is somewhat similar to pillar_logfile but there are some significant changes.
TODO:
-- error detection/correction still needs to be implemented
Reviewed By: Mark Slee
Test Plan:
-- Wrote test in thrift/test/cpp/src/main.cpp that appends to a file and replays requests
Notes:
It's finally time to port search over to Thrift
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664889 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h
index c327aab..8192332 100644
--- a/lib/cpp/src/transport/TBufferedFileWriter.h
+++ b/lib/cpp/src/transport/TBufferedFileWriter.h
@@ -1,10 +1,12 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
+#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
+#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
#include "TTransport.h"
#include "Thrift.h"
+#include "TProcessor.h"
#include <string>
+#include <stdio.h>
#include <boost/shared_ptr.hpp>
@@ -15,42 +17,154 @@
// Data pertaining to a single event
typedef struct eventInfo {
- uint8_t* payLoad_;
- uint32_t eventSize_;
+ uint8_t* eventBuff_;  // owned buffer; released with delete[] in the destructor
+ uint32_t eventSize_;  // presumably the event length in bytes -- TODO confirm
+ uint32_t eventBuffPos_;  // presumably the current offset into eventBuff_; starts at 0
- eventInfo():payLoad_(NULL), eventSize_(0){};
+ eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){};
+ ~eventInfo() {  // NOTE(review): no copy constructor/assignment is declared, so a copied eventInfo would delete[] the same buffer twice
+ if (eventBuff_) {
+ delete[] eventBuff_;
+ }
+ }
} eventInfo;
+// Information about the current read state; owns event_ and deletes it on full reset and on destruction
+typedef struct readState {
+ eventInfo* event_;  // owned: deleted in resetAllValues() and in the destructor
+ // keep track of event size: a 4-byte scratch buffer, a position in it, and a flag that resetState() sets to true
+ uint8_t eventSizeBuff_[4];
+ uint8_t eventSizeBuffPos_;
+ bool readingSize_;
+
+ // read buffer variables (presumably an offset into, and the valid length of, the read buffer -- TODO confirm)
+ int32_t bufferPtr_;
+ int32_t bufferLen_;
+
+ // last successful dispatch point
+ int32_t lastDispatchPtr_;
+
+ void resetState(uint32_t lastDispatchPtr) {  // NOTE(review): the uint32_t argument is stored into the int32_t lastDispatchPtr_
+ readingSize_ = true;
+ eventSizeBuffPos_ = 0;
+ lastDispatchPtr_ = lastDispatchPtr;
+ }
+
+ void resetAllValues() {  // resets the size-reading state, zeroes the buffer variables, and frees any held event
+ resetState(0);
+ bufferPtr_ = 0;
+ bufferLen_ = 0;
+ if (event_) {
+ delete(event_);
+ }
+ event_ = 0;
+ }
+
+ readState() {
+ event_ = 0;  // must be zeroed first: resetAllValues() tests event_ before deleting it
+ resetAllValues();
+ }
+
+ ~readState() {  // frees the held event, if any
+ if (event_) {
+ delete(event_);
+ }
+ }
+
+} readState;
+
/**
- * Class that stores a circular in-memory event/message buffer and writes
- * elements to disk when the buffer becomes full or a flush is triggered.
+ * File implementation of a transport. Reads and writes are done to a
+ * file on disk; write() hands each buffer to enqueueEvent() as one event.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
-class TBufferedFileWriter : public TTransport {
+class TFileTransport : public TTransport {
public:
+ TFileTransport(string path);
+ ~TFileTransport();
+
+ // TODO: what is the correct behaviour for this?
+ // the log file is generally always open, so this currently always reports true
+ bool isOpen() {
+ return true;
+ }
+
+ void write(const uint8_t* buf, uint32_t len) {
+ enqueueEvent(buf, len, false);  // false: blockUntilFlush is not requested
+ }
+
+ void flush();
+
+ uint32_t readAll(uint8_t* buf, uint32_t len);
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ // log-file specific functions
+ void seekToChunk(int chunk);
+ void seekToEnd();
+ uint32_t getNumChunks();
+
+ // for changing the output file
+ void resetOutputFile(int fd, string filename, long long offset);
+
+ // Setter/Getter functions for user-controllable options; setters that test their argument ignore a value of 0
+ void setReadBuffSize(uint32_t readBuffSize) {
+ if (readBuffSize) {
+ readBuffSize_ = readBuffSize;
+ }
+ }
+ uint32_t getReadBuffSize() {
+ return readBuffSize_;
+ }
+
+ void setReadTimeout(int32_t readTimeout) {
+ readTimeout_ = readTimeout;
+ }
+ int32_t getReadTimeout() {
+ return readTimeout_;
+ }
+
+ void setChunkSize(uint32_t chunkSize) {
+ if (chunkSize) {
+ chunkSize_ = chunkSize;
+ }
+ }
+ uint32_t getChunkSize() {
+ return chunkSize_;
+ }
+
+ void setEventBufferSize(uint32_t bufferSize) {
+ if (bufferSize) {
+ if (buffer_) {
+ delete[] buffer_;  // NOTE(review): frees only the pointer array, not any eventInfo objects it may still reference
+ }
+ eventBufferSize_ = bufferSize;
+ buffer_ = new eventInfo*[eventBufferSize_];  // elements are not initialized here
+ }
+ }
+ uint32_t getEventBufferSize() {
+ return eventBufferSize_;
+ }
+
void setFlushMaxUs(uint32_t flushMaxUs) {
- flushMaxUs_ = flushMaxUs;
+ if (flushMaxUs) {  // a value of 0 is ignored
+ flushMaxUs_ = flushMaxUs;
+ }
}
uint32_t getFlushMaxUs() {
return flushMaxUs_;
}
void setFlushMaxBytes(uint32_t flushMaxBytes) {
- flushMaxBytes_ = flushMaxBytes;
+ if (flushMaxBytes) {  // a value of 0 is ignored
+ flushMaxBytes_ = flushMaxBytes;
+ }
}
uint32_t getFlushMaxBytes() {
return flushMaxBytes_;
}
- void setChunkSize(uint32_t chunkSize) {
- chunkSize_ = chunkSize;
- }
- uint32_t getChunkSize() {
- return chunkSize_;
- }
-
void setMaxEventSize(uint32_t maxEventSize) {
maxEventSize_ = maxEventSize;
}
@@ -58,52 +172,88 @@
return maxEventSize_;
}
- TBufferedFileWriter(string filename, uint32_t sz);
- TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
- void init(string filename, uint32_t sz, int fd, long long offset);
- ~TBufferedFileWriter();
-
- void resetOutputFile(int fd, string filename, long long offset);
-
- void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
- void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
- void write(const uint8_t* buf, uint32_t len) {
- enqueueEvent(buf, len, false);
+ void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
+ maxCorruptedEvents_ = maxCorruptedEvents;
+ }
+ uint32_t getMaxCorruptedEvents() {
+ return maxCorruptedEvents_;
}
- eventInfo dequeueEvent(long long deadline);
- void flush();
+ void setEofSleepTimeUs(uint32_t eofSleepTime) {
+ if (eofSleepTime) {  // a value of 0 is ignored
+ eofSleepTime_ = eofSleepTime;
+ }
+ }
+ uint32_t getEofSleepTimeUs() {
+ return eofSleepTime_;
+ }
+
+ private:
+ // helper functions for writing to a file; events are now passed and returned by pointer
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
+ eventInfo* dequeueEvent(long long deadline);
// control for writer thread
static void* startWriterThread(void* ptr) {
- (((TBufferedFileWriter*)ptr)->writerThread());
+ (((TFileTransport*)ptr)->writerThread());  // ptr is cast to the TFileTransport instance whose writerThread() is run
return 0;
}
void writerThread();
+ // helper functions for reading from a file
+ bool readEvent();
- private:
- // circular buffer to hold data in before it is flushed. This is an array of strings. Each
- // element of the array stores a msg that needs to be written to the file
- eventInfo* buffer_;
-
- // size of string buffer
- uint32_t sz_;
+ // Utility functions
+ void openLogFile();
+ uint32_t getCurrentTime();
+
+ // Class variables
+ readState readState_;
+ uint8_t* readBuff_;
+
+ eventInfo* currentEvent_;
+
+ uint32_t readBuffSize_;
+ static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;  // 1 MiB
+
+ int32_t readTimeout_;
+ static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
// size of chunks that file will be split up into
uint32_t chunkSize_;
+ static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;  // 16 MiB
+ // number of slots in the event buffer (the length of the buffer_ array)
+ uint32_t eventBufferSize_;
+ static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
+
+ // circular buffer to hold data in before it is flushed. This is an array of eventInfo pointers. Each
+ // element of the array refers to an event that needs to be written to the file
+ eventInfo** buffer_;
+
// max number of microseconds that can pass without flushing
uint32_t flushMaxUs_;
+ static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;  // 20 ms
// max number of bytes that can be written without flushing
uint32_t flushMaxBytes_;
+ static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
// max event size
uint32_t maxEventSize_;
+ static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;  // presumably 0 means no limit -- TODO confirm
+
+ // max number of corrupted events per chunk
+ uint32_t maxCorruptedEvents_;
+ static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
+ // sleep duration, in microseconds, when EOF is hit
+ uint32_t eofSleepTime_;
+ static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;  // 0.5 s
+
// writer thread id
- pthread_t writer_;
+ pthread_t writerThreadId_;
// variables that determine position of head/tail of circular buffer
int headPos_, tailPos_;
@@ -126,13 +276,61 @@
int fd_;
// Offset within the file
- long long offset_;
-
- void openOutputFile();
- uint32_t getCurrentTime();
+ off_t offset_;  // NOTE(review): resetOutputFile() still takes its offset as long long
};
-}}}
+// Exception thrown when EOF is hit; a TTransportException constructed with TTX_EOF
+class TEOFException : public facebook::thrift::TTransportException {
+ public:
+ TEOFException():
+ facebook::thrift::TTransportException(TTX_EOF) {};
+};
-#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
+
+// Wrapper class that processes events from a file containing Thrift events
+class TFileProcessor {
+ public:
+ /**
+ * Constructor that defaults output transport to null transport
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ */
+ TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport);
+
+ /**
+ * Constructor
+ *
+ * @param processor processes log-file events
+ * @param protocolFactory protocol factory
+ * @param inputTransport input file transport
+ * @param outputTransport output transport
+ */
+ TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TFileTransport> inputTransport,
+ shared_ptr<TTransport> outputTransport);
+
+ /**
+ * Processes events from the file
+ *
+ * @param numEvents number of events to process (0 for unlimited)
+ * @param tail tails the file if true
+ */
+ void process(uint32_t numEvents, bool tail);
+
+ private:
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TProtocolFactory> protocolFactory_;
+ shared_ptr<TFileTransport> inputTransport_;
+ shared_ptr<TTransport> outputTransport_;
+};
+
+
+}}} // facebook::thrift::transport
+
+#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_