From e12d48596d4666b393d8f69fa8ef2764b9b5bfaa Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Tue, 6 Feb 2007 03:25:13 +0000 Subject: [PATCH] -- TBufferedRouterTransport being renamed to TPipedTransport Summary: - TBufferedRouterTransport is the most nonsensical name I have ever heard of Reviewed By: slee Test Plan: it compiles git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664979 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/TBufferedRouterTransport.cpp | 76 ---------- .../src/transport/TBufferedRouterTransport.h | 129 ---------------- lib/cpp/src/transport/TTransportUtils.cpp | 70 +++++++++ lib/cpp/src/transport/TTransportUtils.h | 141 ++++++++++++++++++ 4 files changed, 211 insertions(+), 205 deletions(-) delete mode 100644 lib/cpp/src/transport/TBufferedRouterTransport.cpp delete mode 100644 lib/cpp/src/transport/TBufferedRouterTransport.h diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.cpp b/lib/cpp/src/transport/TBufferedRouterTransport.cpp deleted file mode 100644 index 60ab594a..00000000 --- a/lib/cpp/src/transport/TBufferedRouterTransport.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "TBufferedRouterTransport.h" -#include "Thrift.h" -using std::string; - -namespace facebook { namespace thrift { namespace transport { - -uint32_t TBufferedRouterTransport::read(uint8_t* buf, uint32_t len) { - uint32_t need = len; - - // We don't have enough data yet - if (rLen_-rPos_ < need) { - // Copy out whatever we have - if (rLen_-rPos_ > 0) { - memcpy(buf, rBuf_+rPos_, rLen_-rPos_); - need -= rLen_-rPos_; - buf += rLen_-rPos_; - rPos_ = rLen_; - } - - // Double the size of the underlying buffer if it is full - if (rLen_ == rBufSize_) { - rBufSize_ *=2; - rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); - } - - // try to fill up the buffer - rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_); - } - - - // Hand over whatever we have - uint32_t give = need; - if (rLen_-rPos_ < give) { - give = rLen_-rPos_; - } - if (give > 0) { - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; - } - - return (len - need); -} - -void TBufferedRouterTransport::write(const uint8_t* buf, uint32_t len) { - if (len == 0) { - return; - } - - // Make the buffer as big as it needs to be - if ((len + wLen_) >= wBufSize_) { - uint32_t newBufSize = wBufSize_*2; - while ((len + wLen_) >= newBufSize) { - newBufSize *= 2; - } - wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize); - wBufSize_ = newBufSize; - } - - // Copy into the buffer - memcpy(wBuf_ + wLen_, buf, len); - wLen_ += len; -} - -void TBufferedRouterTransport::flush() { - // Write out any data waiting in the write buffer - if (wLen_ > 0) { - trans_->write(wBuf_, wLen_); - wLen_ = 0; - } - - // Flush the underlying transport - trans_->flush(); -} - -}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h deleted file mode 100644 index 0b4577c4..00000000 --- a/lib/cpp/src/transport/TBufferedRouterTransport.h +++ /dev/null @@ -1,129 +0,0 @@ -#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ 1 - -#include "TTransport.h" -#include "Thrift.h" -#include - -#include - -namespace facebook { namespace thrift { namespace transport { - -using namespace boost; - -/** - * BufferedRouterTransport. Funcationally equivalent to TBufferedTransport - * but routes the request to another Transport (typical use case is to route - * the request to TFileTransport to store the request on disk). The - * underlying buffer expands to a keep a copy of the entire request/response. - * - * @author Aditya Agarwal - */ -class TBufferedRouterTransport : public TTransport { - public: - TBufferedRouterTransport(shared_ptr trans, shared_ptr rtrans) : - trans_(trans), - rtrans_(rtrans), - rBufSize_(512), rPos_(0), rLen_(0), - wBufSize_(512), wLen_(0) { - - rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); - wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); - } - - TBufferedRouterTransport(shared_ptr trans, shared_ptr rtrans, uint32_t sz) : - trans_(trans), - rtrans_(rtrans), - rBufSize_(512), rPos_(0), rLen_(0), - wBufSize_(sz), wLen_(0) { - - rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); - wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); - } - - ~TBufferedRouterTransport() { - free(rBuf_); - free(wBuf_); - } - - bool isOpen() { - return trans_->isOpen(); - } - - bool peek() { - if (rPos_ >= rLen_) { - // Double the size of the underlying buffer if it is full - if (rLen_ == rBufSize_) { - rBufSize_ *=2; - rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); - } - - // try to fill up the buffer - rLen_ += trans_->read(rBuf_+rPos_, rBufSize_ - rPos_); - } - return (rLen_ > rPos_); - } - - - void open() { - trans_->open(); - } - - void close() { - trans_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void readEnd() { - rtrans_->write(rBuf_, rLen_); - - // reset state - rLen_ = 0; - rPos_ = 0; - } - - void write(const uint8_t* buf, uint32_t len); - - void flush(); - - protected: - shared_ptr trans_; - shared_ptr rtrans_; - - uint8_t* rBuf_; - uint32_t rBufSize_; - uint32_t rPos_; - uint32_t rLen_; - - uint8_t* wBuf_; - uint32_t wBufSize_; - uint32_t wLen_; -}; - - -/** - * Wraps a transport into a bufferedRouter instance. - * - * @author Aditya Agarwal - */ -class TBufferedRouterTransportFactory : public TTransportFactory { - public: - TBufferedRouterTransportFactory(boost::shared_ptr rTrans): rTrans_(rTrans) {} - - virtual ~TBufferedRouterTransportFactory() {} - - /** - * Wraps the transport into a buffered one. - */ - virtual boost::shared_ptr getTransport(boost::shared_ptr trans) { - return boost::shared_ptr(new TBufferedRouterTransport(trans, rTrans_)); - } - - private: - boost::shared_ptr rTrans_; -}; - -}}} // facebook::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index a885020f..54b1f3f8 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -41,6 +41,7 @@ void TBufferedTransport::write(const uint8_t* buf, uint32_t len) { while ((len-pos) + wLen_ >= wBufSize_) { uint32_t copy = wBufSize_ - wLen_; memcpy(wBuf_ + wLen_, buf + pos, copy); + transport_->write(wBuf_, wBufSize_); pos += copy; wLen_ = 0; @@ -216,4 +217,73 @@ void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) { wPos_ += len; } +uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) { + uint32_t need = len; + + // We don't have enough data yet + if (rLen_-rPos_ < need) { + // Copy out whatever we have + if (rLen_-rPos_ > 0) { + memcpy(buf, rBuf_+rPos_, rLen_-rPos_); + need -= rLen_-rPos_; + buf += rLen_-rPos_; + rPos_ = rLen_; + } + + // Double the size of the underlying buffer if it is full + if (rLen_ == rBufSize_) { + rBufSize_ *=2; + rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); + } + + // try to fill up the buffer + rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_); + } + + + // Hand over whatever we have + uint32_t give = need; + if (rLen_-rPos_ < give) { + give = rLen_-rPos_; + } + if (give > 0) { + memcpy(buf, rBuf_+rPos_, give); + rPos_ += give; + need -= give; + } + + return (len - need); +} + +void TPipedTransport::write(const uint8_t* buf, uint32_t len) { + if (len == 0) { + return; + } + + // Make the buffer as big as it needs to be + if ((len + wLen_) >= wBufSize_) { + uint32_t newBufSize = wBufSize_*2; + while ((len + wLen_) >= newBufSize) { + newBufSize *= 2; + } + wBuf_ = (uint8_t *)realloc(wBuf_, sizeof(uint8_t) * newBufSize); + wBufSize_ = newBufSize; + } + + // Copy into the buffer + memcpy(wBuf_ + wLen_, buf, len); + wLen_ += len; +} + +void TPipedTransport::flush() { + // Write out any data waiting in the write buffer + if (wLen_ > 0) { + srcTrans_->write(wBuf_, wLen_); + wLen_ = 0; + } + + // Flush the underlying transport + srcTrans_->flush(); +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index 79a137c4..06547c7b 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -330,6 +330,147 @@ class TMemoryBuffer : public TTransport { }; +/** + * TPipedTransport. This transport allows piping of a request from one + * transport to another either when readEnd() or writeEnd(). The typicAL + * use case for this is to log a request or a reply to disk. + * The underlying buffer expands to a keep a copy of the entire + * request/response. + * + * @author Aditya Agarwal + */ +class TPipedTransport : public TTransport { + public: + TPipedTransport(boost::shared_ptr srcTrans, + boost::shared_ptr dstTrans) : + srcTrans_(srcTrans), + dstTrans_(dstTrans), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(512), wLen_(0) { + + // default is to to pipe the request when readEnd() is called + pipeOnRead_ = true; + pipeOnWrite_ = false; + + rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); + wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); + } + + TPipedTransport(boost::shared_ptr srcTrans, + boost::shared_ptr dstTrans, + uint32_t sz) : + srcTrans_(srcTrans), + dstTrans_(dstTrans), + rBufSize_(512), rPos_(0), rLen_(0), + wBufSize_(sz), wLen_(0) { + + rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); + wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); + } + + ~TPipedTransport() { + free(rBuf_); + free(wBuf_); + } + + bool isOpen() { + return srcTrans_->isOpen(); + } + + bool peek() { + if (rPos_ >= rLen_) { + // Double the size of the underlying buffer if it is full + if (rLen_ == rBufSize_) { + rBufSize_ *=2; + rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); + } + + // try to fill up the buffer + rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_); + } + return (rLen_ > rPos_); + } + + + void open() { + srcTrans_->open(); + } + + void close() { + srcTrans_->close(); + } + + void setPipeOnRead(bool pipeVal) { + pipeOnRead_ = pipeVal; + } + + void setPipeOnWrite(bool pipeVal) { + pipeOnWrite_ = pipeVal; + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void readEnd() { + if (pipeOnRead_) { + dstTrans_->write(rBuf_, rLen_); + } + + // reset state + rLen_ = 0; + rPos_ = 0; + } + + void write(const uint8_t* buf, uint32_t len); + + void writeEnd() { + if (pipeOnWrite_) { + dstTrans_->write(wBuf_, wLen_); + } + } + + void flush(); + + protected: + boost::shared_ptr srcTrans_; + boost::shared_ptr dstTrans_; + + uint8_t* rBuf_; + uint32_t rBufSize_; + uint32_t rPos_; + uint32_t rLen_; + + uint8_t* wBuf_; + uint32_t wBufSize_; + uint32_t wLen_; + + bool pipeOnRead_; + bool pipeOnWrite_; +}; + + +/** + * Wraps a transport into a pipedTransport instance. + * + * @author Aditya Agarwal + */ +class TPipedTransportFactory : public TTransportFactory { + public: + TPipedTransportFactory(boost::shared_ptr dstTrans): dstTrans_(dstTrans) {} + + virtual ~TPipedTransportFactory() {} + + /** + * Wraps the base transport into a piped transport. + */ + virtual boost::shared_ptr getTransport(boost::shared_ptr srcTrans) { + return boost::shared_ptr(new TPipedTransport(srcTrans, dstTrans_)); + } + + private: + boost::shared_ptr dstTrans_; +}; + + }}} // facebook::thrift::transport -- 2.17.1