From a0c92417a0eedcb65361d4e8f0d9ae679554c715 Mon Sep 17 00:00:00 2001 From: James Wang Date: Wed, 14 Feb 2007 23:22:08 +0000 Subject: [PATCH] Thrift: Changing TFileTransport to use the same buffer-swap mechanism that pillar does Reviewed by: aditya Tested with thrift test class Notes: TFileTransport used to use a circular buffer. Changed this to use two large buffers, one for reading and one for writing, that are swapped whenever the writer thread finishes with the last write. Also changed a few default constants -- force_flush timeout is now 3 sec, default buffer size is 10000 entries git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664997 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/transport/TFileTransport.cpp | 388 +++++++++++++---------- lib/cpp/src/transport/TFileTransport.h | 93 ++++-- 2 files changed, 286 insertions(+), 195 deletions(-) diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 61f1b805..3f625bc4 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -13,48 +13,39 @@ using namespace std; namespace facebook { namespace thrift { namespace transport { -TFileTransport::TFileTransport(string path) { - filename_ = path; - openLogFile(); - - // set initial values to default - readBuffSize_ = DEFAULT_READ_BUFF_SIZE; - readTimeout_ = DEFAULT_READ_TIMEOUT_MS; - chunkSize_ = DEFAULT_CHUNK_SIZE; - eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE; - flushMaxUs_ = DEFAULT_FLUSH_MAX_US; - flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES; - maxEventSize_ = DEFAULT_MAX_EVENT_SIZE; - maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS; - eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US; - - // initialize buffer lazily - buffer_ = 0; - - // buffer is initially empty - isEmpty_ = true; - isFull_ = false; - - // both head and tail are initially at 0 - headPos_ = 0; - tailPos_ = 0; - +TFileTransport::TFileTransport(string path) + : readState_() + , readBuff_(NULL) + , currentEvent_(NULL) + , readBuffSize_(DEFAULT_READ_BUFF_SIZE) + , readTimeout_(NO_TAIL_READ_TIMEOUT) + , chunkSize_(DEFAULT_CHUNK_SIZE) + , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE) + , flushMaxUs_(DEFAULT_FLUSH_MAX_US) + , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES) + , maxEventSize_(DEFAULT_MAX_EVENT_SIZE) + , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS) + , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US) + , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US) + , writerThreadId_(0) + , dequeueBuffer_(NULL) + , enqueueBuffer_(NULL) + , closing_(false) + , forceFlush_(false) + , filename_(path) + , fd_(0) + , bufferAndThreadInitialized_(false) + , offset_(0) + , lastBadChunk_(0) + , numCorruptedEventsInChunk_(0) +{ // initialize all the condition vars/mutexes pthread_mutex_init(&mutex_, NULL); pthread_cond_init(¬Full_, NULL); pthread_cond_init(¬Empty_, NULL); pthread_cond_init(&flushed_, NULL); - // not closing the file during init - closing_ = false; - - // create writer thread on demand - writerThreadId_ = 0; - - // read related variables - // read buff initialized lazily - readBuff_ = 0; - currentEvent_ = 0; + openLogFile(); } void TFileTransport::resetOutputFile(int fd, string filename, long long offset) { @@ -96,18 +87,27 @@ TFileTransport::~TFileTransport() { // deal in the common case because writing is quick pthread_join(writerThreadId_, NULL); + writerThreadId_ = 0; } - if (buffer_) { - delete[] buffer_; + if (dequeueBuffer_) { + delete dequeueBuffer_; + dequeueBuffer_ = NULL; + } + + if (enqueueBuffer_) { + delete enqueueBuffer_; + enqueueBuffer_ = NULL; } if (readBuff_) { delete readBuff_; + readBuff_ = NULL; } if (currentEvent_) { delete currentEvent_; + currentEvent_ = NULL; } // close logfile @@ -118,6 +118,25 @@ TFileTransport::~TFileTransport() { } } +bool TFileTransport::initBufferAndWriteThread() { + if (bufferAndThreadInitialized_) { + T_ERROR("Trying to double-init TFileTransport"); + return false; + } + + if (writerThreadId_ == 0) { + if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { + T_ERROR("Could not create writer thread"); + return false; + } + } + + dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); + enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); + bufferAndThreadInitialized_ = true; + + return true; +} void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { // make sure that event size is valid @@ -139,46 +158,34 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); toEnqueue->eventSize_ = eventLen + 4; - // T_DEBUG_L(1, "event size: %u", eventLen); - return enqueueEvent(toEnqueue, blockUntilFlush); -} - -void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) { // lock mutex pthread_mutex_lock(&mutex_); // make sure that enqueue buffer is initialized and writer thread is running - if (buffer_ == 0) { - buffer_ = new eventInfo*[eventBufferSize_]; - } - if (writerThreadId_ == 0) { - if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { - T_ERROR("Error creating write thread"); + if (!bufferAndThreadInitialized_) { + if (!initBufferAndWriteThread()) { + delete toEnqueue; + pthread_mutex_unlock(&mutex_); return; } } // Can't enqueue while buffer is full - while(isFull_) { + while (enqueueBuffer_->isFull()) { pthread_cond_wait(¬Full_, &mutex_); } - // make a copy and enqueue at tail of buffer - buffer_[tailPos_] = toEnqueue; - tailPos_ = (tailPos_+1) % eventBufferSize_; - - // mark the buffer as non-empty - isEmpty_ = false; - - // circular buffer has wrapped around (and is full) - if(tailPos_ == headPos_) { - // T_DEBUG("queue is full"); - isFull_ = true; + // add to the buffer + if (!enqueueBuffer_->addEvent(toEnqueue)) { + delete toEnqueue; + pthread_mutex_unlock(&mutex_); + return; } // signal anybody who's waiting for the buffer to be non-empty pthread_cond_signal(¬Empty_); - if(blockUntilFlush) { + + if (blockUntilFlush) { pthread_cond_wait(&flushed_, &mutex_); } @@ -186,61 +193,50 @@ void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) { // because condition variables can get triggered by the os for no reason // it is probably a non-factor for the time being pthread_mutex_unlock(&mutex_); - } -eventInfo* TFileTransport::dequeueEvent(long long deadline) { +bool TFileTransport::swapEventBuffers(long long deadline) { //deadline time struc struct timespec ts; - if(deadline) { + if (deadline) { ts.tv_sec = deadline/(1000*1000); ts.tv_nsec = (deadline%(1000*1000))*1000; } // wait for the queue to fill up pthread_mutex_lock(&mutex_); - while(isEmpty_) { + while (enqueueBuffer_->isEmpty()) { // do a timed wait on the condition variable - if(deadline) { + if (deadline) { int e = pthread_cond_timedwait(¬Empty_, &mutex_, &ts); if(e == ETIMEDOUT) { break; } - } - else { + } else { // just wait until the buffer gets an item pthread_cond_wait(¬Empty_, &mutex_); } } - string ret; - bool doSignal = false; + bool swapped = false; // could be empty if we timed out - eventInfo* retEvent = 0; - if(!isEmpty_) { - retEvent = buffer_[headPos_]; - headPos_ = (headPos_+1) % eventBufferSize_; + if (!enqueueBuffer_->isEmpty()) { + TFileTransportBuffer *temp = enqueueBuffer_; + enqueueBuffer_ = dequeueBuffer_; + dequeueBuffer_ = temp; - isFull_ = false; - doSignal = true; - - // check if this is the last item in the buffer - if(headPos_ == tailPos_) { - isEmpty_ = true; - } + swapped = true; } // unlock the mutex and signal if required pthread_mutex_unlock(&mutex_); - if(doSignal) { + + if (swapped) { pthread_cond_signal(¬Full_); } - if (!retEvent) { - retEvent = new eventInfo(); - } - return retEvent; + return swapped; } @@ -268,105 +264,99 @@ void TFileTransport::writerThread() { return; } - //long long start = now(); - eventInfo* outEvent = dequeueEvent(nextFlush); - if (!outEvent) { - T_DEBUG_L(1, "Got an empty event"); - return; - } - - // sanity check on event - if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { - T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); - delete(outEvent); - continue; - } - //long long diff = now()-start; - //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000); - - // If chunking is required, then make sure that msg does not cross chunk boundary - if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { - - // event size must be less than chunk size - if(outEvent->eventSize_ > chunkSize_) { - T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event", - outEvent->eventSize_, chunkSize_); - delete(outEvent); - continue; - } - - long long chunk1 = offset_/chunkSize_; - long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_; - - // if adding this event will cross a chunk boundary, pad the chunk with zeros - if(chunk1 != chunk2) { - int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_); - - // sanity check - if (padding <= 0) { - T_DEBUG("Padding is empty, skipping event"); - continue; + if (swapEventBuffers(nextFlush)) { + eventInfo* outEvent; + while (NULL != (outEvent = dequeueBuffer_->getNext())) { + if (!outEvent) { + T_DEBUG_L(1, "Got an empty event"); + return; } - if (padding > (int32_t)chunkSize_) { - T_DEBUG("padding is larger than chunk size, skipping event"); - continue; + + // sanity check on event + if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { + T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); + continue; } - uint8_t zeros[padding]; - bzero(zeros, padding); - // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", - // padding, offset_, chunk2); - if(-1 == ::write(fd_, zeros, padding)) { - perror("TFileTransport: error while padding zeros"); - throw TTransportException("TFileTransport: error while padding zeros"); + + // If chunking is required, then make sure that msg does not cross chunk boundary + if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { + + // event size must be less than chunk size + if(outEvent->eventSize_ > chunkSize_) { + T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event", + outEvent->eventSize_, chunkSize_); + continue; + } + + long long chunk1 = offset_/chunkSize_; + long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_; + + // if adding this event will cross a chunk boundary, pad the chunk with zeros + if(chunk1 != chunk2) { + // refetch the offset to keep in sync + offset_ = lseek(fd_, 0, SEEK_CUR); + int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_); + + // sanity check + if (padding <= 0) { + T_DEBUG("Padding is empty, skipping event"); + continue; + } + if (padding > (int32_t)chunkSize_) { + T_DEBUG("padding is larger than chunk size, skipping event"); + continue; + } + uint8_t zeros[padding]; + bzero(zeros, padding); + // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", + // padding, offset_, chunk2); + if(-1 == ::write(fd_, zeros, padding)) { + perror("TFileTransport: error while padding zeros"); + throw TTransportException("TFileTransport: error while padding zeros"); + } + unflushed += padding; + offset_ += padding; + } } - unflushed += padding; - offset_ += padding; - } - } - // write the dequeued event to the file - if(outEvent->eventSize_ > 0) { - if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { - perror("TFileTransport: error while writing event"); - throw TTransportException("TFileTransport: error while writing event"); - } + // write the dequeued event to the file + if(outEvent->eventSize_ > 0) { + if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { + perror("TFileTransport: error while writing event"); + throw TTransportException("TFileTransport: error while writing event"); + } - unflushed += outEvent->eventSize_; - offset_ += outEvent->eventSize_; + unflushed += outEvent->eventSize_; + offset_ += outEvent->eventSize_; + } + } + dequeueBuffer_->reset(); } // couple of cases from which a flush could be triggered - if((getCurrentTime() >= nextFlush && unflushed > 0) || + if ((getCurrentTime() >= nextFlush && unflushed > 0) || unflushed > flushMaxBytes_ || - (outEvent && (outEvent->eventSize_== 0)) ) { - //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_); + forceFlush_) { // sync (force flush) file to disk fsync(fd_); nextFlush = getCurrentTime() + flushMaxUs_; unflushed = 0; - // notify anybody(thing?) waiting for flush completion - pthread_mutex_lock(&mutex_); - notFlushed_ = false; - pthread_mutex_unlock(&mutex_); + // notify anybody waiting for flush completion + forceFlush_ = false; pthread_cond_broadcast(&flushed_); } - // deallocate dequeued event - delete(outEvent); } } void TFileTransport::flush() { - eventInfo* flushEvent = new eventInfo(); - notFlushed_ = true; - - enqueueEvent(flushEvent, false); - // wait for flush to take place pthread_mutex_lock(&mutex_); - while(notFlushed_) { + forceFlush_ = true; + + while (forceFlush_) { pthread_cond_wait(&flushed_, &mutex_); } @@ -569,14 +559,14 @@ bool TFileTransport::isEventCorrupted() { void TFileTransport::performRecovery() { // perform some kickass recovery uint32_t curChunk = getCurChunk(); - if (lastBadChunk == curChunk) { - numCorruptedEventsinChunk++; + if (lastBadChunk_ == curChunk) { + numCorruptedEventsInChunk_++; } else { - lastBadChunk = curChunk; - numCorruptedEventsinChunk = 1; + lastBadChunk_ = curChunk; + numCorruptedEventsInChunk_ = 1; } - if (numCorruptedEventsinChunk < maxCorruptedEvents_) { + if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { // maybe there was an error in reading the file from disk // seek to the beginning of chunk and try again seekToChunk(curChunk); @@ -706,6 +696,70 @@ uint32_t TFileTransport::getCurrentTime() { return ret; } +TFileTransportBuffer::TFileTransportBuffer(uint32_t size) + : bufferMode_(WRITE) + , writePoint_(0) + , readPoint_(0) + , size_(size) +{ + buffer_ = new eventInfo*[size]; +} + +TFileTransportBuffer::~TFileTransportBuffer() { + if (buffer_) { + for (uint32_t i = 0; i < writePoint_; i++) { + delete buffer_[i]; + } + delete[] buffer_; + buffer_ = NULL; + } +} + +bool TFileTransportBuffer::addEvent(eventInfo *event) { + if (bufferMode_ == READ) { + perror("Trying to write to a buffer in read mode"); + } + if (writePoint_ < size_) { + buffer_[writePoint_++] = event; + return true; + } else { + // buffer is full + return false; + } +} + +eventInfo* TFileTransportBuffer::getNext() { + if (bufferMode_ == WRITE) { + bufferMode_ = READ; + } + if (readPoint_ < writePoint_) { + return buffer_[readPoint_++]; + } else { + // no more entries + return NULL; + } +} + +void TFileTransportBuffer::reset() { + if (bufferMode_ == WRITE || writePoint_ > readPoint_) { + T_DEBUG("Resetting a buffer with unread entries"); + } + // Clean up the old entries + for (uint32_t i = 0; i < writePoint_; i++) { + delete buffer_[i]; + } + bufferMode_ = WRITE; + writePoint_ = 0; + readPoint_ = 0; +} + +bool TFileTransportBuffer::isFull() { + return writePoint_ == size_; +} + +bool TFileTransportBuffer::isEmpty() { + return writePoint_ == 0; +} TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index a58be2c2..237234e0 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -74,6 +74,47 @@ typedef struct readState { } readState; +/** + * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events + * to be written to disk. Should be used in the following way: + * 1) Buffer created + * 2) Buffer written to (addEvent) + * 3) Buffer read from (getNext) + * 4) Buffer reset (reset) + * 5) Go back to 2, or destroy buffer + * + * The buffer should never be written to after it is read from, unless it is reset first. + * Note: The above rules are enforced mainly for debugging its sole client TFileTransport + * which uses the buffer in this way. + * + * @author James Wang + */ +class TFileTransportBuffer { + public: + TFileTransportBuffer(uint32_t size); + ~TFileTransportBuffer(); + + bool addEvent(eventInfo *event); + eventInfo* getNext(); + void reset(); + bool isFull(); + bool isEmpty(); + + private: + TFileTransportBuffer(); // should not be used + + enum mode { + WRITE, + READ + }; + mode bufferMode_; + + uint32_t writePoint_; + uint32_t readPoint_; + uint32_t size_; + eventInfo** buffer_; +}; + /** * File implementation of a transport. Reads and writes are done to a * file on disk. @@ -138,14 +179,13 @@ class TFileTransport : public TTransport { } void setEventBufferSize(uint32_t bufferSize) { - if (bufferSize) { - if (buffer_) { - delete[] buffer_; - } - eventBufferSize_ = bufferSize; - buffer_ = new eventInfo*[eventBufferSize_]; + if (bufferAndThreadInitialized_) { + perror("Cannot change the buffer size after writer thread started"); + return; } + eventBufferSize_ = bufferSize; } + uint32_t getEventBufferSize() { return eventBufferSize_; } @@ -194,8 +234,8 @@ class TFileTransport : public TTransport { private: // helper functions for writing to a file void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); - void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush); - eventInfo* dequeueEvent(long long deadline); + bool swapEventBuffers(long long deadline); + bool initBufferAndWriteThread(); // control for writer thread static void* startWriterThread(void* ptr) { @@ -218,7 +258,6 @@ class TFileTransport : public TTransport { // Class variables readState readState_; uint8_t* readBuff_; - eventInfo* currentEvent_; uint32_t readBuffSize_; @@ -231,17 +270,13 @@ class TFileTransport : public TTransport { uint32_t chunkSize_; static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; - // size of string buffer + // size of event buffers uint32_t eventBufferSize_; - static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024; + static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000; - // circular buffer to hold data in before it is flushed. This is an array of strings. Each - // element of the array stores a msg that needs to be written to the file - eventInfo** buffer_; - // max number of microseconds that can pass without flushing uint32_t flushMaxUs_; - static const uint32_t DEFAULT_FLUSH_MAX_US = 20000; + static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000; // max number of bytes that can be written without flushing uint32_t flushMaxBytes_; @@ -266,33 +301,35 @@ class TFileTransport : public TTransport { // writer thread id pthread_t writerThreadId_; - // variables that determine position of head/tail of circular buffer - int headPos_, tailPos_; - - // variables indicating whether the buffer is full or empty - bool isFull_, isEmpty_; + // buffers to hold data before it is flushed. Each element of the buffer stores a msg that + // needs to be written to the file. The buffers are swapped by the writer thread. + TFileTransportBuffer *dequeueBuffer_; + TFileTransportBuffer *enqueueBuffer_; + + // conditions used to block when the buffer is full or empty pthread_cond_t notFull_, notEmpty_; - bool closing_; + volatile bool closing_; // To keep track of whether the buffer has been flushed pthread_cond_t flushed_; - bool notFlushed_; + volatile bool forceFlush_; - // Mutex that is grabbed when enqueueing, dequeueing and flushing - // from the circular buffer + // Mutex that is grabbed when enqueueing and swapping the read/write buffers pthread_mutex_t mutex_; // File information string filename_; int fd_; + // Whether the writer thread and buffers have been initialized + bool bufferAndThreadInitialized_; + // Offset within the file off_t offset_; // event corruption information - uint32_t lastBadChunk; - uint32_t numCorruptedEventsinChunk; - + uint32_t lastBadChunk_; + uint32_t numCorruptedEventsInChunk_; }; // Exception thrown when EOF is hit -- 2.17.1