From: James Wang Date: Fri, 13 Apr 2007 20:10:39 +0000 (+0000) Subject: Thrift: Modifications to PeekProcessor to be able to support nested PeekProcessors X-Git-Tag: 0.2.0~1372 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=5f45207ee9774226ef39fa93e6a2100e9e8d8ff5;p=common%2Fthrift.git Thrift: Modifications to PeekProcessor to be able to support nested PeekProcessors Reviewed by: boz Test Plan: Tested with Falcon git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665100 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/processor/PeekProcessor.cpp b/lib/cpp/src/processor/PeekProcessor.cpp index d510a5d8..21df9db7 100644 --- a/lib/cpp/src/processor/PeekProcessor.cpp +++ b/lib/cpp/src/processor/PeekProcessor.cpp @@ -1,45 +1,63 @@ #include "PeekProcessor.h" +using namespace facebook::thrift::transport; +using namespace facebook::thrift::protocol; +using namespace facebook::thrift; + namespace facebook { namespace thrift { namespace processor { PeekProcessor::PeekProcessor() { - memoryBuffer_.reset(new facebook::thrift::transport::TMemoryBuffer()); + memoryBuffer_.reset(new TMemoryBuffer()); + targetTransport_ = memoryBuffer_; } PeekProcessor::~PeekProcessor() {} -void PeekProcessor::initialize(boost::shared_ptr actualProcessor, - boost::shared_ptr protocolFactory, - boost::shared_ptr transportFactory) { +void PeekProcessor::initialize(boost::shared_ptr actualProcessor, + boost::shared_ptr protocolFactory, + boost::shared_ptr transportFactory) { actualProcessor_ = actualProcessor; - pipedProtocol_ = protocolFactory->getProtocol(memoryBuffer_); + pipedProtocol_ = protocolFactory->getProtocol(targetTransport_); transportFactory_ = transportFactory; - transportFactory_->initializeTargetTransport(memoryBuffer_); + transportFactory_->initializeTargetTransport(targetTransport_); } -boost::shared_ptr PeekProcessor::getPipedTransport(boost::shared_ptr in) { +boost::shared_ptr PeekProcessor::getPipedTransport(boost::shared_ptr in) { return transportFactory_->getTransport(in); } -bool PeekProcessor::process(boost::shared_ptr in, - boost::shared_ptr out) { +void PeekProcessor::setTargetTransport(boost::shared_ptr targetTransport) { + targetTransport_ = targetTransport; + if (boost::dynamic_pointer_cast(targetTransport_)) { + memoryBuffer_ = boost::dynamic_pointer_cast(targetTransport); + } else if (boost::dynamic_pointer_cast(targetTransport_)) { + memoryBuffer_ = boost::dynamic_pointer_cast(boost::dynamic_pointer_cast(targetTransport_)->getTargetTransport()); + } + + if (!memoryBuffer_) { + throw TException("Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer"); + } +} + +bool PeekProcessor::process(boost::shared_ptr in, + boost::shared_ptr out) { std::string fname; - facebook::thrift::protocol::TMessageType mtype; + TMessageType mtype; int32_t seqid; in->readMessageBegin(fname, mtype, seqid); - if (mtype != facebook::thrift::protocol::T_CALL) { - throw facebook::thrift::TException("Unexpected message type"); + if (mtype != T_CALL) { + throw TException("Unexpected message type"); } // Peek at the name peekName(fname); - facebook::thrift::protocol::TType ftype; + TType ftype; int16_t fid; while (true) { in->readFieldBegin(fname, ftype, fid); - if (ftype == facebook::thrift::protocol::T_STOP) { + if (ftype == T_STOP) { break; } @@ -73,9 +91,9 @@ void PeekProcessor::peekName(const std::string& fname) { void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) { } -void PeekProcessor::peek(boost::shared_ptr in, - facebook::thrift::protocol::TType ftype, - int16_t fid) { +void PeekProcessor::peek(boost::shared_ptr in, + TType ftype, + int16_t fid) { in->skip(ftype); } diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h index d1d227c1..6b8414b5 100644 --- a/lib/cpp/src/processor/PeekProcessor.h +++ b/lib/cpp/src/processor/PeekProcessor.h @@ -37,6 +37,8 @@ class PeekProcessor : public facebook::thrift::TProcessor { boost::shared_ptr getPipedTransport(boost::shared_ptr in); + void setTargetTransport(boost::shared_ptr targetTransport); + virtual bool process(boost::shared_ptr in, boost::shared_ptr out); @@ -54,6 +56,7 @@ class PeekProcessor : public facebook::thrift::TProcessor { boost::shared_ptr pipedProtocol_; boost::shared_ptr transportFactory_; boost::shared_ptr memoryBuffer_; + boost::shared_ptr targetTransport_; }; }}} // facebook::thrift::processor diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index 653939ff..cb9a95f7 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -425,6 +425,7 @@ class TPipedTransport : virtual public TTransport { void readEnd() { if (pipeOnRead_) { dstTrans_->write(rBuf_, rLen_); + dstTrans_->flush(); } // reset state @@ -437,11 +438,16 @@ class TPipedTransport : virtual public TTransport { void writeEnd() { if (pipeOnWrite_) { dstTrans_->write(wBuf_, wLen_); + dstTrans_->flush(); } } void flush(); + boost::shared_ptr getTargetTransport() { + return dstTrans_; + } + protected: boost::shared_ptr srcTrans_; boost::shared_ptr dstTrans_;