From 794568bafe32416338a011258872907bb51b476b Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Thu, 18 Jan 2007 06:20:24 +0000 Subject: [PATCH] -- additions to ThriftLogfile Summary: -- fixed peek() in TBufferedRouterTransport.cpp -- Added processChunk() to ThriftLogfile Reviewed By: Slee git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664924 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/TBufferedRouterTransport.cpp | 9 ++- .../src/transport/TBufferedRouterTransport.h | 15 +++++ lib/cpp/src/transport/TFileTransport.cpp | 55 ++++++++++++++----- lib/cpp/src/transport/TFileTransport.h | 9 ++- 4 files changed, 70 insertions(+), 18 deletions(-) diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp index 7c09953e..ad6a28ff 100644 --- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp +++ b/lib/cpp/src/transport/TBufferedRouterTransport.cpp @@ -27,14 +27,17 @@ uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) { rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_); } + // Hand over whatever we have uint32_t give = need; if (rLen_-rPos_ < give) { give = rLen_-rPos_; } - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; + if (give > 0) { + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + } return (len - need); } diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h index 3a5e3941..b2e4d4fa 100644 --- a/lib/cpp/src/transport/TBufferedRouterTransport.h +++ b/lib/cpp/src/transport/TBufferedRouterTransport.h @@ -50,6 +50,21 @@ class TBufferedRouterTransport : public TTransport { return trans_->isOpen(); } + bool peek() { + if (rPos_ >= rLen_) { + // Double the size of the underlying buffer if it is full + if (rLen_ == rBufSize_) { + rBufSize_ *=2; + rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); + } + + // try to fill up the buffer + rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_); + } + return (rLen_ > rPos_); + } + + void open() { trans_->open(); } diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index cfdb3b95..05dd25d7 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -306,10 +306,10 @@ void TFileTransport::writerThread() { T_DEBUG("padding is larger than chunk size, skipping event"); continue; } - // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2); uint8_t zeros[padding]; bzero(zeros, padding); - T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_); + // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", + // padding, offset_, chunk2); if(-1 == ::write(fd_, zeros, padding)) { perror("TFileTransport: error while padding zeros"); throw TTransportException("TFileTransport: error while padding zeros"); @@ -399,9 +399,12 @@ uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { // read as much of the current event as possible int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; if (remaining <= (int32_t)len) { - memcpy(buf, - currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, - remaining); + // copy over anything thats remaining + if (remaining > 0) { + memcpy(buf, + currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, + remaining); + } delete(currentEvent_); currentEvent_ = 0; return remaining; @@ -426,12 +429,12 @@ bool TFileTransport::readEvent() { // advance the offset pointer offset_ += readState_.bufferLen_; readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_); - if (readState_.bufferLen_) { - T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); - } + // if (readState_.bufferLen_) { + // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); + // } readState_.bufferPtr_ = 0; readState_.lastDispatchPtr_ = 0; - + // read error if (readState_.bufferLen_ == -1) { readState_.resetAllValues(); @@ -481,7 +484,7 @@ bool TFileTransport::readEvent() { if (readState_.eventSizeBuffPos_ == 4) { // 0 length event indicates padding if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) { - T_DEBUG_L(1, "Got padding"); + // T_DEBUG_L(1, "Got padding"); readState_.resetState(readState_.lastDispatchPtr_); continue; } @@ -493,8 +496,6 @@ bool TFileTransport::readEvent() { readState_.event_ = new eventInfo(); readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_)); - T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_); - // TODO // make sure event is valid, an error is triggered if: // 1. Event size is larger than user-speficied max-event size @@ -541,7 +542,7 @@ bool TFileTransport::readEvent() { readState_.resetState(readState_.bufferPtr_); // exit criteria - T_DEBUG_L(0, "Finished one event"); + // T_DEBUG_L(0, "Finished one event"); return true; } } @@ -605,6 +606,10 @@ uint32_t TFileTransport::getNumChunks() { return (f_info.st_size)/chunkSize_; } +uint32_t TFileTransport::getCurChunk() { + return offset_/chunkSize_; +} + // Utility Functions void TFileTransport::openLogFile() { mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH; @@ -620,7 +625,6 @@ void TFileTransport::openLogFile() { // opening the file in append mode causes offset_t to be at the end offset_ = lseek(fd_, 0, SEEK_CUR); - T_DEBUG_L(1, "initial offset: %lu", offset_); } uint32_t TFileTransport::getCurrentTime() { @@ -689,4 +693,27 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { } +void TFileProcessor::processChunk() { + pair,shared_ptr > iop; + iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_); + + uint32_t curChunk = inputTransport_->getCurChunk(); + + while(1) { + // bad form to use exceptions for flow control but there is really + // no other way around it + try { + processor_->process(iop.first, iop.second); + if (curChunk != inputTransport_->getCurChunk()) { + break; + } + } catch (TEOFException& teof) { + break; + } catch (TException te) { + cerr << te.what() << endl; + break; + } + } +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index 81923328..fb4a1c25 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -63,7 +63,7 @@ typedef struct readState { readState() { event_ = 0; - resetAllValues(); + resetAllValues(); } ~readState() { @@ -104,6 +104,7 @@ class TFileTransport : public TTransport { void seekToChunk(int chunk); void seekToEnd(); uint32_t getNumChunks(); + uint32_t getCurChunk(); // for changing the output file void resetOutputFile(int fd, string filename, long long offset); @@ -323,6 +324,12 @@ class TFileProcessor { */ void process(uint32_t numEvents, bool tail); + /** + * process events until the end of the chunk + * + */ + void processChunk(); + private: shared_ptr processor_; shared_ptr protocolFactory_; -- 2.17.1