From e9ef8d7ce98484599730ffc0a2f17d64dc69d944 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Fri, 8 Dec 2006 23:52:57 +0000 Subject: [PATCH] -- TFileTransport (Thrift Logfile) Summary: -- TBufferedFileWriter.h/cpp will be renamed to TFileTransport.h/cpp in the next commit. -- TFileTransport is essentially reading and writing thrift calls to/from a file instead of a socket. -- The code/design is somewhat similar to pillar_logfile but there are some significant changes. todo: -- still need to do error correction/detection Reviewed By: Mark Slee Test Plan: -- Wrote test in thrift/test/cpp/src/main.cpp that appends to a file and replays requests Notes: It's finally time to port search over to Thrift git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664889 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/transport/TBufferedFileWriter.cpp | 593 ++++++++++++++---- lib/cpp/src/transport/TBufferedFileWriter.h | 286 +++++++-- lib/cpp/src/transport/TTransportException.h | 1 + lib/cpp/src/transport/TTransportUtils.h | 5 +- test/cpp/src/main.cpp | 55 +- 5 files changed, 760 insertions(+), 180 deletions(-) diff --git a/lib/cpp/src/transport/TBufferedFileWriter.cpp b/lib/cpp/src/transport/TBufferedFileWriter.cpp index c3ed2500..39e20745 100644 --- a/lib/cpp/src/transport/TBufferedFileWriter.cpp +++ b/lib/cpp/src/transport/TBufferedFileWriter.cpp @@ -1,43 +1,34 @@ #include "TBufferedFileWriter.h" +#include "TTransportUtils.h" #include -#include -#include -#include -#include -#include + #include #include #include +#include +#include -using std::string; +using namespace std; namespace facebook { namespace thrift { namespace transport { -TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) { - init(filename, sz, 0, 0); -} - -TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) { - init(filename, sz, fd, offset); -} - -void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long 
offset) { - // validate buffer size - sz_ = sz; - if (sz_ <= 0) { - throw TTransportException("invalid input buffer size"); - } - - // set file-related variables - fd_ = 0; - resetOutputFile(fd, filename, offset); +TFileTransport::TFileTransport(string path) { + filename_ = path; + openLogFile(); - // set default values of flush related params - flushMaxBytes_ = 1024 * 100; - flushMaxUs_ = 20 * 1000; + // set initial values to default + readBuffSize_ = DEFAULT_READ_BUFF_SIZE; + readTimeout_ = DEFAULT_READ_TIMEOUT_MS; + chunkSize_ = DEFAULT_CHUNK_SIZE; + eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE; + flushMaxUs_ = DEFAULT_FLUSH_MAX_US; + flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES; + maxEventSize_ = DEFAULT_MAX_EVENT_SIZE; + maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS; + eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US; - // allocate event buffer - buffer_ = new eventInfo[sz_]; + // initialize buffer lazily + buffer_ = 0; // buffer is initially empty isEmpty_ = true; @@ -47,9 +38,6 @@ void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long o headPos_ = 0; tailPos_ = 0; - // for lack of a better option, set chunk size to 0. 
Users can change this to whatever they want - chunkSize_ = 0; - // initialize all the condition vars/mutexes pthread_mutex_init(&mutex_, NULL); pthread_cond_init(&notFull_, NULL); @@ -59,42 +47,70 @@ void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long o // not closing the file during init closing_ = false; - // spawn writer thread - pthread_create(&writer_, NULL, startWriterThread, (void *)this); + // create writer thread on demand + writerThreadId_ = 0; + + // read related variables + // read buff initialized lazily + readBuff_ = 0; + currentEvent_ = 0; } -void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) { +void TFileTransport::resetOutputFile(int fd, string filename, long long offset) { filename_ = filename; offset_ = offset; // check if current file is still open if (fd_ > 0) { - // TODO: unclear if this should throw an error - fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str()); + // TODO: should there be a flush here? + fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str()); ::close(fd_); } - fd_ = fd; + + if (fd) { + fd_ = fd; + } else { + // open file if the input fd is 0 + openLogFile(); + } } -TBufferedFileWriter::~TBufferedFileWriter() { - // flush output buffer - flush(); +TFileTransport::~TFileTransport() { + // TODO: Make sure the buffer is actually flushed + // flush the buffer if a writer thread is active + if (writerThreadId_ > 0) { + // flush output buffer + flush(); - // send a signal to write thread to end - closing_ = true; - pthread_join(writer_, NULL); + // send a signal to write thread to end + closing_ = true; + pthread_join(writerThreadId_, NULL); + } - delete[] buffer_; + if (buffer_) { + delete[] buffer_; + } + + if (readBuff_) { + delete readBuff_; + } + + if (currentEvent_) { + delete currentEvent_; + } - // TODO: should the file be closed here? 
+ // close logfile + if (fd_ > 0) { + ::close(fd_); + } } -void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { +void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { // make sure that event size is valid if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { - // T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); + T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); return; } @@ -103,17 +119,33 @@ void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bo return; } - eventInfo toEnqueue; - uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen); - toEnqueue.payLoad_ = bufCopy; - toEnqueue.eventSize_ = eventLen; + eventInfo* toEnqueue = new eventInfo(); + toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4); + // first 4 bytes is the event length + memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); + // actual event contents + memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); + toEnqueue->eventSize_ = eventLen + 4; + // T_DEBUG_L(1, "event size: %u", eventLen); return enqueueEvent(toEnqueue, blockUntilFlush); } -void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) { - // Lock mutex +void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) { + // lock mutex pthread_mutex_lock(&mutex_); + + // make sure that enqueue buffer is initialized and writer thread is running + if (buffer_ == 0) { + buffer_ = new eventInfo*[eventBufferSize_]; + } + if (writerThreadId_ == 0) { + if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { + T_ERROR("Error creating write thread"); + return; + } + } + // Can't enqueue while buffer is full while(isFull_) { pthread_cond_wait(&notFull_, &mutex_); @@ -121,7 +153,7 @@ void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, 
bool blockUnt // make a copy and enqueue at tail of buffer buffer_[tailPos_] = toEnqueue; - tailPos_ = (tailPos_+1) % sz_; + tailPos_ = (tailPos_+1) % eventBufferSize_; // mark the buffer as non-empty isEmpty_ = false; @@ -146,7 +178,7 @@ void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUnt } -eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) { +eventInfo* TFileTransport::dequeueEvent(long long deadline) { //deadline time struc struct timespec ts; if(deadline) { @@ -174,10 +206,10 @@ eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) { bool doSignal = false; // could be empty if we timed out - eventInfo retEvent; + eventInfo* retEvent = 0; if(!isEmpty_) { retEvent = buffer_[headPos_]; - headPos_ = (headPos_+1) % sz_; + headPos_ = (headPos_+1) % eventBufferSize_; isFull_ = false; doSignal = true; @@ -194,59 +226,22 @@ eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) { pthread_cond_signal(&notFull_); } - return retEvent; -} - - -void TBufferedFileWriter::flush() -{ - eventInfo flushEvent; - flushEvent.payLoad_ = NULL; - flushEvent.eventSize_ = 0; - - notFlushed_ = true; - - enqueueEvent(flushEvent, false); - - // wait for flush to take place - pthread_mutex_lock(&mutex_); - - while(notFlushed_) { - pthread_cond_wait(&flushed_, &mutex_); + if (!retEvent) { + retEvent = new eventInfo(); } - - pthread_mutex_unlock(&mutex_); -} - -void TBufferedFileWriter::openOutputFile() { - mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH; - fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode); - - // make sure open call was successful - if(fd_ == -1) { - char errorMsg[1024]; - sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str()); - perror(errorMsg); - throw TTransportException(errorMsg); - } -} - -uint32_t TBufferedFileWriter::getCurrentTime() { - long long ret; - struct timeval tv; - gettimeofday(&tv, NULL); - ret = tv.tv_sec; - ret = ret*1000*1000 + tv.tv_usec; - 
return ret; + return retEvent; } -void TBufferedFileWriter::writerThread() { +void TFileTransport::writerThread() { // open file if it is not open if(!fd_) { - openOutputFile(); + openLogFile(); } + // set the offset to the correct value (EOF) + offset_ = lseek(fd_, 0, SEEK_END); + // Figure out the next time by which a flush must take place long long nextFlush = getCurrentTime() + flushMaxUs_; uint32_t unflushed = 0; @@ -255,43 +250,51 @@ void TBufferedFileWriter::writerThread() { // this will only be true when the destructor is being invoked if(closing_) { if(-1 == ::close(fd_)) { - perror("TBufferedFileWriter: error in close"); + perror("TFileTransport: error in close"); } throw TTransportException("error in file close"); + fd_ = 0; + return; } //long long start = now(); - eventInfo outEvent = dequeueEvent(nextFlush); + eventInfo* outEvent = dequeueEvent(nextFlush); + if (!outEvent) { + T_DEBUG_L(1, "Got an empty event"); + return; + } // sanity check on event - if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) { - T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_); + if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { + T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); + delete(outEvent); continue; } //long long diff = now()-start; //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000); // If chunking is required, then make sure that msg does not cross chunk boundary - if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) { + if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { // event size must be less than chunk size - if(outEvent.eventSize_ > chunkSize_) { - T_ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event", - outEvent.eventSize_, chunkSize_); + if(outEvent->eventSize_ > chunkSize_) { + T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): 
skipping event", + outEvent->eventSize_, chunkSize_); + delete(outEvent); continue; } long long chunk1 = offset_/chunkSize_; - long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_; + long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_; // if adding this event will cross a chunk boundary, pad the chunk with zeros if(chunk1 != chunk2) { - int padding = (int)(chunk2*chunkSize_ - offset_); + int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_); // sanity check if (padding <= 0) { T_DEBUG("Padding is empty, skipping event"); - continue; + continue; } if (padding > (int32_t)chunkSize_) { T_DEBUG("padding is larger than chunk size, skipping event"); @@ -300,9 +303,10 @@ void TBufferedFileWriter::writerThread() { // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2); uint8_t zeros[padding]; bzero(zeros, padding); + T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_); if(-1 == ::write(fd_, zeros, padding)) { - perror("TBufferedFileWriter: error while padding zeros"); - throw TTransportException("TBufferedFileWriter: error while padding zeros"); + perror("TFileTransport: error while padding zeros"); + throw TTransportException("TFileTransport: error while padding zeros"); } unflushed += padding; offset_ += padding; @@ -310,24 +314,21 @@ void TBufferedFileWriter::writerThread() { } // write the dequeued event to the file - if(outEvent.eventSize_ > 0) { - if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) { - perror("TBufferedFileWriter: error while writing event"); + if(outEvent->eventSize_ > 0) { + if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { + perror("TFileTransport: error while writing event"); // TODO: should this trigger an exception or simply continue? 
- throw TTransportException("TBufferedFileWriter: error while writing event"); + throw TTransportException("TFileTransport: error while writing event"); } - // deallocate payload - free(outEvent.payLoad_); - - unflushed += outEvent.eventSize_; - offset_ += outEvent.eventSize_; + unflushed += outEvent->eventSize_; + offset_ += outEvent->eventSize_; } // couple of cases from which a flush could be triggered if((getCurrentTime() >= nextFlush && unflushed > 0) || unflushed > flushMaxBytes_ || - (outEvent.eventSize_ == 0) ) { + (outEvent && (outEvent->eventSize_== 0)) ) { //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_); // sync (force flush) file to disk @@ -341,7 +342,347 @@ void TBufferedFileWriter::writerThread() { pthread_mutex_unlock(&mutex_); pthread_cond_broadcast(&flushed_); } + // deallocate dequeued event + delete(outEvent); + } +} + +void TFileTransport::flush() { + eventInfo* flushEvent = new eventInfo(); + notFlushed_ = true; + + enqueueEvent(flushEvent, false); + + // wait for flush to take place + pthread_mutex_lock(&mutex_); + + while(notFlushed_) { + pthread_cond_wait(&flushed_, &mutex_); + } + + pthread_mutex_unlock(&mutex_); +} + + +uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { + uint32_t have = 0; + uint32_t get = 0; + + while (have < len) { + get = read(buf+have, len-have); + if (get <= 0) { + throw TEOFException(); + } + have += get; } + + return have; +} + +uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { + // check if there an event is ready to be read + if (!currentEvent_) { + readEvent(); + } + + // did not manage to read an event from the file. 
This could have happened + // if the timeout expired or there was some other error + if (!currentEvent_) { + return 0; + } + + // read as much of the current event as possible + int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; + if (remaining <= (int32_t)len) { + memcpy(buf, + currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, + remaining); + delete(currentEvent_); + currentEvent_ = 0; + return remaining; + } + + // read as much as possible + memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); + currentEvent_->eventBuffPos_ += len; + return len; +} + +bool TFileTransport::readEvent() { + int readTries = 0; + + if (!readBuff_) { + readBuff_ = new uint8_t[readBuffSize_]; + } + + while (1) { + // check if there is anything in the read buffer + if (readState_.bufferPtr_ == readState_.bufferLen_) { + // advance the offset pointer + offset_ += readState_.bufferLen_; + readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_); + if (readState_.bufferLen_) { + T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); + } + readState_.bufferPtr_ = 0; + readState_.lastDispatchPtr_ = 0; + + // read error + if (readState_.bufferLen_ == -1) { + readState_.resetAllValues(); + perror("TFileTransport: error while reading from file"); + // TODO: should this trigger an exception or simply continue? 
+ throw TTransportException("TFileTransport: error while reading from file"); + } else if (readState_.bufferLen_ == 0) { // EOF + // wait indefinitely if there is no timeout + if (readTimeout_ == -1) { + usleep(eofSleepTime_); + continue; + } else if (readTimeout_ == 0) { + // reset state + readState_.resetState(0); + return false; + } else if (readTimeout_ > 0) { + // timeout already expired once + if (readTries > 0) { + readState_.resetState(0); + return false; + } else { + usleep(readTimeout_ * 1000); + readTries++; + continue; + } + } + } + } + + readTries = 0; + + // attempt to read an event from the buffer + while(readState_.bufferPtr_ < readState_.bufferLen_) { + if (readState_.readingSize_) { + if(readState_.eventSizeBuffPos_ == 0) { + if ( (offset_ + readState_.bufferPtr_)/chunkSize_ != + ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) { + // skip one byte towards chunk boundary + // T_DEBUG_L(1, "Skipping a byte"); + readState_.bufferPtr_++; + continue; + } + } + + readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = + readBuff_[readState_.bufferPtr_++]; + bool eventCorruption = false; + if (readState_.eventSizeBuffPos_ == 4) { + // 0 length event indicates padding + if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) { + T_DEBUG_L(1, "Got padding"); + readState_.resetState(readState_.lastDispatchPtr_); + continue; + } + // got a valid event + readState_.readingSize_ = false; + if (readState_.event_) { + delete(readState_.event_); + } + readState_.event_ = new eventInfo(); + readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_)); + + T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_); + + // TODO + // make sure event is valid, an error is triggered if: + // 1. Event size is larger than user-speficied max-event size + + // 2. Event size is larger than chunk size + + // 3. 
size indicates that event crosses chunk boundary + + } + + if (eventCorruption) { + // perform some kickass recovery + } + } else { + if (!readState_.event_->eventBuff_) { + readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_]; + readState_.event_->eventBuffPos_ = 0; + } + // take either the entire event or the remaining bytes in the buffer + int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_), + readState_.event_->eventSize_ - readState_.event_->eventBuffPos_); + + // copy data from read buffer into event buffer + memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, + readBuff_ + readState_.bufferPtr_, + reclaimBuffer); + + // increment position ptrs + readState_.event_->eventBuffPos_ += reclaimBuffer; + readState_.bufferPtr_ += reclaimBuffer; + + // if (reclaimBuffer > 0) { + // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_); + // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_); + // } + + // check if the event has been read in full + if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { + // set the completed event to the current event + currentEvent_ = readState_.event_; + currentEvent_->eventBuffPos_ = 0; + + readState_.event_ = 0; + readState_.resetState(readState_.bufferPtr_); + + // exit criteria + T_DEBUG_L(0, "Finished one event"); + return true; + } + } + } + + + } +} + +void TFileTransport::seekToChunk(int32_t chunk) { + if (fd_ <= 0) { + throw TTransportException("File not open"); + } + + int32_t lastChunk = getNumChunks(); + + // negative indicates reverse seek (from the end) + if (chunk < 0) { + chunk += lastChunk; + } + + // cannot seek past EOF + if (chunk > lastChunk) { + T_DEBUG("Trying to seek past EOF. 
Seeking to EOF instead"); + chunk = lastChunk; + } + + uint32_t minEndOffset = 0; + if (chunk == lastChunk) { + minEndOffset = lseek(fd_, 0, SEEK_END); + } + + offset_ = lseek(fd_, chunk * chunkSize_, SEEK_SET); + readState_.resetAllValues(); + if (offset_ == -1) { + perror("TFileTransport: lseek error in seekToChunk"); + // TODO: should this trigger an exception or simply continue? + throw TTransportException("TFileTransport: lseek error in seekToChunk"); + } + + // seek to EOF if user wanted to go to last chunk + uint32_t oldReadTimeout = getReadTimeout(); + setReadTimeout(0); + if (chunk == lastChunk) { + // keep on reading unti the last event at point of seekChunk call + while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {}; + } + setReadTimeout(oldReadTimeout); + +} + +void TFileTransport::seekToEnd() { + seekToChunk(getNumChunks()); +} + +uint32_t TFileTransport::getNumChunks() { + if (fd_ <= 0) { + return 0; + } + struct stat f_info; + fstat(fd_, &f_info); + return (f_info.st_size)/chunkSize_; +} + +// Utility Functions +void TFileTransport::openLogFile() { + mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH; + fd_ = ::open(filename_.c_str(), O_RDWR | O_CREAT | O_APPEND, mode); + + // make sure open call was successful + if(fd_ == -1) { + char errorMsg[1024]; + sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str()); + perror(errorMsg); + throw TTransportException(errorMsg); + } + + // opening the file in append mode causes offset_t to be at the end + offset_ = lseek(fd_, 0, SEEK_CUR); + T_DEBUG_L(1, "initial offset: %lu", offset_); +} + +uint32_t TFileTransport::getCurrentTime() { + long long ret; + struct timeval tv; + gettimeofday(&tv, NULL); + ret = tv.tv_sec; + ret = ret*1000*1000 + tv.tv_usec; + return ret; +} + + +TFileProcessor::TFileProcessor(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr inputTransport): + processor_(processor), protocolFactory_(protocolFactory), + 
inputTransport_(inputTransport) { + + // default the output transport to a null transport (common case) + outputTransport_ = shared_ptr(new TNullTransport()); +} + +TFileProcessor::TFileProcessor(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr inputTransport, + shared_ptr outputTransport): + processor_(processor), protocolFactory_(protocolFactory), + inputTransport_(inputTransport), outputTransport_(outputTransport) { +}; + +void TFileProcessor::process(uint32_t numEvents, bool tail) { + pair,shared_ptr > iop; + iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_); + + // set the read timeout to 0 if tailing is required + int32_t oldReadTimeout = inputTransport_->getReadTimeout(); + if (tail) { + // save old read timeout so it can be restored + inputTransport_->setReadTimeout(0); + } + + uint32_t numProcessed = 0; + while(1) { + // bad form to use exceptions for flow control but there is really + // no other way around it + try { + processor_->process(iop.first, iop.second); + numProcessed++; + if ( (numEvents > 0) && (numProcessed == numEvents)) { + return; + } + } catch (TEOFException& teof) { + if (!tail) { + break; + } + } catch (TException te) { + cerr << te.what() << endl; + break; + } + } + + // restore old read timeout + if (tail) { + inputTransport_->setReadTimeout(oldReadTimeout); + } } diff --git a/lib/cpp/src/transport/TBufferedFileWriter.h b/lib/cpp/src/transport/TBufferedFileWriter.h index c327aabd..81923328 100644 --- a/lib/cpp/src/transport/TBufferedFileWriter.h +++ b/lib/cpp/src/transport/TBufferedFileWriter.h @@ -1,10 +1,12 @@ -#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ -#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1 +#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ +#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 #include "TTransport.h" #include "Thrift.h" +#include "TProcessor.h" #include +#include #include @@ -15,42 +17,154 @@ using std::string; // Data pertaining to a single event typedef struct 
eventInfo { - uint8_t* payLoad_; - uint32_t eventSize_; - - eventInfo():payLoad_(NULL), eventSize_(0){}; + uint8_t* eventBuff_; + uint32_t eventSize_; + uint32_t eventBuffPos_; + + eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){}; + ~eventInfo() { + if (eventBuff_) { + delete[] eventBuff_; + } + } } eventInfo; +// information about current read state +typedef struct readState { + eventInfo* event_; + + // keep track of event size + uint8_t eventSizeBuff_[4]; + uint8_t eventSizeBuffPos_; + bool readingSize_; + + // read buffer variables + int32_t bufferPtr_; + int32_t bufferLen_; + + // last successful dispatch point + int32_t lastDispatchPtr_; + + void resetState(uint32_t lastDispatchPtr) { + readingSize_ = true; + eventSizeBuffPos_ = 0; + lastDispatchPtr_ = lastDispatchPtr; + } + + void resetAllValues() { + resetState(0); + bufferPtr_ = 0; + bufferLen_ = 0; + if (event_) { + delete(event_); + } + event_ = 0; + } + + readState() { + event_ = 0; + resetAllValues(); + } + ~readState() { + if (event_) { + delete(event_); + } + } + +} readState; + /** - * Class that stores a circular in-memory event/message buffer and writes - * elements to disk when the buffer becomes full or a flush is triggered. + * File implementation of a transport. Reads and writes are done to a + * file on disk. * * @author Aditya Agarwal */ -class TBufferedFileWriter : public TTransport { +class TFileTransport : public TTransport { public: - void setFlushMaxUs(uint32_t flushMaxUs) { - flushMaxUs_ = flushMaxUs; + TFileTransport(string path); + ~TFileTransport(); + + // TODO: what is the correct behaviour for this? 
+ // the log file is generally always open + bool isOpen() { + return true; } - uint32_t getFlushMaxUs() { - return flushMaxUs_; + + void write(const uint8_t* buf, uint32_t len) { + enqueueEvent(buf, len, false); + } + + void flush(); + + uint32_t readAll(uint8_t* buf, uint32_t len); + uint32_t read(uint8_t* buf, uint32_t len); + + // log-file specific functions + void seekToChunk(int chunk); + void seekToEnd(); + uint32_t getNumChunks(); + + // for changing the output file + void resetOutputFile(int fd, string filename, long long offset); + + // Setter/Getter functions for user-controllable options + void setReadBuffSize(uint32_t readBuffSize) { + if (readBuffSize) { + readBuffSize_ = readBuffSize; + } + } + uint32_t getReadBuffSize() { + return readBuffSize_; } - void setFlushMaxBytes(uint32_t flushMaxBytes) { - flushMaxBytes_ = flushMaxBytes; + void setReadTimeout(int32_t readTimeout) { + readTimeout_ = readTimeout; } - uint32_t getFlushMaxBytes() { - return flushMaxBytes_; + int32_t getReadTimeout() { + return readTimeout_; } void setChunkSize(uint32_t chunkSize) { - chunkSize_ = chunkSize; + if (chunkSize) { + chunkSize_ = chunkSize; + } } uint32_t getChunkSize() { return chunkSize_; } + void setEventBufferSize(uint32_t bufferSize) { + if (bufferSize) { + if (buffer_) { + delete[] buffer_; + } + eventBufferSize_ = bufferSize; + buffer_ = new eventInfo*[eventBufferSize_]; + } + } + uint32_t getEventBufferSize() { + return eventBufferSize_; + } + + void setFlushMaxUs(uint32_t flushMaxUs) { + if (flushMaxUs) { + flushMaxUs_ = flushMaxUs; + } + } + uint32_t getFlushMaxUs() { + return flushMaxUs_; + } + + void setFlushMaxBytes(uint32_t flushMaxBytes) { + if (flushMaxBytes) { + flushMaxBytes_ = flushMaxBytes; + } + } + uint32_t getFlushMaxBytes() { + return flushMaxBytes_; + } + void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; } @@ -58,52 +172,88 @@ class TBufferedFileWriter : public TTransport { return maxEventSize_; } - 
TBufferedFileWriter(string filename, uint32_t sz); - TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset); - void init(string filename, uint32_t sz, int fd, long long offset); - ~TBufferedFileWriter(); - - void resetOutputFile(int fd, string filename, long long offset); + void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { + maxCorruptedEvents_ = maxCorruptedEvents; + } + uint32_t getMaxCorruptedEvents() { + return maxCorruptedEvents_; + } - void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); - void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush); - void write(const uint8_t* buf, uint32_t len) { - enqueueEvent(buf, len, false); + void setEofSleepTimeUs(uint32_t eofSleepTime) { + if (eofSleepTime) { + eofSleepTime_ = eofSleepTime; + } + } + uint32_t getEofSleepTimeUs() { + return eofSleepTime_; } - eventInfo dequeueEvent(long long deadline); - void flush(); + private: + // helper functions for writing to a file + void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); + void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush); + eventInfo* dequeueEvent(long long deadline); // control for writer thread static void* startWriterThread(void* ptr) { - (((TBufferedFileWriter*)ptr)->writerThread()); + (((TFileTransport*)ptr)->writerThread()); return 0; } void writerThread(); + // helper functions for reading from a file + bool readEvent(); - private: - // circular buffer to hold data in before it is flushed. This is an array of strings. 
Each - // element of the array stores a msg that needs to be written to the file - eventInfo* buffer_; - - // size of string buffer - uint32_t sz_; + // Utility functions + void openLogFile(); + uint32_t getCurrentTime(); + + // Class variables + readState readState_; + uint8_t* readBuff_; + + eventInfo* currentEvent_; + + uint32_t readBuffSize_; + static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; + + int32_t readTimeout_; + static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; // size of chunks that file will be split up into uint32_t chunkSize_; + static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; + // size of string buffer + uint32_t eventBufferSize_; + static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024; + + // circular buffer to hold data in before it is flushed. This is an array of strings. Each + // element of the array stores a msg that needs to be written to the file + eventInfo** buffer_; + // max number of microseconds that can pass without flushing uint32_t flushMaxUs_; + static const uint32_t DEFAULT_FLUSH_MAX_US = 20000; // max number of bytes that can be written without flushing uint32_t flushMaxBytes_; + static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; // max event size uint32_t maxEventSize_; + static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; + + // max number of corrupted events per chunk + uint32_t maxCorruptedEvents_; + static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; + // sleep duration when EOF is hit + uint32_t eofSleepTime_; + static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; + // writer thread id - pthread_t writer_; + pthread_t writerThreadId_; // variables that determine position of head/tail of circular buffer int headPos_, tailPos_; @@ -126,13 +276,61 @@ class TBufferedFileWriter : public TTransport { int fd_; // Offset within the file - long long offset_; + off_t offset_; + +}; + +// Exception thrown when EOF is hit +class TEOFException : public 
facebook::thrift::TTransportException { + public: + TEOFException(): + facebook::thrift::TTransportException(TTX_EOF) {}; +}; - void openOutputFile(); - uint32_t getCurrentTime(); +// wrapper class to process events from a file containing thrift events +class TFileProcessor { + public: + /** + * Constructor that defaults output transport to null transport + * + * @param processor processes log-file events + * @param protocolFactory protocol factory + * @param inputTransport file transport + */ + TFileProcessor(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr inputTransport); + + /** + * Constructor + * + * @param processor processes log-file events + * @param protocolFactory protocol factory + * @param inputTransport input file transport + * @param output output transport + */ + TFileProcessor(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr inputTransport, + shared_ptr outputTransport); + + /** + * processes events from the file + * + * @param numEvents number of events to process (0 for unlimited) + * @param tail tails the file if true + */ + void process(uint32_t numEvents, bool tail); + + private: + shared_ptr processor_; + shared_ptr protocolFactory_; + shared_ptr inputTransport_; + shared_ptr outputTransport_; }; -}}} + +}}} // facebook::thrift::transport -#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ +#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h index e02eb70f..de94a7c9 100644 --- a/lib/cpp/src/transport/TTransportException.h +++ b/lib/cpp/src/transport/TTransportException.h @@ -12,6 +12,7 @@ enum TTransportExceptionType { TTX_UNKNOWN = 0, TTX_NOT_OPEN = 1, TTX_TIMED_OUT = 2, + TTX_EOF = 3, }; /** diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index a8003cf2..427cc0e7 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ 
-25,7 +25,10 @@ class TNullTransport : public TTransport { void open() {} - void write(const std::string& s) {} + void write(const uint8_t* buf, uint32_t len) { + return; + } + }; diff --git a/test/cpp/src/main.cpp b/test/cpp/src/main.cpp index 8344a885..47ae671b 100644 --- a/test/cpp/src/main.cpp +++ b/test/cpp/src/main.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "Service.h" @@ -73,7 +74,12 @@ class Server : public ServiceIf { int8_t echoByte(int8_t arg) {return arg;} int32_t echoI32(int32_t arg) {return arg;} int64_t echoI64(int64_t arg) {return arg;} - string echoString(string arg) {return arg;} + string echoString(string arg) { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING!!!!"); + } + return arg; + } vector echoList(vector arg) {return arg;} set echoSet(set arg) {return arg;} map echoMap(map arg) {return arg;} @@ -189,20 +195,22 @@ public: bool _done; Monitor _sleep; }; - + + int main(int argc, char **argv) { - int port = 9090; + int port = 9091; string serverType = "thread-pool"; string protocolType = "binary"; size_t workerCount = 4; - size_t clientCount = 10; - size_t loopCount = 10000; + size_t clientCount = 20; + size_t loopCount = 50000; TType loopType = T_VOID; string callName = "echoVoid"; bool runServer = true; bool logRequests = false; string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; ostringstream usage; @@ -217,8 +225,10 @@ int main(int argc, char **argv) { "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl << "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl << + "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl << "\tworkers Number of thread pools workers. Only valid for thread-pool server type. 
Default is " << workerCount << endl; + map args; for(int ix = 1; ix < argc; ix++) { @@ -272,6 +282,10 @@ int main(int argc, char **argv) { logRequests = args["log-request"] == "true"; } + if(!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + if(!args["server-type"].empty()) { serverType = args["server-type"]; @@ -299,6 +313,28 @@ int main(int argc, char **argv) { // Dispatcher shared_ptr serviceHandler(new Server()); + if (replayRequests) { + shared_ptr serviceHandler(new Server()); + shared_ptr serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + shared_ptr fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + shared_ptr protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, + protocolFactory, + fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + if(runServer) { shared_ptr serviceProcessor(new ServiceProcessor(serviceHandler)); @@ -314,11 +350,12 @@ int main(int argc, char **argv) { if (logRequests) { // initialize the log file - shared_ptr bufferedFileWriter(new TBufferedFileWriter(requestLogPath, 1000)); - bufferedFileWriter->setChunkSize(2 * 1024 * 1024); - bufferedFileWriter->setMaxEventSize(1024 * 16); + shared_ptr fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); - transportFactory = shared_ptr(new TBufferedRouterTransportFactory(bufferedFileWriter)); + transportFactory = + shared_ptr(new TBufferedRouterTransportFactory(fileTransport)); } shared_ptr serverThread; -- 2.17.1