From e528c7649099d84dea57b47c58bbc5c8e65e45f8 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Wed, 11 Oct 2006 02:48:43 +0000 Subject: [PATCH] -- Thrift Log File Summary: -- This is the thrifty version of Pillar's batch_writer -- Cleaned up a lot of the code in batch writer and made it conform to Thrift's strict coding standards -- Added TBufferedRouterTransport.h/cc to actually route messsages via readEnd() to the file writer. It's not quite as easy to route the messages in Thrift as it was in Pillar Reviewed By: Slee Test Plan: Tested by making sure that the file was recording data Notes: -- The real correctness test will be when I finish writing TLogFileTransport (pillar_logfile.cpp). git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664826 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 9 +- lib/cpp/src/transport/TBufferedFileWriter.cc | 348 ++++++++++++++++++ lib/cpp/src/transport/TBufferedFileWriter.h | 138 +++++++ .../src/transport/TBufferedRouterTransport.cc | 79 ++++ .../src/transport/TBufferedRouterTransport.h | 92 +++++ .../TBufferedRouterTransportFactory.h | 35 ++ 6 files changed, 699 insertions(+), 2 deletions(-) create mode 100644 lib/cpp/src/transport/TBufferedFileWriter.cc create mode 100644 lib/cpp/src/transport/TBufferedFileWriter.h create mode 100644 lib/cpp/src/transport/TBufferedRouterTransport.cc create mode 100644 lib/cpp/src/transport/TBufferedRouterTransport.h create mode 100644 lib/cpp/src/transport/TBufferedRouterTransportFactory.h diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 2cb47593..a92de3e2 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -11,6 +11,8 @@ libthrift_sources = src/concurrency/Monitor.cc \ src/concurrency/TimerManager.cc \ src/protocol/TBinaryProtocol.cc \ src/transport/TBufferedTransport.cc \ + src/transport/TBufferedFileWriter.cc \ + src/transport/TBufferedRouterTransport.cc \ src/transport/TFramedTransport.cc \ src/transport/TMemoryBuffer.cc \ src/transport/TSocket.cc \ @@ -30,7 +32,8 @@ include_thriftdir = $(includedir)/thrift include_thrift_HEADERS = \ config.h \ src/Thrift.h \ - src/TProcessor.h + src/TProcessor.h \ + src/TLogging.h include_concurrencydir = $(include_thriftdir)/concurrency include_concurrency_HEADERS = \ @@ -59,7 +62,9 @@ include_transport_HEADERS = \ src/transport/TTransport.h \ src/transport/TTransportException.h \ src/transport/TTransportFactory.h \ - src/transport/TBufferedTransportFactory.h + src/transport/TBufferedTransportFactory.h \ + src/transport/TBufferedFileWriter.h \ + src/transport/TBufferedRouterTransport.h include_serverdir = $(include_thriftdir)/server include_server_HEADERS = \ diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cc b/lib/cpp/src/transport/TBufferedFileWriter.cc new file mode 100644 index 00000000..ac46b97c --- /dev/null +++ b/lib/cpp/src/transport/TBufferedFileWriter.cc @@ -0,0 +1,348 @@ +#include "TBufferedFileWriter.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using std::string; + +namespace facebook { namespace thrift { namespace transport { + +TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) { + init(filename, sz, 0, 0); +} + +TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) { + init(filename, sz, fd, offset); +} + +void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) { + // validate buffer size + sz_ = sz; + if (sz_ <= 0) { + throw TTransportException("invalid input buffer size"); + } + + // set file-related variables + fd_ = 0; + resetOutputFile(fd, filename, offset); + + // set default values of flush related params + flushMaxBytes_ = 1024 * 100; + flushMaxUs_ = 20 * 1000; + + // allocate event buffer + buffer_ = new eventInfo[sz_]; + + // buffer is initially empty + isEmpty_ = true; + isFull_ = false; + + // both head and tail are initially at 0 + headPos_ = 0; + tailPos_ = 0; + + // for lack of a better option, set chunk size to 0. Users can change this to whatever they want + chunkSize_ = 0; + + // initialize all the condition vars/mutexes + pthread_mutex_init(&mutex_, NULL); + pthread_cond_init(¬Full_, NULL); + pthread_cond_init(¬Empty_, NULL); + pthread_cond_init(&flushed_, NULL); + + // not closing the file during init + closing_ = false; + + // spawn writer thread + pthread_create(&writer_, NULL, startWriterThread, (void *)this); +} + +void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) { + filename_ = filename; + offset_ = offset; + + // check if current file is still open + if (fd_ > 0) { + // TODO: unclear if this should throw an error + fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str()); + ::close(fd_); + } + fd_ = fd; +} + + +TBufferedFileWriter::~TBufferedFileWriter() { + // flush output buffer + flush(); + + // send a signal to write thread to end + closing_ = true; + pthread_join(writer_, NULL); + + delete[] buffer_; + + // TODO: should the file be closed here? +} + + +void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { + // make sure that event size is valid + if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { + // ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); + return; + } + + if (eventLen == 0) { + ERROR("cannot enqueue an empty event"); + return; + } + + eventInfo toEnqueue; + uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen); + toEnqueue.payLoad_ = bufCopy; + toEnqueue.eventSize_ = eventLen; + + return enqueueEvent(toEnqueue, blockUntilFlush); +} + +void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) { + // Lock mutex + pthread_mutex_lock(&mutex_); + // Can't enqueue while buffer is full + while(isFull_) { + pthread_cond_wait(¬Full_, &mutex_); + } + + // make a copy and enqueue at tail of buffer + buffer_[tailPos_] = toEnqueue; + tailPos_ = (tailPos_+1) % sz_; + + // mark the buffer as non-empty + isEmpty_ = false; + + // circular buffer has wrapped around (and is full) + if(tailPos_ == headPos_) { + // DEBUG("queue is full"); + isFull_ = true; + } + + // signal anybody who's waiting for the buffer to be non-empty + pthread_cond_signal(¬Empty_); + if(blockUntilFlush) { + pthread_cond_wait(&flushed_, &mutex_); + } + + // TODO: don't return until flushed to disk + // this really should be a loop where it makes sure it got flushed + // because condition variables can get triggered by the os for no reason + // it is probably a non-factor for the time being + pthread_mutex_unlock(&mutex_); + +} + +eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) { + //deadline time struc + struct timespec ts; + if(deadline) { + ts.tv_sec = deadline/(1000*1000); + ts.tv_nsec = (deadline%(1000*1000))*1000; + } + + // wait for the queue to fill up + pthread_mutex_lock(&mutex_); + while(isEmpty_) { + // do a timed wait on the condition variable + if(deadline) { + int e = pthread_cond_timedwait(¬Empty_, &mutex_, &ts); + if(e == ETIMEDOUT) { + break; + } + } + else { + // just wait until the buffer gets an item + pthread_cond_wait(¬Empty_, &mutex_); + } + } + + string ret; + bool doSignal = false; + + // could be empty if we timed out + eventInfo retEvent; + if(!isEmpty_) { + retEvent = buffer_[headPos_]; + headPos_ = (headPos_+1) % sz_; + + isFull_ = false; + doSignal = true; + + // check if this is the last item in the buffer + if(headPos_ == tailPos_) { + isEmpty_ = true; + } + } + + // unlock the mutex and signal if required + pthread_mutex_unlock(&mutex_); + if(doSignal) { + pthread_cond_signal(¬Full_); + } + + return retEvent; +} + + +void TBufferedFileWriter::flush() +{ + eventInfo flushEvent; + flushEvent.payLoad_ = NULL; + flushEvent.eventSize_ = 0; + + notFlushed_ = true; + + enqueueEvent(flushEvent, false); + + // wait for flush to take place + pthread_mutex_lock(&mutex_); + + while(notFlushed_) { + pthread_cond_wait(&flushed_, &mutex_); + } + + pthread_mutex_unlock(&mutex_); +} + +void TBufferedFileWriter::openOutputFile() { + mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH; + fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode); + + // make sure open call was successful + if(fd_ == -1) { + char errorMsg[1024]; + sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str()); + perror(errorMsg); + throw TTransportException(errorMsg); + } +} + +uint32_t TBufferedFileWriter::getCurrentTime() { + long long ret; + struct timeval tv; + gettimeofday(&tv, NULL); + ret = tv.tv_sec; + ret = ret*1000*1000 + tv.tv_usec; + return ret; +} + + +void TBufferedFileWriter::writerThread() { + // open file if it is not open + if(!fd_) { + openOutputFile(); + } + + // Figure out the next time by which a flush must take place + long long nextFlush = getCurrentTime() + flushMaxUs_; + uint32_t unflushed = 0; + + while(1) { + // this will only be true when the destructor is being invoked + if(closing_) { + if(-1 == ::close(fd_)) { + perror("TBufferedFileWriter: error in close"); + } + throw TTransportException("error in file close"); + } + + //long long start = now(); + eventInfo outEvent = dequeueEvent(nextFlush); + + // sanity check on event + if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) { + ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_); + continue; + } + //long long diff = now()-start; + //DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000); + + // If chunking is required, then make sure that msg does not cross chunk boundary + if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) { + + // event size must be less than chunk size + if(outEvent.eventSize_ > chunkSize_) { + ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event", + outEvent.eventSize_, chunkSize_); + continue; + } + + long long chunk1 = offset_/chunkSize_; + long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_; + + // if adding this event will cross a chunk boundary, pad the chunk with zeros + if(chunk1 != chunk2) { + int padding = (int)(chunk2*chunkSize_ - offset_); + + // sanity check + if (padding <= 0) { + DEBUG("Padding is empty, skipping event"); + continue; + } + if (padding > (int32_t)chunkSize_) { + DEBUG("padding is larger than chunk size, skipping event"); + continue; + } + // DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2); + uint8_t zeros[padding]; + bzero(zeros, padding); + if(-1 == ::write(fd_, zeros, padding)) { + perror("TBufferedFileWriter: error while padding zeros"); + throw TTransportException("TBufferedFileWriter: error while padding zeros"); + } + unflushed += padding; + offset_ += padding; + } + } + + // write the dequeued event to the file + if(outEvent.eventSize_ > 0) { + if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) { + perror("TBufferedFileWriter: error while writing event"); + // TODO: should this trigger an exception or simply continue? + throw TTransportException("TBufferedFileWriter: error while writing event"); + } + + // deallocate payload + free(outEvent.payLoad_); + + unflushed += outEvent.eventSize_; + offset_ += outEvent.eventSize_; + } + + // couple of cases from which a flush could be triggered + if((getCurrentTime() >= nextFlush && unflushed > 0) || + unflushed > flushMaxBytes_ || + (outEvent.eventSize_ == 0) ) { + //Debug("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_); + + // sync (force flush) file to disk + fsync(fd_); + nextFlush = getCurrentTime() + flushMaxUs_; + unflushed = 0; + + // notify anybody(thing?) waiting for flush completion + pthread_mutex_lock(&mutex_); + notFlushed_ = false; + pthread_mutex_unlock(&mutex_); + pthread_cond_broadcast(&flushed_); + } + } + +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h new file mode 100644 index 00000000..c327aabd --- /dev/null +++ b/lib/cpp/src/transport/TBufferedFileWriter.h @@ -0,0 +1,138 @@ +#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ +#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1 + +#include "TTransport.h" +#include "Thrift.h" + +#include + +#include + +namespace facebook { namespace thrift { namespace transport { + +using namespace boost; +using std::string; + +// Data pertaining to a single event +typedef struct eventInfo { + uint8_t* payLoad_; + uint32_t eventSize_; + + eventInfo():payLoad_(NULL), eventSize_(0){}; +} eventInfo; + + +/** + * Class that stores a circular in-memory event/message buffer and writes + * elements to disk when the buffer becomes full or a flush is triggered. + * + * @author Aditya Agarwal + */ +class TBufferedFileWriter : public TTransport { + public: + void setFlushMaxUs(uint32_t flushMaxUs) { + flushMaxUs_ = flushMaxUs; + } + uint32_t getFlushMaxUs() { + return flushMaxUs_; + } + + void setFlushMaxBytes(uint32_t flushMaxBytes) { + flushMaxBytes_ = flushMaxBytes; + } + uint32_t getFlushMaxBytes() { + return flushMaxBytes_; + } + + void setChunkSize(uint32_t chunkSize) { + chunkSize_ = chunkSize; + } + uint32_t getChunkSize() { + return chunkSize_; + } + + void setMaxEventSize(uint32_t maxEventSize) { + maxEventSize_ = maxEventSize; + } + uint32_t getMaxEventSize() { + return maxEventSize_; + } + + TBufferedFileWriter(string filename, uint32_t sz); + TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset); + void init(string filename, uint32_t sz, int fd, long long offset); + ~TBufferedFileWriter(); + + void resetOutputFile(int fd, string filename, long long offset); + + void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); + void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush); + void write(const uint8_t* buf, uint32_t len) { + enqueueEvent(buf, len, false); + } + + eventInfo dequeueEvent(long long deadline); + void flush(); + + // control for writer thread + static void* startWriterThread(void* ptr) { + (((TBufferedFileWriter*)ptr)->writerThread()); + return 0; + } + void writerThread(); + + + private: + // circular buffer to hold data in before it is flushed. This is an array of strings. Each + // element of the array stores a msg that needs to be written to the file + eventInfo* buffer_; + + // size of string buffer + uint32_t sz_; + + // size of chunks that file will be split up into + uint32_t chunkSize_; + + // max number of microseconds that can pass without flushing + uint32_t flushMaxUs_; + + // max number of bytes that can be written without flushing + uint32_t flushMaxBytes_; + + // max event size + uint32_t maxEventSize_; + + // writer thread id + pthread_t writer_; + + // variables that determine position of head/tail of circular buffer + int headPos_, tailPos_; + + // variables indicating whether the buffer is full or empty + bool isFull_, isEmpty_; + pthread_cond_t notFull_, notEmpty_; + bool closing_; + + // To keep track of whether the buffer has been flushed + pthread_cond_t flushed_; + bool notFlushed_; + + // Mutex that is grabbed when enqueueing, dequeueing and flushing + // from the circular buffer + pthread_mutex_t mutex_; + + // File information + string filename_; + int fd_; + + // Offset within the file + long long offset_; + + void openOutputFile(); + uint32_t getCurrentTime(); + +}; + +}}} + +#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cc b/lib/cpp/src/transport/TBufferedRouterTransport.cc new file mode 100644 index 00000000..7c09953e --- /dev/null +++ b/lib/cpp/src/transport/TBufferedRouterTransport.cc @@ -0,0 +1,79 @@ +#include "TBufferedRouterTransport.h" +#include "Thrift.h" +using std::string; + +namespace facebook { namespace thrift { namespace transport { + +uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) { + uint32_t need = len; + + // We don't have enough data yet + if (rLen_-rPos_ < need) { + // Copy out whatever we have + if (rLen_-rPos_ > 0) { + memcpy(buf, rBuf_+rPos_, rLen_-rPos_); + need -= rLen_-rPos_; + buf += rLen_-rPos_; + rPos_ = rLen_; + } + + // Double the size of the underlying buffer if it is full + if (rLen_ == rBufSize_) { + rBufSize_ *=2; + rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); + } + + // try to fill up the buffer + rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_); + } + + // Hand over whatever we have + uint32_t give = need; + if (rLen_-rPos_ < give) { + give = rLen_-rPos_; + } + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + + return (len - need); +} + +void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) { + if (len == 0) { + return; + } + + if (len + wLen_ >= wBufSize_) { + uint32_t copy = wBufSize_ - wLen_; + memcpy(wBuf_ + wLen_, buf, copy); + trans_->write(wBuf_+wPos_, wBufSize_-wPos_); + wLen_ += copy; + wPos_ = wLen_; + + uint32_t left = len-copy; + if (left > 0) { + // double the size of the write buffer + wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * wBufSize_ * 2); + memcpy(wBuf_ + wLen_, buf+copy, left); + wLen_ += left; + wBufSize_*=2; + } + } else { + memcpy(wBuf_+wLen_, buf, len); + wLen_ += len; + } +} + +void TBufferedRouterTransport::flush() { + // Write out any data waiting in the write buffer + if (wLen_-wPos_ > 0) { + trans_->write(wBuf_+wPos_, wLen_-wPos_); + wPos_ = wLen_; + } + + // Flush the underlying transport + trans_->flush(); +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h new file mode 100644 index 00000000..b01faac8 --- /dev/null +++ b/lib/cpp/src/transport/TBufferedRouterTransport.h @@ -0,0 +1,92 @@ +#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1 + +#include "TTransport.h" +#include "Thrift.h" +#include + +#include + +namespace facebook { namespace thrift { namespace transport { + +using namespace boost; + +/** + * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport + * but routes the request to another Transport (typical use case is to route + * the request to TBufferedFileWriter to store the request on disk). The + * underlying buffer expands to a keep a copy of the entire request/response. + * + * @author Aditya Agarwal + */ +class TBufferedRouterTransport : public TTransport { + public: + TBufferedRouterTransport(shared_ptr trans, shared_ptr rtrans) : + trans_(trans), + rtrans_(rtrans), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(512), wPos_(0), wLen_(0) { + + rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); + wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); + } + + TBufferedRouterTransport(shared_ptr trans, shared_ptr rtrans, uint32_t sz) : + trans_(trans), + rtrans_(rtrans), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(sz), wPos_(0), wLen_(0) { + + rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); + wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); + } + + ~TBufferedRouterTransport() { + free(rBuf_); + free(wBuf_); + } + + bool isOpen() { + return trans_->isOpen(); + } + + void open() { + trans_->open(); + } + + void close() { + trans_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void readEnd() { + rtrans_->write(rBuf_, rLen_); + + // reset state + rLen_ = 0; + rPos_ = 0; + } + + void write(const uint8_t* buf, uint32_t len); + + void flush(); + + protected: + shared_ptr trans_; + shared_ptr rtrans_; + + uint8_t* rBuf_; + uint32_t rBufSize_; + uint32_t rPos_; + uint32_t rLen_; + + uint8_t* wBuf_; + uint32_t wBufSize_; + uint32_t wPos_; + uint32_t wLen_; +}; + +}}} // facebook::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h new file mode 100644 index 00000000..2b82570c --- /dev/null +++ b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h @@ -0,0 +1,35 @@ +#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ +#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1 + +#include +#include +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * Wraps a transport into a bufferedRouter instance. + * + * @author Aditya Agarwal + */ +class TBufferedRouterTransportFactory : public TTransportFactory { + public: + TBufferedRouterTransportFactory(boost::shared_ptr rTrans): rTrans_(rTrans) {} + + virtual ~TBufferedRouterTransportFactory() {} + + /** + * Wraps the transport into a buffered one. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + boost::shared_ptr buffered(new TBufferedRouterTransport(trans, rTrans_)); + return std::make_pair(buffered, buffered); + } + + private: + boost::shared_ptr rTrans_; +}; + +}}} + +#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ -- 2.17.1