From 3e9d177fd2a063a027262735adafc4d256748b1c Mon Sep 17 00:00:00 2001 From: James Wang Date: Tue, 27 Mar 2007 23:17:34 +0000 Subject: [PATCH] Thrift: Adding StatsProcessor, PeekProcessor, TPipedFileReaderTransport, and TPipedFileReaderTransportFactory classes - StatsProcessor can be used to print events, or keep track of event frequency - PeekProcessor is used to examine data in a thrift event, prior to passing it along to an underlying processor - TPipedFileReaderTransport and its factory are used to pipe a TFileReaderTransport (which TFileProcessor requires) Also fixed some bugs in TFileTransport - next flush time was overflowing and not always being reset Reviewed by: aditya, mcslee Test Plan: Tested using various thrift clients (scribe, falcon) and gdb in sandbox and on dev008. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665066 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 13 +- lib/cpp/src/TProcessor.h | 2 +- lib/cpp/src/processor/PeekProcessor.cpp | 84 ++++++++ lib/cpp/src/processor/PeekProcessor.h | 61 ++++++ lib/cpp/src/processor/StatsProcessor.h | 252 ++++++++++++++++++++++ lib/cpp/src/transport/TFileTransport.cpp | 67 +++--- lib/cpp/src/transport/TFileTransport.h | 38 +++- lib/cpp/src/transport/TTransportUtils.cpp | 83 +++++++ lib/cpp/src/transport/TTransportUtils.h | 87 +++++++- 9 files changed, 634 insertions(+), 53 deletions(-) create mode 100644 lib/cpp/src/processor/PeekProcessor.cpp create mode 100644 lib/cpp/src/processor/PeekProcessor.h create mode 100644 lib/cpp/src/processor/StatsProcessor.h diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index b205e787..84618f4b 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -11,7 +11,8 @@ libthrift_sources = src/Thrift.cpp \ src/concurrency/PosixThreadFactory.cpp \ src/concurrency/ThreadManager.cpp \ src/concurrency/TimerManager.cpp \ - src/protocol/TBinaryProtocol.cpp \ + src/processor/PeekProcessor.cpp \ + src/protocol/TBinaryProtocol.cpp \ src/transport/TFileTransport.cpp \ src/transport/THttpClient.cpp \ src/transport/TSocket.cpp \ @@ -34,14 +35,14 @@ libthriftnb_la_CXXFLAGS = $(libthrift_cxxflags) include_thriftdir = $(includedir)/thrift include_thrift_HEADERS = \ - config.h \ - src/Thrift.h \ - src/TProcessor.h \ - src/TLogging.h + config.h \ + src/Thrift.h \ + src/TProcessor.h \ + src/TLogging.h include_concurrencydir = $(include_thriftdir)/concurrency include_concurrency_HEADERS = \ - src/concurrency/Exception.h \ + src/concurrency/Exception.h \ src/concurrency/Mutex.h \ src/concurrency/Monitor.h \ src/concurrency/PosixThreadFactory.h \ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index affbe812..ac77da34 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -28,7 +28,7 @@ class TProcessor { virtual bool process(boost::shared_ptr in, boost::shared_ptr out) = 0; - bool process(boost::shared_ptr io) { + virtual bool process(boost::shared_ptr io) { return process(io, io); } diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp new file mode 100644 index 00000000..a7c55718 --- /dev/null +++ b/lib/cpp/src/processor/PeekProcessor.cpp @@ -0,0 +1,84 @@ +#include "PeekProcessor.h" + +namespace facebook { namespace thrift { namespace processor { + +PeekProcessor::PeekProcessor() { + memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer()); +} +PeekProcessor::~PeekProcessor() {} + +void PeekProcessor::initialize(boost::shared_ptr actualProcessor, + boost::shared_ptr protocolFactory, + boost::shared_ptr transportFactory) { + actualProcessor_ = actualProcessor; + pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_); + transportFactory_ = transportFactory; + transportFactory_->initializeTargetTransport(memoryBuffer_); +} + +boost::shared_ptr PeekProcessor::getPipedTransport(boost::shared_ptr in) { + return transportFactory_->getTransport(in); +} + +bool PeekProcessor::process(boost::shared_ptr in, + boost::shared_ptr out) { + + std::string fname; + facebook::thrift::protocol::TMessageType mtype; + int32_t seqid; + in->readMessageBegin(fname, mtype, seqid); + + if (mtype != facebook::thrift::protocol::T_CALL) { + throw facebook::thrift::TException("Unexpected message type"); + } + + // Peek at the name + peekName(fname); + + facebook::thrift::protocol::TType ftype; + int16_t fid; + while (true) { + in->readFieldBegin(fname, ftype, fid); + if (ftype == facebook::thrift::protocol::T_STOP) { + break; + } + + // Peek at the variable + peek(in, ftype, fid); + } + in->readMessageEnd(); + in->getTransport()->readEnd(); + + // Done peeking at variables + peekEnd(); + + // + // All the data is now in memoryBuffer_ and ready to be processed + // + + // Let's first take a peek at the full data in memory + uint8_t* buffer; + uint32_t size; + memoryBuffer_->getBuffer(&buffer, &size); + peekBuffer(buffer, size); + + bool ret = actualProcessor_->process(pipedProtocol_, out); + memoryBuffer_->resetBuffer(); + return ret; +} + +void PeekProcessor::peekName(const std::string& fname) { +} + +void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) { +} + +void PeekProcessor::peek(boost::shared_ptr in, + facebook::thrift::protocol::TType ftype, + int16_t fid) { + in->skip(ftype); +} + +void PeekProcessor::peekEnd() {} + +}}} diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h new file mode 100644 index 00000000..d1d227c1 --- /dev/null +++ b/lib/cpp/src/processor/PeekProcessor.h @@ -0,0 +1,61 @@ +// Copyright (c) 2006- Facebook +// Distributed under the Thrift Software License +// +// See accompanying file LICENSE or visit the Thrift site at: +// http://developers.facebook.com/thrift/ + +#ifndef PEEKPROCESSOR_H +#define PEEKPROCESSOR_H + +#include +#include +#include +#include +#include + +namespace facebook { namespace thrift { namespace processor { + +/* + * Class for peeking at the raw data that is being processed by another processor + * and gives the derived class a chance to change behavior accordingly + * + * @author James Wang + */ +class PeekProcessor : public facebook::thrift::TProcessor { + + public: + PeekProcessor(); + virtual ~PeekProcessor(); + + // Input here: actualProcessor - the underlying processor + // protocolFactory - the protocol factory used to wrap the memory buffer + // transportFactory - this TPipedTransportFactory is used to wrap the source transport + // via a call to getPipedTransport + void initialize(boost::shared_ptr actualProcessor, + boost::shared_ptr protocolFactory, + boost::shared_ptr transportFactory); + + boost::shared_ptr getPipedTransport(boost::shared_ptr in); + + virtual bool process(boost::shared_ptr in, + boost::shared_ptr out); + + // The following three functions can be overloaded by child classes to + // achieve desired peeking behavior + virtual void peekName(const std::string& fname); + virtual void peekBuffer(uint8_t* buffer, uint32_t size); + virtual void peek(boost::shared_ptr in, + facebook::thrift::protocol::TType ftype, + int16_t fid); + virtual void peekEnd(); + + private: + boost::shared_ptr actualProcessor_; + boost::shared_ptr pipedProtocol_; + boost::shared_ptr transportFactory_; + boost::shared_ptr memoryBuffer_; +}; + +}}} // facebook::thrift::processor + +#endif diff --git a/lib/cpp/src/processor/StatsProcessor.h b/lib/cpp/src/processor/StatsProcessor.h new file mode 100644 index 00000000..e0804329 --- /dev/null +++ b/lib/cpp/src/processor/StatsProcessor.h @@ -0,0 +1,252 @@ +// Copyright (c) 2006- Facebook +// Distributed under the Thrift Software License +// +// See accompanying file LICENSE or visit the Thrift site at: +// http://developers.facebook.com/thrift/ + +#ifndef STATSPROCESSOR_H +#define STATSPROCESSOR_H + +#include +#include +#include +#include + +namespace facebook { namespace thrift { namespace processor { + +/* + * Class for keeping track of function call statistics and printing them if desired + * + * @author James Wang + */ +class StatsProcessor : public facebook::thrift::TProcessor { +public: + StatsProcessor(bool print, bool frequency) + : print_(print), + frequency_(frequency) + {} + virtual ~StatsProcessor() {}; + + virtual bool process(boost::shared_ptr piprot, boost::shared_ptr poprot) { + + piprot_ = piprot; + + std::string fname; + facebook::thrift::protocol::TMessageType mtype; + int32_t seqid; + + piprot_->readMessageBegin(fname, mtype, seqid); + if (mtype != facebook::thrift::protocol::T_CALL) { + if (print_) { + printf("Unknown message type\n"); + } + throw facebook::thrift::TException("Unexpected message type"); + } + if (print_) { + printf("%s (", fname.c_str()); + } + if (frequency_) { + if (frequency_map_.find(fname) != frequency_map_.end()) { + frequency_map_[fname]++; + } else { + frequency_map_[fname] = 1; + } + } + + facebook::thrift::protocol::TType ftype; + int16_t fid; + + while (true) { + piprot_->readFieldBegin(fname, ftype, fid); + if (ftype == facebook::thrift::protocol::T_STOP) { + break; + } + + printAndPassToBuffer(ftype); + if (print_) { + printf(", "); + } + } + + if (print_) { + printf("\b\b)\n"); + } + return true; + } + + const std::map& get_frequency_map() { + return frequency_map_; + } + +protected: + void printAndPassToBuffer(facebook::thrift::protocol::TType ftype) { + switch (ftype) { + case facebook::thrift::protocol::T_BOOL: + { + bool boolv; + piprot_->readBool(boolv); + if (print_) { + printf("%d", boolv); + } + } + break; + case facebook::thrift::protocol::T_BYTE: + { + int8_t bytev; + piprot_->readByte(bytev); + if (print_) { + printf("%d", bytev); + } + } + break; + case facebook::thrift::protocol::T_I16: + { + int16_t i16; + piprot_->readI16(i16); + if (print_) { + printf("%d", i16); + } + } + break; + case facebook::thrift::protocol::T_I32: + { + int32_t i32; + piprot_->readI32(i32); + if (print_) { + printf("%d", i32); + } + } + break; + case facebook::thrift::protocol::T_I64: + { + int64_t i64; + piprot_->readI64(i64); + if (print_) { + printf("%ld", i64); + } + } + break; + case facebook::thrift::protocol::T_DOUBLE: + { + double dub; + piprot_->readDouble(dub); + if (print_) { + printf("%f", dub); + } + } + break; + case facebook::thrift::protocol::T_STRING: + { + std::string str; + piprot_->readString(str); + if (print_) { + printf("%s", str.c_str()); + } + } + break; + case facebook::thrift::protocol::T_STRUCT: + { + std::string name; + int16_t fid; + facebook::thrift::protocol::TType ftype; + piprot_->readStructBegin(name); + if (print_) { + printf("<"); + } + while (true) { + piprot_->readFieldBegin(name, ftype, fid); + if (ftype == facebook::thrift::protocol::T_STOP) { + break; + } + printAndPassToBuffer(ftype); + if (print_) { + printf(","); + } + piprot_->readFieldEnd(); + } + piprot_->readStructEnd(); + if (print_) { + printf("\b>"); + } + } + break; + case facebook::thrift::protocol::T_MAP: + { + facebook::thrift::protocol::TType keyType; + facebook::thrift::protocol::TType valType; + uint32_t i, size; + piprot_->readMapBegin(keyType, valType, size); + if (print_) { + printf("{"); + } + for (i = 0; i < size; i++) { + printAndPassToBuffer(keyType); + if (print_) { + printf("=>"); + } + printAndPassToBuffer(valType); + if (print_) { + printf(","); + } + } + piprot_->readMapEnd(); + if (print_) { + printf("\b}"); + } + } + break; + case facebook::thrift::protocol::T_SET: + { + facebook::thrift::protocol::TType elemType; + uint32_t i, size; + piprot_->readSetBegin(elemType, size); + if (print_) { + printf("{"); + } + for (i = 0; i < size; i++) { + printAndPassToBuffer(elemType); + if (print_) { + printf(","); + } + } + piprot_->readSetEnd(); + if (print_) { + printf("\b}"); + } + } + break; + case facebook::thrift::protocol::T_LIST: + { + facebook::thrift::protocol::TType elemType; + uint32_t i, size; + piprot_->readListBegin(elemType, size); + if (print_) { + printf("["); + } + for (i = 0; i < size; i++) { + printAndPassToBuffer(elemType); + if (print_) { + printf(","); + } + } + piprot_->readListEnd(); + if (print_) { + printf("\b]"); + } + } + break; + default: + break; + } + } + + boost::shared_ptr piprot_; + std::map frequency_map_; + + bool print_; + bool frequency_; +}; + +}}} // facebook::thrift::processor + +#endif diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 9a3ca302..001783fc 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -204,27 +204,14 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl pthread_mutex_unlock(&mutex_); } -bool TFileTransport::swapEventBuffers(long long deadline) { - //deadline time struc - struct timespec ts; - if (deadline) { - ts.tv_sec = deadline/(1000*1000); - ts.tv_nsec = (deadline%(1000*1000))*1000; - } - - // wait for the queue to fill up +bool TFileTransport::swapEventBuffers(struct timespec* deadline) { pthread_mutex_lock(&mutex_); - while (enqueueBuffer_->isEmpty()) { - // do a timed wait on the condition variable - if (deadline) { - int e = pthread_cond_timedwait(¬Empty_, &mutex_, &ts); - if(e == ETIMEDOUT) { - break; - } - } else { - // just wait until the buffer gets an item - pthread_cond_wait(¬Empty_, &mutex_); - } + if (deadline != NULL) { + // if we were handed a deadline time struct, do a timed wait + pthread_cond_timedwait(¬Empty_, &mutex_, deadline); + } else { + // just wait until the buffer gets an item + pthread_cond_wait(¬Empty_, &mutex_); } bool swapped = false; @@ -259,7 +246,9 @@ void TFileTransport::writerThread() { offset_ = lseek(fd_, 0, SEEK_END); // Figure out the next time by which a flush must take place - long long nextFlush = getCurrentTime() + flushMaxUs_; + + struct timespec ts_next_flush; + getNextFlushTime(&ts_next_flush); uint32_t unflushed = 0; while(1) { @@ -273,7 +262,7 @@ void TFileTransport::writerThread() { return; } - if (swapEventBuffers(nextFlush)) { + if (swapEventBuffers(&ts_next_flush)) { eventInfo* outEvent; while (NULL != (outEvent = dequeueBuffer_->getNext())) { if (!outEvent) { @@ -342,14 +331,23 @@ void TFileTransport::writerThread() { dequeueBuffer_->reset(); } + bool flushTimeElapsed = false; + struct timespec current_time; + clock_gettime(CLOCK_REALTIME, ¤t_time); + + if (current_time.tv_sec > ts_next_flush.tv_sec || + (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) { + flushTimeElapsed = true; + getNextFlushTime(&ts_next_flush); + } + // couple of cases from which a flush could be triggered - if ((getCurrentTime() >= nextFlush && unflushed > 0) || + if ((flushTimeElapsed && unflushed > 0) || unflushed > flushMaxBytes_ || forceFlush_) { // sync (force flush) file to disk fsync(fd_); - nextFlush = getCurrentTime() + flushMaxUs_; unflushed = 0; // notify anybody waiting for flush completion @@ -697,13 +695,14 @@ void TFileTransport::openLogFile() { offset_ = lseek(fd_, 0, SEEK_CUR); } -uint32_t TFileTransport::getCurrentTime() { - long long ret; - struct timeval tv; - gettimeofday(&tv, NULL); - ret = tv.tv_sec; - ret = ret*1000*1000 + tv.tv_usec; - return ret; +void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) { + clock_gettime(CLOCK_REALTIME, ts_next_flush); + ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000; + if (ts_next_flush->tv_nsec > 1000000000) { + ts_next_flush->tv_nsec -= 1000000000; + ts_next_flush->tv_sec += 1; + } + ts_next_flush->tv_sec += flushMaxUs_ / 1000000; } TFileTransportBuffer::TFileTransportBuffer(uint32_t size) @@ -773,7 +772,7 @@ bool TFileTransportBuffer::isEmpty() { TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, - shared_ptr inputTransport): + shared_ptr inputTransport): processor_(processor), inputProtocolFactory_(protocolFactory), outputProtocolFactory_(protocolFactory), @@ -786,7 +785,7 @@ TFileProcessor::TFileProcessor(shared_ptr processor, TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr inputProtocolFactory, shared_ptr outputProtocolFactory, - shared_ptr inputTransport): + shared_ptr inputTransport): processor_(processor), inputProtocolFactory_(inputProtocolFactory), outputProtocolFactory_(outputProtocolFactory), @@ -798,7 +797,7 @@ TFileProcessor::TFileProcessor(shared_ptr processor, TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, - shared_ptr inputTransport, + shared_ptr inputTransport, shared_ptr outputTransport): processor_(processor), inputProtocolFactory_(protocolFactory), diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index f4d25f48..e31e85c0 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -121,13 +121,37 @@ class TFileTransportBuffer { eventInfo** buffer_; }; +/** + * Abstract interface for transports used to read files + */ +class TFileReaderTransport : virtual public TTransport { + public: + virtual int32_t getReadTimeout() = 0; + virtual void setReadTimeout(int32_t readTimeout) = 0; + + virtual uint32_t getNumChunks() = 0; + virtual uint32_t getCurChunk() = 0; + virtual void seekToChunk(int32_t chunk) = 0; + virtual void seekToEnd() = 0; +}; + +/** + * Abstract interface for transports used to write files + */ +class TFileWriterTransport : virtual public TTransport { + public: + virtual uint32_t getChunkSize() = 0; + virtual void setChunkSize(uint32_t chunkSize) = 0; +}; + /** * File implementation of a transport. Reads and writes are done to a * file on disk. * * @author Aditya Agarwal */ -class TFileTransport : public TTransport { +class TFileTransport : public TFileReaderTransport, + public TFileWriterTransport { public: TFileTransport(std::string path); ~TFileTransport(); @@ -240,7 +264,7 @@ class TFileTransport : public TTransport { private: // helper functions for writing to a file void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); - bool swapEventBuffers(long long deadline); + bool swapEventBuffers(struct timespec* deadline); bool initBufferAndWriteThread(); // control for writer thread @@ -259,7 +283,7 @@ class TFileTransport : public TTransport { // Utility functions void openLogFile(); - uint32_t getCurrentTime(); + void getNextFlushTime(struct timespec* ts_next_flush); // Class variables readState readState_; @@ -358,12 +382,12 @@ class TFileProcessor { */ TFileProcessor(boost::shared_ptr processor, boost::shared_ptr protocolFactory, - boost::shared_ptr inputTransport); + boost::shared_ptr inputTransport); TFileProcessor(boost::shared_ptr processor, boost::shared_ptr inputProtocolFactory, boost::shared_ptr outputProtocolFactory, - boost::shared_ptr inputTransport); + boost::shared_ptr inputTransport); /** * Constructor @@ -375,7 +399,7 @@ class TFileProcessor { */ TFileProcessor(boost::shared_ptr processor, boost::shared_ptr protocolFactory, - boost::shared_ptr inputTransport, + boost::shared_ptr inputTransport, boost::shared_ptr outputTransport); /** @@ -396,7 +420,7 @@ class TFileProcessor { boost::shared_ptr processor_; boost::shared_ptr inputProtocolFactory_; boost::shared_ptr outputProtocolFactory_; - boost::shared_ptr inputTransport_; + boost::shared_ptr inputTransport_; boost::shared_ptr outputTransport_; }; diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index aaefb2c7..290b4835 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -296,4 +296,87 @@ void TPipedTransport::flush() { srcTrans_->flush(); } +TPipedFileReaderTransport::TPipedFileReaderTransport(boost::shared_ptr srcTrans, boost::shared_ptr dstTrans) + : TPipedTransport(srcTrans, dstTrans), + srcTrans_(srcTrans) { +} + +TPipedFileReaderTransport::~TPipedFileReaderTransport() { +} + +bool TPipedFileReaderTransport::isOpen() { + return TPipedTransport::isOpen(); +} + +bool TPipedFileReaderTransport::peek() { + return TPipedTransport::peek(); +} + +void TPipedFileReaderTransport::open() { + TPipedTransport::open(); +} + +void TPipedFileReaderTransport::close() { + TPipedTransport::close(); +} + +uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) { + return TPipedTransport::read(buf, len); +} + +uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) { + uint32_t have = 0; + uint32_t get = 0; + + while (have < len) { + get = read(buf+have, len-have); + if (get <= 0) { + throw TEOFException(); + } + have += get; + } + + return have; +} + +void TPipedFileReaderTransport::readEnd() { + TPipedTransport::readEnd(); +} + +void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) { + TPipedTransport::write(buf, len); +} + +void TPipedFileReaderTransport::writeEnd() { + TPipedTransport::writeEnd(); +} + +void TPipedFileReaderTransport::flush() { + TPipedTransport::flush(); +} + +int32_t TPipedFileReaderTransport::getReadTimeout() { + return srcTrans_->getReadTimeout(); +} + +void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) { + srcTrans_->setReadTimeout(readTimeout); +} + +uint32_t TPipedFileReaderTransport::getNumChunks() { + return srcTrans_->getNumChunks(); +} + +uint32_t TPipedFileReaderTransport::getCurChunk() { + return srcTrans_->getCurChunk(); +} + +void TPipedFileReaderTransport::seekToChunk(int32_t chunk) { + srcTrans_->seekToChunk(chunk); +} + +void TPipedFileReaderTransport::seekToEnd() { + srcTrans_->seekToEnd(); +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index f95ad768..653939ff 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -7,7 +7,9 @@ #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ #define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1 +#include #include +#include namespace facebook { namespace thrift { namespace transport { @@ -349,7 +351,7 @@ class TMemoryBuffer : public TTransport { * * @author Aditya Agarwal */ -class TPipedTransport : public TTransport { +class TPipedTransport : virtual public TTransport { public: TPipedTransport(boost::shared_ptr srcTrans, boost::shared_ptr dstTrans) : @@ -465,8 +467,10 @@ class TPipedTransport : public TTransport { */ class TPipedTransportFactory : public TTransportFactory { public: - TPipedTransportFactory(boost::shared_ptr dstTrans): dstTrans_(dstTrans) {} - + TPipedTransportFactory() {} + TPipedTransportFactory(boost::shared_ptr dstTrans) { + initializeTargetTransport(dstTrans); + } virtual ~TPipedTransportFactory() {} /** @@ -476,12 +480,85 @@ class TPipedTransportFactory : public TTransportFactory { return boost::shared_ptr(new TPipedTransport(srcTrans, dstTrans_)); } - private: + virtual void initializeTargetTransport(boost::shared_ptr dstTrans) { + if (dstTrans_.get() == NULL) { + dstTrans_ = dstTrans; + } else { + throw TException("Target transport already initialized"); + } + } + + protected: boost::shared_ptr dstTrans_; }; +/** + * TPipedFileTransport. This is just like a TTransport, except that + * it is a templatized class, so that clients who rely on a specific + * TTransport can still access the original transport. + * + * @author James Wang + */ +class TPipedFileReaderTransport : public TPipedTransport, + public TFileReaderTransport { + public: + TPipedFileReaderTransport(boost::shared_ptr srcTrans, boost::shared_ptr dstTrans); + + ~TPipedFileReaderTransport(); -}}} // facebook::thrift::transport + // TTransport functions + bool isOpen(); + bool peek(); + void open(); + void close(); + uint32_t read(uint8_t* buf, uint32_t len); + uint32_t readAll(uint8_t* buf, uint32_t len); + void readEnd(); + void write(const uint8_t* buf, uint32_t len); + void writeEnd(); + void flush(); + // TFileReaderTransport functions + int32_t getReadTimeout(); + void setReadTimeout(int32_t readTimeout); + uint32_t getNumChunks(); + uint32_t getCurChunk(); + void seekToChunk(int32_t chunk); + void seekToEnd(); + + protected: + // shouldn't be used + TPipedFileReaderTransport(); + boost::shared_ptr srcTrans_; +}; + +/** + * Creates a TPipedFileReaderTransport from a filepath and a destination transport + * + * @author James Wang + */ +class TPipedFileReaderTransportFactory : public TPipedTransportFactory { + public: + TPipedFileReaderTransportFactory() {} + TPipedFileReaderTransportFactory(boost::shared_ptr dstTrans) + : TPipedTransportFactory(dstTrans) + {} + virtual ~TPipedFileReaderTransportFactory() {} + + boost::shared_ptr getTransport(boost::shared_ptr srcTrans) { + boost::shared_ptr pFileReaderTransport = boost::dynamic_pointer_cast(srcTrans); + if (pFileReaderTransport.get() != NULL) { + return getFileReaderTransport(pFileReaderTransport); + } else { + return boost::shared_ptr(); + } + } + + boost::shared_ptr getFileReaderTransport(boost::shared_ptr srcTrans) { + return boost::shared_ptr(new TPipedFileReaderTransport(srcTrans, dstTrans_)); + } +}; + +}}} // facebook::thrift::transport #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ -- 2.17.1