-- Thrift Log File
Summary:
-- This is the thrifty version of Pillar's batch_writer
-- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards
-- Added TBufferedRouterTransport.h/cc to actually route messages via readEnd() to the file writer. It's
not quite as easy to route the messages in Thrift as it was in Pillar.
Reviewed By: Slee
Test Plan: Tested by making sure that the file was recording data
Notes:
-- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cc b/lib/cpp/src/transport/TBufferedFileWriter.cc
new file mode 100644
index 0000000..ac46b97
--- /dev/null
+++ b/lib/cpp/src/transport/TBufferedFileWriter.cc
@@ -0,0 +1,348 @@
+#include "TBufferedFileWriter.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <cassert>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport {
+
+// Writer that owns no descriptor yet: fd 0 tells init()/writerThread() that
+// the output file named by filename still has to be opened, and writing
+// starts at offset 0.
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
+  const int noDescriptor = 0;
+  const long long startOffset = 0;
+  init(filename, sz, noDescriptor, startOffset);
+}
+
+// Writer that adopts an already-open descriptor fd whose current write
+// position corresponds to byte offset `offset` (used for chunk padding).
+TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
+  init(filename, sz, fd, offset);
+}
+
+// Shared constructor body: validates the queue size, records the output
+// file, allocates the circular event buffer of sz entries, initializes the
+// synchronization primitives and finally starts the writer thread.
+// Throws TTransportException on an invalid size or if the thread cannot be
+// created.
+void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
+  // validate buffer size
+  sz_ = sz;
+  if (sz_ <= 0) {
+    throw TTransportException("invalid input buffer size");
+  }
+
+  // set file-related variables; fd_ == 0 means "no descriptor yet", in which
+  // case the writer thread opens filename_ itself
+  fd_ = 0;
+  resetOutputFile(fd, filename, offset);
+
+  // set default values of flush related params
+  flushMaxBytes_ = 1024 * 100;
+  flushMaxUs_ = 20 * 1000;
+
+  // 0 means "no per-event size limit" (both size checks test
+  // maxEventSize_ > 0). This member was previously never initialized, so its
+  // value was indeterminate when first read.
+  maxEventSize_ = 0;
+
+  // allocate event buffer
+  buffer_ = new eventInfo[sz_];
+
+  // buffer is initially empty; head and tail both start at slot 0
+  isEmpty_ = true;
+  isFull_ = false;
+  headPos_ = 0;
+  tailPos_ = 0;
+
+  // for lack of a better option, set chunk size to 0 (no chunking).
+  // Users can change this to whatever they want.
+  chunkSize_ = 0;
+
+  // initialize all the condition vars/mutexes
+  pthread_mutex_init(&mutex_, NULL);
+  pthread_cond_init(&notFull_, NULL);
+  pthread_cond_init(&notEmpty_, NULL);
+  pthread_cond_init(&flushed_, NULL);
+
+  // not closing the file during init, and no flush is pending yet
+  // (notFlushed_ was previously left uninitialized)
+  closing_ = false;
+  notFlushed_ = false;
+
+  // spawn writer thread last, once every member it reads is initialized.
+  // If this fails the destructor will not run, so undo what we set up.
+  if (pthread_create(&writer_, NULL, startWriterThread, (void *)this) != 0) {
+    pthread_cond_destroy(&flushed_);
+    pthread_cond_destroy(&notEmpty_);
+    pthread_cond_destroy(&notFull_);
+    pthread_mutex_destroy(&mutex_);
+    delete[] buffer_;
+    buffer_ = NULL;
+    throw TTransportException("TBufferedFileWriter: could not create writer thread");
+  }
+}
+
+// Points the writer at a new output: records filename and the byte offset
+// the descriptor is positioned at, closes the previously held descriptor
+// (with a warning) and adopts fd. fd_ == 0 is this file's "no descriptor"
+// value, so descriptor 0 is never closed here.
+// NOTE(review): fd_/filename_/offset_ are written without mutex_, while
+// writerThread() uses them without locking; calling this while the writer
+// thread is running is unsynchronized.
+void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
+  filename_ = filename;
+  offset_ = offset;
+
+  // check if current file is still open. Do not close the descriptor we are
+  // about to adopt: that would leave fd_ pointing at a closed descriptor.
+  if (fd_ > 0 && fd_ != fd) {
+    // TODO: unclear if this should throw an error
+    fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
+    ::close(fd_);
+  }
+  fd_ = fd;
+}
+
+
+// Flushes everything enqueued so far, asks the writer thread to stop, waits
+// for it, and releases the buffer and synchronization primitives.
+// The writer thread closes fd_ when it observes closing_, so the descriptor
+// is not closed here.
+TBufferedFileWriter::~TBufferedFileWriter() {
+  // flush output buffer
+  flush();
+
+  // Tell the writer thread to end. Set the flag under the mutex so the
+  // write is not a data race, and wake a writer blocked in dequeueEvent()
+  // so that it notices closing_ without waiting for another event.
+  pthread_mutex_lock(&mutex_);
+  closing_ = true;
+  pthread_cond_broadcast(&notEmpty_);
+  pthread_mutex_unlock(&mutex_);
+
+  pthread_join(writer_, NULL);
+
+  delete[] buffer_;
+
+  // the writer thread has exited, so nothing else uses these any more
+  pthread_cond_destroy(&flushed_);
+  pthread_cond_destroy(&notEmpty_);
+  pthread_cond_destroy(&notFull_);
+  pthread_mutex_destroy(&mutex_);
+}
+
+
+// Copies eventLen bytes from buf into a freshly malloc'd payload and queues
+// it for the writer thread, which frees it after writing it out.
+// Events larger than maxEventSize_ (when that limit is > 0) are rejected
+// with an error log. Empty events are also rejected, because a zero-size
+// event is what flush() uses as its flush marker.
+// Throws TTransportException if the payload copy cannot be allocated.
+void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+  // make sure that event size is valid (was previously dropped silently)
+  if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
+    ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
+    return;
+  }
+
+  if (eventLen == 0) {
+    ERROR("cannot enqueue an empty event");
+    return;
+  }
+
+  uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
+  if (bufCopy == NULL) {
+    throw TTransportException("TBufferedFileWriter: could not allocate event buffer");
+  }
+  // The caller's buffer may be reused as soon as we return, so take a copy.
+  // (This copy was previously missing, so uninitialized memory was written.)
+  memcpy(bufCopy, buf, eventLen);
+
+  eventInfo toEnqueue;
+  toEnqueue.payLoad_ = bufCopy;
+  toEnqueue.eventSize_ = eventLen;
+
+  enqueueEvent(toEnqueue, blockUntilFlush);
+}
+
+// Appends toEnqueue to the tail of the circular buffer, blocking while the
+// buffer is full, and signals notEmpty_ for the consumer.
+// If blockUntilFlush is set, it additionally waits for one broadcast of
+// flushed_ before returning.
+// NOTE(review): that wait has no predicate. It can return on a spurious
+// wakeup, or on a flush that happened before this event was written.
+void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
+  pthread_mutex_lock(&mutex_);
+
+  // Can't enqueue while buffer is full
+  while (isFull_) {
+    pthread_cond_wait(&notFull_, &mutex_);
+  }
+
+  // make a copy and enqueue at tail of buffer
+  buffer_[tailPos_] = toEnqueue;
+  tailPos_ = (tailPos_ + 1) % sz_;
+
+  // mark the buffer as non-empty
+  isEmpty_ = false;
+
+  // circular buffer has wrapped around (and is full)
+  if (tailPos_ == headPos_) {
+    isFull_ = true;
+  }
+
+  // signal anybody who's waiting for the buffer to be non-empty
+  pthread_cond_signal(&notEmpty_);
+  if (blockUntilFlush) {
+    pthread_cond_wait(&flushed_, &mutex_);
+  }
+
+  pthread_mutex_unlock(&mutex_);
+}
+
+// Removes and returns the event at the head of the circular buffer.
+// deadline is an absolute wall-clock time in microseconds since the epoch,
+// or 0 to wait indefinitely. If the buffer stays empty until the deadline,
+// or closing_ is set, an empty event (payLoad_ NULL, eventSize_ 0) is
+// returned. That is only ever done while the queue is empty.
+eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
+  // deadline as the absolute timespec pthread_cond_timedwait() expects
+  struct timespec ts;
+  if (deadline) {
+    ts.tv_sec = deadline / (1000 * 1000);
+    ts.tv_nsec = (deadline % (1000 * 1000)) * 1000;
+  }
+
+  // wait for an item to arrive (or for shutdown / the deadline)
+  pthread_mutex_lock(&mutex_);
+  while (isEmpty_ && !closing_) {
+    if (deadline) {
+      int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
+      if (e == ETIMEDOUT) {
+        break;
+      }
+    } else {
+      pthread_cond_wait(&notEmpty_, &mutex_);
+    }
+  }
+
+  // Explicitly empty so a timeout never hands back indeterminate fields.
+  eventInfo retEvent;
+  retEvent.payLoad_ = NULL;
+  retEvent.eventSize_ = 0;
+
+  bool doSignal = false;
+  if (!isEmpty_) {
+    retEvent = buffer_[headPos_];
+    headPos_ = (headPos_ + 1) % sz_;
+
+    isFull_ = false;
+    doSignal = true;
+
+    // check if this is the last item in the buffer
+    if (headPos_ == tailPos_) {
+      isEmpty_ = true;
+    }
+  }
+
+  // unlock the mutex and signal if a slot was freed
+  pthread_mutex_unlock(&mutex_);
+  if (doSignal) {
+    pthread_cond_signal(&notFull_);
+  }
+
+  return retEvent;
+}
+
+
+// Enqueues a zero-size flush marker and blocks until the writer thread
+// reports a completed flush by clearing notFlushed_.
+// NOTE(review): notFlushed_ is a single flag, not a per-request counter.
+// With concurrent flush() callers, one caller may be released by another
+// caller's earlier marker.
+void TBufferedFileWriter::flush()
+{
+  eventInfo flushEvent;
+  flushEvent.payLoad_ = NULL;
+  flushEvent.eventSize_ = 0;
+
+  // notFlushed_ is cleared by the writer thread under mutex_; set it under
+  // the same mutex so the two writes do not race.
+  pthread_mutex_lock(&mutex_);
+  notFlushed_ = true;
+  pthread_mutex_unlock(&mutex_);
+
+  enqueueEvent(flushEvent, false);
+
+  // wait for flush to take place (loop guards against spurious wakeups)
+  pthread_mutex_lock(&mutex_);
+  while (notFlushed_) {
+    pthread_cond_wait(&flushed_, &mutex_);
+  }
+  pthread_mutex_unlock(&mutex_);
+}
+
+// Opens filename_ for appending (created with mode 0644 if missing) and
+// stores the descriptor in fd_. On failure it prints the errno text and
+// throws TTransportException.
+void TBufferedFileWriter::openOutputFile() {
+  mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
+  fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
+
+  // make sure open call was successful
+  if (fd_ == -1) {
+    // snprintf: the filename is arbitrary length and sprintf could overflow
+    char errorMsg[1024];
+    snprintf(errorMsg, sizeof(errorMsg), "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
+    perror(errorMsg);
+    throw TTransportException(errorMsg);
+  }
+}
+
+// Wall-clock time in microseconds since the epoch, computed in 64 bits.
+// NOTE(review): the declared return type is uint32_t, so the result is
+// truncated modulo 2^32 (it wraps about every 71.6 minutes). It is not
+// suitable as an absolute deadline.
+uint32_t TBufferedFileWriter::getCurrentTime() {
+  struct timeval now;
+  gettimeofday(&now, NULL);
+  const long long micros = static_cast<long long>(now.tv_sec) * 1000 * 1000 + now.tv_usec;
+  return static_cast<uint32_t>(micros);
+}
+
+
+// Current wall-clock time in microseconds since the epoch as a full 64-bit
+// value. getCurrentTime() returns uint32_t and so truncates this quantity.
+// Deadlines built from it and passed to pthread_cond_timedwait(), which
+// expects an absolute time, lay in the past, so the writer loop spun.
+static long long writerClockMicros() {
+  struct timeval tv;
+  gettimeofday(&tv, NULL);
+  return static_cast<long long>(tv.tv_sec) * 1000 * 1000 + tv.tv_usec;
+}
+
+// Writes all len bytes at buf to fd, resuming after short writes and EINTR.
+// Returns false on any other failure, or if write() makes no progress.
+static bool writerWriteAll(int fd, const void* buf, size_t len) {
+  const uint8_t* p = static_cast<const uint8_t*>(buf);
+  while (len > 0) {
+    ssize_t n = ::write(fd, p, len);
+    if (n < 0 && errno == EINTR) {
+      continue;
+    }
+    if (n <= 0) {
+      return false;
+    }
+    p += n;
+    len -= static_cast<size_t>(n);
+  }
+  return true;
+}
+
+// Writes count zero bytes to fd from a fixed block. This replaces a
+// variable-length stack array sized by the (user-chosen) chunk size.
+static bool writerWriteZeros(int fd, uint32_t count) {
+  static const uint8_t zeros[4096] = {0};
+  while (count > 0) {
+    uint32_t n = count < sizeof(zeros) ? count : static_cast<uint32_t>(sizeof(zeros));
+    if (!writerWriteAll(fd, zeros, n)) {
+      return false;
+    }
+    count -= n;
+  }
+  return true;
+}
+
+// Body of the writer thread.
+// - Opens the output file if no descriptor was supplied.
+// - Repeatedly dequeues events and appends them to fd_. When chunkSize_ != 0
+//   and an event would straddle a chunk boundary, it first zero-pads up to
+//   that boundary.
+// - fsync()s on byte/time thresholds, or at a zero-size event (a flush()
+//   marker or a dequeue timeout).
+// - Closes fd_ and returns once closing_ is set.
+// NOTE(review): I/O failures are still reported by throwing
+// TTransportException. Unless startWriterThread() (not visible here) catches
+// it, an exception leaving this thread terminates the process.
+void TBufferedFileWriter::writerThread() {
+  // open file if it is not open
+  if (!fd_) {
+    openOutputFile();
+  }
+
+  // Figure out the next time by which a flush must take place
+  long long nextFlush = writerClockMicros() + flushMaxUs_;
+  uint32_t unflushed = 0;
+
+  while (1) {
+    // closing_ is set by the destructor; read it under the mutex
+    pthread_mutex_lock(&mutex_);
+    bool closing = closing_;
+    pthread_mutex_unlock(&mutex_);
+    if (closing) {
+      // Normal shutdown: close the file and return. (This previously threw
+      // unconditionally, even when close() succeeded.)
+      if (-1 == ::close(fd_)) {
+        perror("TBufferedFileWriter: error in close");
+      }
+      fd_ = 0;
+      return;
+    }
+
+    eventInfo outEvent = dequeueEvent(nextFlush);
+
+    // sanity check on event; a dropped event must still release its payload
+    if ((maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
+      ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
+      free(outEvent.payLoad_);
+      continue;
+    }
+
+    // If chunking is required, then make sure that msg does not cross chunk boundary
+    if ((outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
+      // event size must be less than chunk size
+      if (outEvent.eventSize_ > chunkSize_) {
+        ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
+              outEvent.eventSize_, chunkSize_);
+        free(outEvent.payLoad_);
+        continue;
+      }
+
+      long long chunk1 = offset_ / chunkSize_;
+      long long chunk2 = (offset_ + outEvent.eventSize_ - 1) / chunkSize_;
+
+      // if adding this event will cross a chunk boundary, pad the chunk with zeros
+      if (chunk1 != chunk2) {
+        long long padding = chunk2 * chunkSize_ - offset_;
+
+        // sanity checks: the event fits in one chunk, so 0 < padding <= chunkSize_
+        if (padding <= 0) {
+          DEBUG("Padding is empty, skipping event");
+          free(outEvent.payLoad_);
+          continue;
+        }
+        if (padding > static_cast<long long>(chunkSize_)) {
+          DEBUG("padding is larger than chunk size, skipping event");
+          free(outEvent.payLoad_);
+          continue;
+        }
+        if (!writerWriteZeros(fd_, static_cast<uint32_t>(padding))) {
+          perror("TBufferedFileWriter: error while padding zeros");
+          throw TTransportException("TBufferedFileWriter: error while padding zeros");
+        }
+        unflushed += static_cast<uint32_t>(padding);
+        offset_ += padding;
+      }
+    }
+
+    // write the dequeued event to the file
+    if (outEvent.eventSize_ > 0) {
+      if (!writerWriteAll(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
+        perror("TBufferedFileWriter: error while writing event");
+        // TODO: should this trigger an exception or simply continue?
+        throw TTransportException("TBufferedFileWriter: error while writing event");
+      }
+
+      // deallocate payload
+      free(outEvent.payLoad_);
+
+      unflushed += outEvent.eventSize_;
+      offset_ += outEvent.eventSize_;
+    }
+
+    // A zero-size event is either a flush() marker (FIFO: everything queued
+    // before it has been written) or a dequeue timeout (the queue was empty).
+    bool flushPoint = (outEvent.eventSize_ == 0);
+
+    // couple of cases from which a flush could be triggered
+    if ((writerClockMicros() >= nextFlush && unflushed > 0) ||
+        unflushed > flushMaxBytes_ ||
+        flushPoint) {
+      // sync (force flush) file to disk; nothing to sync if nothing was written
+      if (unflushed > 0 && -1 == fsync(fd_)) {
+        perror("TBufferedFileWriter: error in fsync");
+      }
+      nextFlush = writerClockMicros() + flushMaxUs_;
+      unflushed = 0;
+
+      // Notify waiters. Only clear notFlushed_ at a flush point: a threshold
+      // flush can happen while events queued ahead of a pending flush()
+      // marker are still unwritten.
+      pthread_mutex_lock(&mutex_);
+      if (flushPoint) {
+        notFlushed_ = false;
+      }
+      pthread_mutex_unlock(&mutex_);
+      pthread_cond_broadcast(&flushed_);
+    }
+  }
+}
+
+}}} // facebook::thrift::transport