From 28f298dd5d9b68c9c0dbaefa95c88d51ed8e430a Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 1 May 2008 06:17:36 +0000 Subject: [PATCH] Memory-based transport rewrite. The old implementations of the memory-based transports (TBufferedTransport, TFramedTransport, and TMemoryBuffer) shared very little code and all worked a bit differently. This change unifies them all as subclasses of a single base (TBufferBase) which handles the fast-path operations (when requests can be satisfied by the buffer) with inline methods (that will eventually be made nonvirtual in the template branch) and calls out to pure-virutal methods to handle full/empty buffers. All of the buffer-management is now done in terms of "base and bound" pointers rather than "pos" integers. These classes were moved to TBufferTransports.{h,cpp}. The .h is included in TTransportUtils for backwards compatibility. Also added a "TShortReadTransport" to assist testing transports. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665676 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 2 +- contrib/thrift_dump.cpp | 2 +- lib/cpp/Makefile.am | 2 + lib/cpp/src/processor/PeekProcessor.h | 1 + lib/cpp/src/protocol/TDebugProtocol.h | 5 +- lib/cpp/src/protocol/TJSONProtocol.h | 4 +- lib/cpp/src/server/TNonblockingServer.h | 2 +- lib/cpp/src/transport/TBufferTransports.cpp | 353 ++++++++++ lib/cpp/src/transport/TBufferTransports.h | 664 +++++++++++++++++++ lib/cpp/src/transport/THttpClient.cpp | 2 +- lib/cpp/src/transport/THttpClient.h | 2 +- lib/cpp/src/transport/TShortReadTransport.h | 84 +++ lib/cpp/src/transport/TTransportUtils.cpp | 342 ---------- lib/cpp/src/transport/TTransportUtils.h | 493 +------------- test/Benchmark.cpp | 2 +- test/DenseProtoTest.cpp | 2 +- test/JSONProtoTest.cpp | 2 +- test/Makefile.am | 3 +- test/OptionalRequiredTest.cpp | 2 +- test/TBufferBaseTest.cpp | 589 ++++++++++++++++ test/TMemoryBufferTest.cpp | 2 +- test/TPipedTransportTest.cpp | 1 + test/ZlibTest.cpp | 2 +- 23 files changed, 1715 insertions(+), 848 deletions(-) create mode 100644 lib/cpp/src/transport/TBufferTransports.cpp create mode 100644 lib/cpp/src/transport/TBufferTransports.h create mode 100644 lib/cpp/src/transport/TShortReadTransport.h create mode 100644 test/TBufferBaseTest.cpp diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index 0a1d4def..fecb7c73 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -2337,7 +2337,7 @@ void t_cpp_generator::generate_service_skeleton(t_service* tservice) { "#include " << endl << "#include " << endl << "#include " << endl << - "#include " << endl << + "#include " << endl << endl << "using namespace facebook::thrift;" << endl << "using namespace facebook::thrift::protocol;" << endl << diff --git a/contrib/thrift_dump.cpp b/contrib/thrift_dump.cpp index da359fd8..aac1e1cf 100644 --- a/contrib/thrift_dump.cpp +++ b/contrib/thrift_dump.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include #include diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index ac5a7769..df64353e 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -41,6 +41,7 @@ libthrift_la_SOURCES = src/Thrift.cpp \ src/transport/TSocketPool.cpp \ src/transport/TServerSocket.cpp \ src/transport/TTransportUtils.cpp \ + src/transport/TBufferTransports.cpp \ src/server/TServer.cpp \ src/server/TSimpleServer.cpp \ src/server/TThreadPoolServer.cpp \ @@ -107,6 +108,7 @@ include_transport_HEADERS = \ src/transport/TTransport.h \ src/transport/TTransportException.h \ src/transport/TTransportUtils.h \ + src/transport/TBufferTransports.h \ src/transport/TZlibTransport.h include_serverdir = $(include_thriftdir)/server diff --git a/lib/cpp/src/processor/PeekProcessor.h b/lib/cpp/src/processor/PeekProcessor.h index 49e7cd9e..697be27e 100644 --- a/lib/cpp/src/processor/PeekProcessor.h +++ b/lib/cpp/src/processor/PeekProcessor.h @@ -11,6 +11,7 @@ #include #include #include +#include #include namespace facebook { namespace thrift { namespace processor { diff --git a/lib/cpp/src/protocol/TDebugProtocol.h b/lib/cpp/src/protocol/TDebugProtocol.h index bac20982..c08720b7 100644 --- a/lib/cpp/src/protocol/TDebugProtocol.h +++ b/lib/cpp/src/protocol/TDebugProtocol.h @@ -12,8 +12,6 @@ #include -#include - namespace facebook { namespace thrift { namespace protocol { /* @@ -158,6 +156,9 @@ class TDebugProtocolFactory : public TProtocolFactory { }}} // facebook::thrift::protocol +// TODO(dreiss): Move (part of) ThriftDebugString into a .cpp file and remove this. +#include + namespace facebook { namespace thrift { template diff --git a/lib/cpp/src/protocol/TJSONProtocol.h b/lib/cpp/src/protocol/TJSONProtocol.h index efb8e636..cece97e4 100644 --- a/lib/cpp/src/protocol/TJSONProtocol.h +++ b/lib/cpp/src/protocol/TJSONProtocol.h @@ -8,7 +8,6 @@ #define _THRIFT_PROTOCOL_TJSONPROTOCOL_H_ 1 #include "TProtocol.h" -#include #include @@ -303,6 +302,9 @@ class TJSONProtocolFactory { }}} // facebook::thrift::protocol +// TODO(dreiss): Move part of ThriftJSONString into a .cpp file and remove this. +#include + namespace facebook { namespace thrift { template diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index de2e43b9..3de5b948 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -9,7 +9,7 @@ #include #include -#include +#include #include #include #include diff --git a/lib/cpp/src/transport/TBufferTransports.cpp b/lib/cpp/src/transport/TBufferTransports.cpp new file mode 100644 index 00000000..9164a1f9 --- /dev/null +++ b/lib/cpp/src/transport/TBufferTransports.cpp @@ -0,0 +1,353 @@ +// Copyright (c) 2006- Facebook +// Distributed under the Thrift Software License +// +// See accompanying file LICENSE or visit the Thrift site at: +// http://developers.facebook.com/thrift/ + +#include +#include + +#include + +using std::string; + +namespace facebook { namespace thrift { namespace transport { + + +uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { + uint32_t want = len; + uint32_t have = rBound_ - rBase_; + + // We should only take the slow path if we can't satisfy the read + // with the data already in the buffer. + assert(have < want); + + // Copy out whatever we have. + if (have > 0) { + memcpy(buf, rBase_, have); + want -= have; + buf += have; + } + // Get more from underlying transport up to buffer size. + // Note that this makes a lot of sense if len < rBufSize_ + // and almost no sense otherwise. TODO(dreiss): Fix that + // case (possibly including some readv hotness). + setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); + + // Hand over whatever we have. + uint32_t give = std::min(want, static_cast(rBound_ - rBase_)); + memcpy(buf, rBase_, give); + rBase_ += give; + want -= give; + + return (len - want); +} + +void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) { + uint32_t have_bytes = wBase_ - wBuf_.get(); + uint32_t space = wBound_ - wBase_; + // We should only take the slow path if we can't accomodate the write + // with the free space already in the buffer. + assert(wBound_ - wBase_ < static_cast(len)); + + // Now here's the tricky question: should we copy data from buf into our + // internal buffer and write it from there, or should we just write out + // the current internal buffer in one syscall and write out buf in another. + // If our currently buffered data plus buf is at least double our buffer + // size, we will have to do two syscalls no matter what (except in the + // degenerate case when our buffer is empty), so there is no use copying. + // Otherwise, there is sort of a sliding scale. If we have N-1 bytes + // buffered and need to write 2, it would be crazy to do two syscalls. + // On the other hand, if we have 2 bytes buffered and are writing 2N-3, + // we can save a syscall in the short term by loading up our buffer, writing + // it out, and copying the rest of the bytes into our buffer. Of course, + // if we get another 2-byte write, we haven't saved any syscalls at all, + // and have just copied nearly 2N bytes for nothing. Finding a perfect + // policy would require predicting the size of future writes, so we're just + // going to always eschew syscalls if we have less than 2N bytes to write. + + // The case where we have to do two syscalls. + // This case also covers the case where the buffer is empty, + // but it is clearer (I think) to think of it as two separate cases. + if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) { + // TODO(dreiss): writev + if (have_bytes > 0) { + transport_->write(wBuf_.get(), have_bytes); + } + transport_->write(buf, len); + wBase_ = wBuf_.get(); + return; + } + + // Fill up our internal buffer for a write. + memcpy(wBase_, buf, space); + buf += space; + len -= space; + transport_->write(wBuf_.get(), wBufSize_); + + // Copy the rest into our buffer. + assert(len < wBufSize_); + memcpy(wBuf_.get(), buf, len); + wBase_ = wBuf_.get() + len; + return; +} + +const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { + // If the request is bigger than our buffer, we are hosed. + if (*len > rBufSize_) { + return NULL; + } + + // The number of bytes of data we have already. + uint32_t have = rBound_ - rBase_; + // The number of additional bytes we need from the underlying transport. + uint32_t need = *len - have; + // The space from the start of the buffer to the end of our data. + uint32_t offset = rBound_ - rBuf_.get(); + assert(need > 0); + + // If we have less than half our buffer space available, shift the data + // we have down to the start. If the borrow is big compared to our buffer, + // this could be kind of a waste, but if the borrow is small, it frees up + // space at the end of our buffer to do a bigger single read from the + // underlying transport. Also, if our needs extend past the end of the + // buffer, we have to do a copy no matter what. + if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) { + memmove(rBuf_.get(), rBase_, have); + setReadBuffer(rBuf_.get(), have); + } + + // First try to fill up the buffer. + uint32_t got = transport_->read(rBound_, rBufSize_ - have); + rBound_ += got; + need -= got; + + // If that fails, readAll until we get what we need. + if (need > 0) { + rBound_ += transport_->readAll(rBound_, need); + } + + *len = rBound_ - rBase_; + return rBase_; +} + +void TBufferedTransport::flush() { + // Write out any data waiting in the write buffer. + uint32_t have_bytes = wBase_ - wBuf_.get(); + if (have_bytes > 0) { + transport_->write(wBuf_.get(), have_bytes); + wBase_ = wBuf_.get(); + } + + // Flush the underlying transport. + transport_->flush(); +} + + +uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { + uint32_t want = len; + uint32_t have = rBound_ - rBase_; + + // We should only take the slow path if we can't satisfy the read + // with the data already in the buffer. + assert(have < want); + + // Copy out whatever we have. + if (have > 0) { + memcpy(buf, rBase_, have); + want -= have; + buf += have; + } + + // Read another frame. + readFrame(); + + // TODO(dreiss): Should we warn when reads cross frames? + + // Hand over whatever we have. + uint32_t give = std::min(want, static_cast(rBound_ - rBase_)); + memcpy(buf, rBase_, give); + rBase_ += give; + want -= give; + + return (len - want); +} + +void TFramedTransport::readFrame() { + // TODO(dreiss): Think about using readv here, even though it would + // result in (gasp) read-ahead. + + // Read the size of the next frame. + int32_t sz; + transport_->readAll((uint8_t*)&sz, sizeof(sz)); + sz = ntohl(sz); + + if (sz < 0) { + throw TTransportException("Frame size has negative value"); + } + + // Read the frame payload, and reset markers. + if (sz > static_cast(rBufSize_)) { + rBuf_.reset(new uint8_t[sz]); + rBufSize_ = sz; + } + transport_->readAll(rBuf_.get(), sz); + setReadBuffer(rBuf_.get(), sz); +} + +void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { + // Double buffer size until sufficient. + uint32_t have = wBase_ - wBuf_.get(); + while (wBufSize_ < len + have) { + wBufSize_ *= 2; + } + + // TODO(dreiss): Consider modifying this class to use malloc/free + // so we can use realloc here. + + // Allocate new buffer. + uint8_t* new_buf = new uint8_t[wBufSize_]; + + // Copy the old buffer to the new one. + memcpy(new_buf, wBuf_.get(), have); + + // Now point buf to the new one. + wBuf_.reset(new_buf); + wBase_ = wBuf_.get() + have; + wBound_ = wBuf_.get() + wBufSize_; + + // Copy the data into the new buffer. + memcpy(wBase_, buf, len); + wBase_ += len; +} + +void TFramedTransport::flush() { + int32_t sz_hbo, sz_nbo; + assert(wBufSize_ > sizeof(sz_nbo)); + + // Slip the frame size into the start of the buffer. + sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo)); + sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo)); + memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo)); + + // Write size and frame body. + transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo); + + // Reset our pointers. + wBase_ = wBuf_.get(); + + // Pad the buffer so we can insert the size later. + uint32_t pad = 0; + this->write((uint8_t*)&pad, sizeof(pad)); + + // Flush the underlying transport. + transport_->flush(); +} + +const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { + // Don't try to be clever with shifting buffers. + // If the fast path failed let the protocol use its slow path. + // Besides, who is going to try to borrow across messages? + return NULL; +} + + +void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { + // Correct rBound_ so we can use the fast path in the future. + rBound_ = wBase_; + + // Decide how much to give. + uint32_t give = std::min(len, available_read()); + + *out_start = rBase_; + *out_give = give; + + // Preincrement rBase_ so the caller doesn't have to. + rBase_ += give; +} + +uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) { + uint8_t* start; + uint32_t give; + computeRead(len, &start, &give); + + // Copy into the provided buffer. + memcpy(buf, start, give); + + return give; +} + +uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) { + // Don't get some stupid assertion failure. + if (buffer_ == NULL) { + return 0; + } + + uint8_t* start; + uint32_t give; + computeRead(len, &start, &give); + + // Append to the provided string. + str.append((char*)start, give); + + return give; +} + +void TMemoryBuffer::ensureCanWrite(uint32_t len) { + // Check available space + uint32_t avail = available_write(); + if (len <= avail) { + return; + } + + if (!owner_) { + throw TTransportException("Insufficient space in external MemoryBuffer"); + } + + // Grow the buffer as necessary. + while (len > avail) { + bufferSize_ *= 2; + wBound_ = buffer_ + bufferSize_; + avail = available_write(); + } + + // Allocate into a new pointer so we don't bork ours if it fails. + void* new_buffer = std::realloc(buffer_, bufferSize_); + if (new_buffer == NULL) { + throw TTransportException("Out of memory."); + } + + ptrdiff_t offset = (uint8_t*)new_buffer - buffer_; + buffer_ += offset; + rBase_ += offset; + rBound_ += offset; + wBase_ += offset; + wBound_ += offset; +} + +void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) { + ensureCanWrite(len); + + // Copy into the buffer and increment wBase_. + memcpy(wBase_, buf, len); + wBase_ += len; +} + +void TMemoryBuffer::wroteBytes(uint32_t len) { + uint32_t avail = available_write(); + if (len > avail) { + throw TTransportException("Client wrote more bytes than size of buffer."); + } + wBase_ += len; +} + +const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) { + rBound_ = wBase_; + if (available_read() >= *len) { + *len = available_read(); + return rBase_; + } + return NULL; +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h new file mode 100644 index 00000000..4277d402 --- /dev/null +++ b/lib/cpp/src/transport/TBufferTransports.h @@ -0,0 +1,664 @@ +// Copyright (c) 2006- Facebook +// Distributed under the Thrift Software License +// +// See accompanying file LICENSE or visit the Thrift site at: +// http://developers.facebook.com/thrift/ + +#ifndef _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_ +#define _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_ 1 + +#include "boost/scoped_array.hpp" + +#include + +#ifdef __GNUC__ +#define TDB_LIKELY(val) (__builtin_expect((val), 1)) +#define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) +#else +#define TDB_LIKELY(val) (val) +#define TDB_UNLIKELY(val) (val) +#endif + +namespace facebook { namespace thrift { namespace transport { + + +/** + * Base class for all transports that use read/write buffers for performance. + * + * TBufferBase is designed to implement the fast-path "memcpy" style + * operations that work in the common case. It does so with small and + * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract + * class. Subclasses are expected to define the "slow path" operations + * that have to be done when the buffers are full or empty. + * + * @author David Reiss + */ +class TBufferBase : public TTransport { + + public: + + /** + * Fast-path read. + * + * When we have enough data buffered to fulfill the read, we can satisfy it + * with a single memcpy, then adjust our internal pointers. If the buffer + * is empty, we call out to our slow path, implemented by a subclass. + * This method is meant to eventually be nonvirtual and inlinable. + */ + uint32_t read(uint8_t* buf, uint32_t len) { + uint8_t* new_rBase = rBase_ + len; + if (TDB_LIKELY(new_rBase <= rBound_)) { + memcpy(buf, rBase_, len); + rBase_ = new_rBase; + return len; + } + return readSlow(buf, len); + } + + /** + * Fast-path write. + * + * When we have enough empty space in our buffer to accomodate the write, we + * can satisfy it with a single memcpy, then adjust our internal pointers. + * If the buffer is full, we call out to our slow path, implemented by a + * subclass. This method is meant to eventually be nonvirtual and + * inlinable. + */ + void write(const uint8_t* buf, uint32_t len) { + uint8_t* new_wBase = wBase_ + len; + if (TDB_LIKELY(new_wBase <= wBound_)) { + memcpy(wBase_, buf, len); + wBase_ = new_wBase; + return; + } + writeSlow(buf, len); + } + + /** + * Fast-path borrow. A lot like the fast-path read. + */ + const uint8_t* borrow(uint8_t* buf, uint32_t* len) { + if (TDB_LIKELY(static_cast(*len) <= rBound_ - rBase_)) { + // With strict aliasing, writing to len shouldn't force us to + // refetch rBase_ from memory. TODO(dreiss): Verify this. + *len = rBound_ - rBase_; + return rBase_; + } + return borrowSlow(buf, len); + } + + /** + * Consume doesn't require a slow path. + */ + void consume(uint32_t len) { + if (TDB_LIKELY(static_cast(len) <= rBound_ - rBase_)) { + rBase_ += len; + } else { + throw TTransportException(TTransportException::BAD_ARGS, + "consume did not follow a borrow."); + } + } + + + protected: + + /// Slow path read. + virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; + + /// Slow path read. + virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; + + /** + * Slow path borrow. + * + * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len + */ + virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; + + /** + * Trivial constructor. + * + * Initialize pointers safely. Constructing is not a very + * performance-sensitive operation, so it is okay to just leave it to + * the concrete class to set up pointers correctly. + */ + TBufferBase() + : rBase_(NULL) + , rBound_(NULL) + , wBase_(NULL) + , wBound_(NULL) + {} + + /// Convenience mutator for setting the read buffer. + void setReadBuffer(uint8_t* buf, uint32_t len) { + rBase_ = buf; + rBound_ = buf+len; + } + + /// Convenience mutator for setting the read buffer. + void setWriteBuffer(uint8_t* buf, uint32_t len) { + wBase_ = buf; + wBound_ = buf+len; + } + + virtual ~TBufferBase() {} + + /// Reads begin here. + uint8_t* rBase_; + /// Reads may extend to just before here. + uint8_t* rBound_; + + /// Writes begin here. + uint8_t* wBase_; + /// Writes may extend to just before here. + uint8_t* wBound_; +}; + + +/** + * Buffered transport. For reads it will read more data than is requested + * and will serve future data out of a local buffer. For writes, data is + * stored to an in memory buffer before being written out. + * + * @author Mark Slee + * @author David Reiss + */ +class TBufferedTransport : public TBufferBase { + public: + + static const int DEFAULT_BUFFER_SIZE = 512; + + /// Use default buffer sizes. + TBufferedTransport(boost::shared_ptr transport) + : transport_(transport) + , rBufSize_(DEFAULT_BUFFER_SIZE) + , wBufSize_(DEFAULT_BUFFER_SIZE) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) + { + initPointers(); + } + + /// Use specified buffer sizes. + TBufferedTransport(boost::shared_ptr transport, uint32_t sz) + : transport_(transport) + , rBufSize_(sz) + , wBufSize_(sz) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) + { + initPointers(); + } + + /// Use specified read and write buffer sizes. + TBufferedTransport(boost::shared_ptr transport, uint32_t rsz, uint32_t wsz) + : transport_(transport) + , rBufSize_(rsz) + , wBufSize_(rsz) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) + { + initPointers(); + } + + void open() { + transport_->open(); + } + + bool isOpen() { + return transport_->isOpen(); + } + + bool peek() { + if (rBase_ == rBound_) { + setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); + } + return (rBound_ > rBase_); + } + + void close() { + flush(); + transport_->close(); + } + + virtual uint32_t readSlow(uint8_t* buf, uint32_t len); + + virtual void writeSlow(const uint8_t* buf, uint32_t len); + + void flush(); + + + /** + * The following behavior is currently implemented by TBufferedTransport, + * but that may change in a future version: + * 1/ If len is at most rBufSize_, borrow will never return NULL. + * Depending on the underlying transport, it could throw an exception + * or hang forever. + * 2/ Some borrow requests may copy bytes internally. However, + * if len is at most rBufSize_/2, none of the copied bytes + * will ever have to be copied again. For optimial performance, + * stay under this limit. + */ + virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); + + boost::shared_ptr getUnderlyingTransport() { + return transport_; + } + + protected: + void initPointers() { + setReadBuffer(rBuf_.get(), 0); + setWriteBuffer(wBuf_.get(), wBufSize_); + // Write size never changes. + } + + boost::shared_ptr transport_; + + uint32_t rBufSize_; + uint32_t wBufSize_; + boost::scoped_array rBuf_; + boost::scoped_array wBuf_; +}; + + +/** + * Wraps a transport into a buffered one. + * + * @author Mark Slee + */ +class TBufferedTransportFactory : public TTransportFactory { + public: + TBufferedTransportFactory() {} + + virtual ~TBufferedTransportFactory() {} + + /** + * Wraps the transport into a buffered one. + */ + virtual boost::shared_ptr getTransport(boost::shared_ptr trans) { + return boost::shared_ptr(new TBufferedTransport(trans)); + } + +}; + + +/** + * Framed transport. All writes go into an in-memory buffer until flush is + * called, at which point the transport writes the length of the entire + * binary chunk followed by the data payload. This allows the receiver on the + * other end to always do fixed-length reads. + * + * @author Mark Slee + * @author David Reiss + */ +class TFramedTransport : public TBufferBase { + public: + + static const int DEFAULT_BUFFER_SIZE = 512; + + /// Use default buffer sizes. + TFramedTransport(boost::shared_ptr transport) + : transport_(transport) + , rBufSize_(0) + , wBufSize_(DEFAULT_BUFFER_SIZE) + , rBuf_() + , wBuf_(new uint8_t[wBufSize_]) + { + initPointers(); + } + + TFramedTransport(boost::shared_ptr transport, uint32_t sz) + : transport_(transport) + , rBufSize_(0) + , wBufSize_(sz) + , rBuf_() + , wBuf_(new uint8_t[wBufSize_]) + { + initPointers(); + } + + void open() { + transport_->open(); + } + + bool isOpen() { + return transport_->isOpen(); + } + + bool peek() { + return (rBase_ < rBound_) || transport_->peek(); + } + + void close() { + flush(); + transport_->close(); + } + + virtual uint32_t readSlow(uint8_t* buf, uint32_t len); + + virtual void writeSlow(const uint8_t* buf, uint32_t len); + + virtual void flush(); + + const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); + + boost::shared_ptr getUnderlyingTransport() { + return transport_; + } + + protected: + /** + * Reads a frame of input from the underlying stream. + */ + void readFrame(); + + void initPointers() { + setReadBuffer(NULL, 0); + setWriteBuffer(wBuf_.get(), wBufSize_); + + // Pad the buffer so we can insert the size later. + int32_t pad = 0; + this->write((uint8_t*)&pad, sizeof(pad)); + } + + boost::shared_ptr transport_; + + uint32_t rBufSize_; + uint32_t wBufSize_; + boost::scoped_array rBuf_; + boost::scoped_array wBuf_; +}; + +/** + * Wraps a transport into a framed one. + * + * @author Dave Simpson + */ +class TFramedTransportFactory : public TTransportFactory { + public: + TFramedTransportFactory() {} + + virtual ~TFramedTransportFactory() {} + + /** + * Wraps the transport into a framed one. + */ + virtual boost::shared_ptr getTransport(boost::shared_ptr trans) { + return boost::shared_ptr(new TFramedTransport(trans)); + } + +}; + + +/** + * A memory buffer is a tranpsort that simply reads from and writes to an + * in memory buffer. Anytime you call write on it, the data is simply placed + * into a buffer, and anytime you call read, data is read from that buffer. + * + * The buffers are allocated using C constructs malloc,realloc, and the size + * doubles as necessary. We've considered using scoped + * + * @author Mark Slee + * @author David Reiss + */ +class TMemoryBuffer : public TBufferBase { + private: + + // Common initialization done by all constructors. + void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { + if (buf == NULL && size != 0) { + assert(owner); + buf = (uint8_t*)std::malloc(size); + if (buf == NULL) { + throw TTransportException("Out of memory"); + } + } + + buffer_ = buf; + bufferSize_ = size; + + rBase_ = buffer_; + rBound_ = buffer_ + wPos; + // TODO(dreiss): Investigate NULL-ing this if !owner. + wBase_ = buffer_ + wPos; + wBound_ = buffer_ + bufferSize_; + + owner_ = owner; + + // rBound_ is really an artifact. In principle, it should always be + // equal to wBase_. We update it in a few places (computeRead, etc.). + } + + public: + static const uint32_t defaultSize = 1024; + + /** + * This enum specifies how a TMemoryBuffer should treat + * memory passed to it via constructors or resetBuffer. + * + * OBSERVE: + * TMemoryBuffer will simply store a pointer to the memory. + * It is the callers responsibility to ensure that the pointer + * remains valid for the lifetime of the TMemoryBuffer, + * and that it is properly cleaned up. + * Note that no data can be written to observed buffers. + * + * COPY: + * TMemoryBuffer will make an internal copy of the buffer. + * The caller has no responsibilities. + * + * TAKE_OWNERSHIP: + * TMemoryBuffer will become the "owner" of the buffer, + * and will be responsible for freeing it. + * The membory must have been allocated with malloc. + */ + enum MemoryPolicy { + OBSERVE = 1, + COPY = 2, + TAKE_OWNERSHIP = 3, + }; + + /** + * Construct a TMemoryBuffer with a default-sized buffer, + * owned by the TMemoryBuffer object. + */ + TMemoryBuffer() { + initCommon(NULL, defaultSize, true, 0); + } + + /** + * Construct a TMemoryBuffer with a buffer of a specified size, + * owned by the TMemoryBuffer object. + * + * @param sz The initial size of the buffer. + */ + TMemoryBuffer(uint32_t sz) { + initCommon(NULL, sz, true, 0); + } + + /** + * Construct a TMemoryBuffer with buf as its initial contents. + * + * @param buf The initial contents of the buffer. + * Note that, while buf is a non-const pointer, + * TMemoryBuffer will not write to it if policy == OBSERVE, + * so it is safe to const_cast(whatever). + * @param sz The size of @c buf. + * @param policy See @link MemoryPolicy @endlink . + */ + TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { + if (buf == NULL && sz != 0) { + throw TTransportException(TTransportException::BAD_ARGS, + "TMemoryBuffer given null buffer with non-zero size."); + } + + switch (policy) { + case OBSERVE: + case TAKE_OWNERSHIP: + initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); + break; + case COPY: + initCommon(NULL, sz, true, 0); + this->write(buf, sz); + break; + default: + throw TTransportException(TTransportException::BAD_ARGS, + "Invalid MemoryPolicy for TMemoryBuffer"); + } + } + + ~TMemoryBuffer() { + if (owner_) { + std::free(buffer_); + } + } + + bool isOpen() { + return true; + } + + bool peek() { + return (rBase_ < wBase_); + } + + void open() {} + + void close() {} + + // TODO(dreiss): Make bufPtr const. + void getBuffer(uint8_t** bufPtr, uint32_t* sz) { + *bufPtr = rBase_; + *sz = wBase_ - rBase_; + } + + std::string getBufferAsString() { + if (buffer_ == NULL) { + return ""; + } + uint8_t* buf; + uint32_t sz; + getBuffer(&buf, &sz); + return std::string((char*)buf, (std::string::size_type)sz); + } + + void appendBufferToString(std::string& str) { + if (buffer_ == NULL) { + return; + } + uint8_t* buf; + uint32_t sz; + getBuffer(&buf, &sz); + str.append((char*)buf, sz); + } + + void resetBuffer() { + rBase_ = buffer_; + rBound_ = buffer_; + wBase_ = buffer_; + // It isn't safe to write into a buffer we don't own. + if (!owner_) { + wBound_ = wBase_; + bufferSize_ = 0; + } + } + + /// See constructor documentation. + void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { + // Use a variant of the copy-and-swap trick for assignment operators. + // This is sub-optimal in terms of performance for two reasons: + // 1/ The constructing and swapping of the (small) values + // in the temporary object takes some time, and is not necessary. + // 2/ If policy == COPY, we allocate the new buffer before + // freeing the old one, precluding the possibility of + // reusing that memory. + // I doubt that either of these problems could be optimized away, + // but the second is probably no a common case, and the first is minor. + // I don't expect resetBuffer to be a common operation, so I'm willing to + // bite the performance bullet to make the method this simple. + + // Construct the new buffer. + TMemoryBuffer new_buffer(buf, sz, policy); + // Move it into ourself. + this->swap(new_buffer); + // Our old self gets destroyed. + } + + std::string readAsString(uint32_t len) { + std::string str; + (void)readAppendToString(str, len); + return str; + } + + uint32_t readAppendToString(std::string& str, uint32_t len); + + void readEnd() { + if (rBase_ == wBase_) { + resetBuffer(); + } + } + + uint32_t available_read() const { + // Remember, wBase_ is the real rBound_. + return wBase_ - rBase_; + } + + uint32_t available_write() const { + return wBound_ - wBase_; + } + + // Returns a pointer to where the client can write data to append to + // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a + // write of the provided length. The returned pointer is very convenient for + // passing to read(), recv(), or similar. You must call wroteBytes() as soon + // as data is written or the buffer will not be aware that data has changed. + uint8_t* getWritePtr(uint32_t len) { + ensureCanWrite(len); + return wBase_; + } + + // Informs the buffer that the client has written 'len' bytes into storage + // that had been provided by getWritePtr(). + void wroteBytes(uint32_t len); + + protected: + void swap(TMemoryBuffer& that) { + using std::swap; + swap(buffer_, that.buffer_); + swap(bufferSize_, that.bufferSize_); + + swap(rBase_, that.rBase_); + swap(rBound_, that.rBound_); + swap(wBase_, that.wBase_); + swap(wBound_, that.wBound_); + + swap(owner_, that.owner_); + } + + // Make sure there's at least 'len' bytes available for writing. + void ensureCanWrite(uint32_t len); + + // Compute the position and available data for reading. + void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); + + uint32_t readSlow(uint8_t* buf, uint32_t len); + + void writeSlow(const uint8_t* buf, uint32_t len); + + const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); + + // Data buffer + uint8_t* buffer_; + + // Allocated buffer size + uint32_t bufferSize_; + + // Is this object the owner of the buffer? + bool owner_; + + // Don't forget to update constrctors, initCommon, and swap if + // you add new members. +}; + +}}} // facebook::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_TDOUBLEBUFFERS_H_ diff --git a/lib/cpp/src/transport/THttpClient.cpp b/lib/cpp/src/transport/THttpClient.cpp index a2940932..8c82d7fd 100644 --- a/lib/cpp/src/transport/THttpClient.cpp +++ b/lib/cpp/src/transport/THttpClient.cpp @@ -71,7 +71,7 @@ THttpClient::~THttpClient() { } uint32_t THttpClient::read(uint8_t* buf, uint32_t len) { - if (readBuffer_.available() == 0) { + if (readBuffer_.available_read() == 0) { readBuffer_.resetBuffer(); uint32_t got = readMoreData(); if (got == 0) { diff --git a/lib/cpp/src/transport/THttpClient.h b/lib/cpp/src/transport/THttpClient.h index 1ae3575c..dbffe99d 100644 --- a/lib/cpp/src/transport/THttpClient.h +++ b/lib/cpp/src/transport/THttpClient.h @@ -7,7 +7,7 @@ #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_ #define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1 -#include +#include namespace facebook { namespace thrift { namespace transport { diff --git a/lib/cpp/src/transport/TShortReadTransport.h b/lib/cpp/src/transport/TShortReadTransport.h new file mode 100644 index 00000000..428fa9e5 --- /dev/null +++ b/lib/cpp/src/transport/TShortReadTransport.h @@ -0,0 +1,84 @@ +// Copyright (c) 2006- Facebook +// Distributed under the Thrift Software License +// +// See accompanying file LICENSE or visit the Thrift site at: +// http://developers.facebook.com/thrift/ + +#ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ +#define _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ 1 + +#include + +#include + +namespace facebook { namespace thrift { namespace transport { namespace test { + +/** + * This class is only meant for testing. It wraps another transport. + * Calls to read are passed through with some probability. Otherwise, + * the read amount is randomly reduced before being passed through. + * + * @author David Reiss + */ +class TShortReadTransport : public TTransport { + public: + TShortReadTransport(boost::shared_ptr transport, double full_prob) + : transport_(transport) + , fullProb_(full_prob) + {} + + bool isOpen() { + return transport_->isOpen(); + } + + bool peek() { + return transport_->peek(); + } + + void open() { + transport_->open(); + } + + void close() { + transport_->close(); + } + + uint32_t read(uint8_t* buf, uint32_t len) { + if (len == 0) { + return 0; + } + + if (rand()/(double)RAND_MAX >= fullProb_) { + len = 1 + rand()%len; + } + return transport_->read(buf, len); + } + + void write(const uint8_t* buf, uint32_t len) { + transport_->write(buf, len); + } + + void flush() { + transport_->flush(); + } + + const uint8_t* borrow(uint8_t* buf, uint32_t* len) { + return transport_->borrow(buf, len); + } + + void consume(uint32_t len) { + return transport_->consume(len); + } + + boost::shared_ptr getUnderlyingTransport() { + return transport_; + } + + protected: + boost::shared_ptr transport_; + double fullProb_; +}; + +}}}} // facebook::thrift::transport::test + +#endif // #ifndef _THRIFT_TRANSPORT_TSHORTREADTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp index d00025e9..d365a3cc 100644 --- a/lib/cpp/src/transport/TTransportUtils.cpp +++ b/lib/cpp/src/transport/TTransportUtils.cpp @@ -10,348 +10,6 @@ using std::string; namespace facebook { namespace thrift { namespace transport { -uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { - uint32_t need = len; - - // We don't have enough data yet - if (rLen_-rPos_ < need) { - // Copy out whatever we have - if (rLen_-rPos_ > 0) { - memcpy(buf, rBuf_+rPos_, rLen_-rPos_); - need -= rLen_-rPos_; - buf += rLen_-rPos_; - } - // Get more from underlying transport up to buffer size - rLen_ = transport_->read(rBuf_, rBufSize_); - rPos_ = 0; - } - - // Hand over whatever we have - uint32_t give = need; - if (rLen_-rPos_ < give) { - give = rLen_-rPos_; - } - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; - return (len - need); -} - -void TBufferedTransport::write(const uint8_t* buf, uint32_t len) { - if (len == 0) { - return; - } - - uint32_t pos = 0; - - while ((len-pos) + wLen_ >= wBufSize_) { - uint32_t copy = wBufSize_ - wLen_; - memcpy(wBuf_ + wLen_, buf + pos, copy); - - transport_->write(wBuf_, wBufSize_); - pos += copy; - wLen_ = 0; - } - - if ((len - pos) > 0) { - memcpy(wBuf_ + wLen_, buf + pos, len - pos); - wLen_ += len - pos; - } -} - -const uint8_t* TBufferedTransport::borrow(uint8_t* buf, uint32_t* len) { - // The number of additional bytes we need from the underlying transport. - // Could be zero or negative. - uint32_t need = *len - (rLen_-rPos_); - - // If we have enough data, just hand over a pointer. - if (need <= 0) { - *len = rLen_-rPos_; - return rBuf_+rPos_; - } - - // If the request is bigger than our buffer, we are hosed. - if (*len > rBufSize_) { - return NULL; - } - - // If we have less than half our buffer available, - // or we need more space than is in the buffer, - // shift the data we have down to the start. - if ((rLen_ > rBufSize_/2) || (rLen_+need > rBufSize_)) { - memmove(rBuf_, rBuf_+rPos_, rLen_-rPos_); - rLen_ -= rPos_; - rPos_ = 0; - } - - // First try to fill up the buffer. - uint32_t got = transport_->read(rBuf_+rLen_, rBufSize_-rLen_); - rLen_ += got; - need -= got; - - // If that fails, readAll until we get what we need. - if (need > 0) { - rLen_ += transport_->readAll(rBuf_+rLen_, need); - } - - *len = rLen_-rPos_; - return rBuf_+rPos_; -} - -void TBufferedTransport::consume(uint32_t len) { - if (rLen_-rPos_ >= len) { - rPos_ += len; - } else { - throw TTransportException(TTransportException::BAD_ARGS, - "consume did not follow a borrow."); - } -} - -void TBufferedTransport::flush() { - // Write out any data waiting in the write buffer - if (wLen_ > 0) { - transport_->write(wBuf_, wLen_); - wLen_ = 0; - } - - // Flush the underlying transport - transport_->flush(); -} - -uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) { - if (!read_) { - return transport_->read(buf, len); - } - - uint32_t need = len; - - // We don't have enough data yet - if (rLen_-rPos_ < need) { - // Copy out whatever we have - if (rLen_-rPos_ > 0) { - memcpy(buf, rBuf_+rPos_, rLen_-rPos_); - need -= rLen_-rPos_; - buf += rLen_-rPos_; - } - - // Read another chunk - readFrame(); - } - - // Hand over whatever we have - uint32_t give = need; - if (rLen_-rPos_ < give) { - give = rLen_-rPos_; - } - memcpy(buf, rBuf_+rPos_, give); - rPos_ += give; - need -= give; - return (len - need); -} - -void TFramedTransport::readFrame() { - // Get rid of the old frame - if (rBuf_ != NULL) { - delete [] rBuf_; - rBuf_ = NULL; - } - - // Read in the next chunk size - int32_t sz; - transport_->readAll((uint8_t*)&sz, 4); - sz = (int32_t)ntohl(sz); - - if (sz < 0) { - throw TTransportException("Frame size has negative value"); - } - - // Read the frame payload, reset markers - rBuf_ = new uint8_t[sz]; - transport_->readAll(rBuf_, sz); - rPos_ = 0; - rLen_ = sz; -} - -void TFramedTransport::write(const uint8_t* buf, uint32_t len) { - if (len == 0) { - return; - } - - // Shortcut out if not write mode - if (!write_) { - transport_->write(buf, len); - return; - } - - // Need to grow the buffer - if (len + wLen_ >= wBufSize_) { - - // Double buffer size until sufficient - while (wBufSize_ < len + wLen_) { - wBufSize_ *= 2; - } - - // Allocate new buffer - uint8_t* wBuf2 = new uint8_t[wBufSize_]; - - // Copy the old buffer to the new one - memcpy(wBuf2, wBuf_, wLen_); - - // Now point buf to the new one - delete [] wBuf_; - wBuf_ = wBuf2; - } - - // Copy data into buffer - memcpy(wBuf_ + wLen_, buf, len); - wLen_ += len; -} - -void TFramedTransport::flush() { - if (!write_) { - transport_->flush(); - return; - } - - // Write frame size - int32_t sz = wLen_; - sz = (int32_t)htonl(sz); - - transport_->write((const uint8_t*)&sz, 4); - - // Write frame body - if (wLen_ > 0) { - transport_->write(wBuf_, wLen_); - } - - // All done - wLen_ = 0; - - // Flush the underlying - transport_->flush(); -} - -const uint8_t* TFramedTransport::borrow(uint8_t* buf, uint32_t* len) { - // Don't try to be clever with shifting buffers. - // If we have enough data, give a pointer to it, - // otherwise let the protcol use its slow path. - if (read_ && (rLen_-rPos_ >= *len)) { - *len = rLen_-rPos_; - return rBuf_+rPos_; - } - return NULL; -} - -void TFramedTransport::consume(uint32_t len) { - if (rLen_-rPos_ >= len) { - rPos_ += len; - } else { - throw TTransportException(TTransportException::BAD_ARGS, - "consume did not follow a borrow."); - } -} - -uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) { - // Check avaible data for reading - uint32_t avail = wPos_ - rPos_; - if (avail == 0) { - return 0; - } - - // Decide how much to give - uint32_t give = len; - if (avail < len) { - give = avail; - } - - // Copy into buffer and increment rPos_ - memcpy(buf, buffer_ + rPos_, give); - rPos_ += give; - - return give; -} - -uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) { - // Don't get some stupid assertion failure. - if (buffer_ == NULL) { - return 0; - } - - // Check avaible data for reading - uint32_t avail = wPos_ - rPos_; - if (avail == 0) { - return 0; - } - - // Device how much to give - uint32_t give = len; - if (avail < len) { - give = avail; - } - - // Reserve memory, copy into string, and increment rPos_ - str.reserve(str.length()+give); - str.append((char*)buffer_ + rPos_, give); - rPos_ += give; - - return give; -} - -void TMemoryBuffer::ensureCanWrite(uint32_t len) { - // Check available space - uint32_t avail = bufferSize_ - wPos_; - if (len <= avail) { - return; - } - - if (!owner_) { - throw TTransportException("Insufficient space in external MemoryBuffer"); - } - - // Grow the buffer as necessary - while (len > avail) { - bufferSize_ *= 2; - avail = bufferSize_ - wPos_; - } - buffer_ = (uint8_t*)std::realloc(buffer_, bufferSize_); - if (buffer_ == NULL) { - throw TTransportException("Out of memory."); - } -} - -void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) { - ensureCanWrite(len); - - // Copy into the buffer and increment wPos_ - memcpy(buffer_ + wPos_, buf, len); - wPos_ += len; -} - -void TMemoryBuffer::wroteBytes(uint32_t len) { - uint32_t avail = bufferSize_ - wPos_; - if (len > avail) { - throw TTransportException("Client wrote more bytes than size of buffer."); - } - wPos_ += len; -} - -const uint8_t* TMemoryBuffer::borrow(uint8_t* buf, uint32_t* len) { - if (wPos_-rPos_ >= *len) { - *len = wPos_-rPos_; - return buffer_ + rPos_; - } - return NULL; -} - -void TMemoryBuffer::consume(uint32_t len) { - if (wPos_-rPos_ >= len) { - rPos_ += len; - } else { - throw TTransportException(TTransportException::BAD_ARGS, - "consume did not follow a borrow."); - } -} - uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) { uint32_t need = len; diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index 6c7e4db1..61f6822d 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -12,6 +12,8 @@ #include #include #include +// Include the buffered transports that used to be defined here. +#include #include namespace facebook { namespace thrift { namespace transport { @@ -43,497 +45,6 @@ class TNullTransport : public TTransport { }; -/** - * Buffered transport. For reads it will read more data than is requested - * and will serve future data out of a local buffer. For writes, data is - * stored to an in memory buffer before being written out. - * - * @author Mark Slee - */ -class TBufferedTransport : public TTransport { - public: - TBufferedTransport(boost::shared_ptr transport) : - transport_(transport), - rBufSize_(512), rPos_(0), rLen_(0), - wBufSize_(512), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - TBufferedTransport(boost::shared_ptr transport, uint32_t sz) : - transport_(transport), - rBufSize_(sz), rPos_(0), rLen_(0), - wBufSize_(sz), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - TBufferedTransport(boost::shared_ptr transport, uint32_t rsz, uint32_t wsz) : - transport_(transport), - rBufSize_(rsz), rPos_(0), rLen_(0), - wBufSize_(wsz), wLen_(0) { - rBuf_ = new uint8_t[rBufSize_]; - wBuf_ = new uint8_t[wBufSize_]; - } - - ~TBufferedTransport() { - delete [] rBuf_; - delete [] wBuf_; - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - if (rPos_ >= rLen_) { - rLen_ = transport_->read(rBuf_, rBufSize_); - rPos_ = 0; - } - return (rLen_ > rPos_); - } - - void open() { - transport_->open(); - } - - void close() { - flush(); - transport_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - void flush(); - - /** - * The following behavior is currently implemented by TBufferedTransport, - * but that may change in a future version: - * 1/ If len is at most rBufSize_, borrow will never return NULL. - * Depending on the underlying transport, it could throw an exception - * or hang forever. - * 2/ Some borrow requests may copy bytes internally. However, - * if len is at most rBufSize_/2, none of the copied bytes - * will ever have to be copied again. For optimial performance, - * stay under this limit. - */ - const uint8_t* borrow(uint8_t* buf, uint32_t* len); - - void consume(uint32_t len); - - boost::shared_ptr getUnderlyingTransport() { - return transport_; - } - - protected: - boost::shared_ptr transport_; - uint8_t* rBuf_; - uint32_t rBufSize_; - uint32_t rPos_; - uint32_t rLen_; - - uint8_t* wBuf_; - uint32_t wBufSize_; - uint32_t wLen_; -}; - -/** - * Wraps a transport into a buffered one. - * - * @author Mark Slee - */ -class TBufferedTransportFactory : public TTransportFactory { - public: - TBufferedTransportFactory() {} - - virtual ~TBufferedTransportFactory() {} - - /** - * Wraps the transport into a buffered one. - */ - virtual boost::shared_ptr getTransport(boost::shared_ptr trans) { - return boost::shared_ptr(new TBufferedTransport(trans)); - } - -}; - -/** - * Framed transport. All writes go into an in-memory buffer until flush is - * called, at which point the transport writes the length of the entire - * binary chunk followed by the data payload. This allows the receiver on the - * other end to always do fixed-length reads. - * - * @author Mark Slee - */ -class TFramedTransport : public TTransport { - public: - TFramedTransport(boost::shared_ptr transport) : - transport_(transport), - rPos_(0), - rLen_(0), - read_(true), - wBufSize_(512), - wLen_(0), - write_(true) { - rBuf_ = NULL; - wBuf_ = new uint8_t[wBufSize_]; - } - - TFramedTransport(boost::shared_ptr transport, uint32_t sz) : - transport_(transport), - rPos_(0), - rLen_(0), - read_(true), - wBufSize_(sz), - wLen_(0), - write_(true) { - rBuf_ = NULL; - wBuf_ = new uint8_t[wBufSize_]; - } - - ~TFramedTransport() { - if (rBuf_ != NULL) { - delete [] rBuf_; - } - if (wBuf_ != NULL) { - delete [] wBuf_; - } - } - - void setRead(bool read) { - read_ = read; - } - - void setWrite(bool write) { - write_ = write; - } - - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - if (rPos_ < rLen_) { - return true; - } - return transport_->peek(); - } - - void close() { - if (wLen_ > 0) { - flush(); - } - transport_->close(); - } - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - void flush(); - - const uint8_t* borrow(uint8_t* buf, uint32_t* len); - - void consume(uint32_t len); - - boost::shared_ptr getUnderlyingTransport() { - return transport_; - } - - protected: - boost::shared_ptr transport_; - uint8_t* rBuf_; - uint32_t rPos_; - uint32_t rLen_; - bool read_; - - uint8_t* wBuf_; - uint32_t wBufSize_; - uint32_t wLen_; - bool write_; - - /** - * Reads a frame of input from the underlying stream. - */ - void readFrame(); -}; - -/** - * Wraps a transport into a framed one. - * - * @author Dave Simpson - */ -class TFramedTransportFactory : public TTransportFactory { - public: - TFramedTransportFactory() {} - - virtual ~TFramedTransportFactory() {} - - /** - * Wraps the transport into a framed one. - */ - virtual boost::shared_ptr getTransport(boost::shared_ptr trans) { - return boost::shared_ptr(new TFramedTransport(trans)); - } - -}; - - -/** - * A memory buffer is a tranpsort that simply reads from and writes to an - * in memory buffer. Anytime you call write on it, the data is simply placed - * into a buffer, and anytime you call read, data is read from that buffer. - * - * The buffers are allocated using C constructs malloc,realloc, and the size - * doubles as necessary. - * - * @author Mark Slee - * @author David Reiss - */ -class TMemoryBuffer : public TTransport { - private: - - // Common initialization done by all constructors. - void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { - if (buf == NULL && size != 0) { - assert(owner); - buf = (uint8_t*)std::malloc(size); - if (buf == NULL) { - throw TTransportException("Out of memory"); - } - } - - buffer_ = buf; - bufferSize_ = size; - owner_ = owner; - wPos_ = wPos; - rPos_ = 0; - } - - // make sure there's at least 'len' bytes available for writing - void ensureCanWrite(uint32_t len); - - public: - static const uint32_t defaultSize = 1024; - - /** - * This enum specifies how a TMemoryBuffer should treat - * memory passed to it via constructors or resetBuffer. - * - * OBSERVE: - * TMemoryBuffer will simply store a pointer to the memory. - * It is the callers responsibility to ensure that the pointer - * remains valid for the lifetime of the TMemoryBuffer, - * and that it is properly cleaned up. - * Note that no data can be written to observed buffers. - * - * COPY: - * TMemoryBuffer will make an internal copy of the buffer. - * The caller has no responsibilities. - * - * TAKE_OWNERSHIP: - * TMemoryBuffer will become the "owner" of the buffer, - * and will be responsible for freeing it. - * The membory must have been allocated with malloc. - */ - enum MemoryPolicy { - OBSERVE = 1, - COPY = 2, - TAKE_OWNERSHIP = 3, - }; - - /** - * Construct a TMemoryBuffer with a default-sized buffer, - * owned by the TMemoryBuffer object. - */ - TMemoryBuffer() { - initCommon(NULL, defaultSize, true, 0); - } - - /** - * Construct a TMemoryBuffer with a buffer of a specified size, - * owned by the TMemoryBuffer object. - * - * @param sz The initial size of the buffer. - */ - TMemoryBuffer(uint32_t sz) { - initCommon(NULL, sz, true, 0); - } - - /** - * Construct a TMemoryBuffer with buf as its initial contents. - * - * @param buf The initial contents of the buffer. - * Note that, while buf is a non-const pointer, - * TMemoryBuffer will not write to it if policy == OBSERVE, - * so it is safe to const_cast(whatever). - * @param sz The size of @c buf. - * @param policy See @link MemoryPolicy @endlink . - */ - TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { - if (buf == NULL && sz != 0) { - throw TTransportException(TTransportException::BAD_ARGS, - "TMemoryBuffer given null buffer with non-zero size."); - } - - switch (policy) { - case OBSERVE: - case TAKE_OWNERSHIP: - initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); - break; - case COPY: - initCommon(NULL, sz, true, 0); - this->write(buf, sz); - break; - default: - throw TTransportException(TTransportException::BAD_ARGS, - "Invalid MemoryPolicy for TMemoryBuffer"); - } - } - - ~TMemoryBuffer() { - if (owner_) { - std::free(buffer_); - buffer_ = NULL; - } - } - - bool isOpen() { - return true; - } - - bool peek() { - return (rPos_ < wPos_); - } - - void open() {} - - void close() {} - - // TODO(dreiss): Make bufPtr const. - void getBuffer(uint8_t** bufPtr, uint32_t* sz) { - *bufPtr = buffer_; - *sz = wPos_; - } - - std::string getBufferAsString() { - if (buffer_ == NULL) { - return ""; - } - return std::string((char*)buffer_, (std::string::size_type)wPos_); - } - - void appendBufferToString(std::string& str) { - if (buffer_ == NULL) { - return; - } - str.append((char*)buffer_, wPos_); - } - - void resetBuffer() { - wPos_ = 0; - rPos_ = 0; - // It isn't safe to write into a buffer we don't own. - if (!owner_) { - bufferSize_ = 0; - } - } - - /// See constructor documentation. - void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { - // Use a variant of the copy-and-swap trick for assignment operators. - // This is sub-optimal in terms of performance for two reasons: - // 1/ The constructing and swapping of the (small) values - // in the temporary object takes some time, and is not necessary. - // 2/ If policy == COPY, we allocate the new buffer before - // freeing the old one, precluding the possibility of - // reusing that memory. - // I doubt that either of these problems could be optimized away, - // but the second is probably no a common case, and the first is minor. - // I don't expect resetBuffer to be a common operation, so I'm willing to - // bite the performance bullet to make the method this simple. - - // Construct the new buffer. - TMemoryBuffer new_buffer(buf, sz, policy); - // Move it into ourself. - this->swap(new_buffer); - // Our old self gets destroyed. - } - - uint32_t read(uint8_t* buf, uint32_t len); - - std::string readAsString(uint32_t len) { - std::string str; - (void)readAppendToString(str, len); - return str; - } - - uint32_t readAppendToString(std::string& str, uint32_t len); - - void readEnd() { - if (rPos_ == wPos_) { - resetBuffer(); - } - } - - void write(const uint8_t* buf, uint32_t len); - - uint32_t available() const { - return wPos_ - rPos_; - } - - const uint8_t* borrow(uint8_t* buf, uint32_t* len); - - void consume(uint32_t len); - - void swap(TMemoryBuffer& that) { - using std::swap; - swap(buffer_, that.buffer_); - swap(bufferSize_, that.bufferSize_); - swap(wPos_, that.wPos_); - swap(owner_, that.owner_); - } - - // Returns a pointer to where the client can write data to append to - // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a - // write of the provided length. The returned pointer is very convenient for - // passing to read(), recv(), or similar. You must call wroteBytes() as soon - // as data is written or the buffer will not be aware that data has changed. - uint8_t* getWritePtr(uint32_t len) { - ensureCanWrite(len); - return buffer_ + wPos_; - } - - // Informs the buffer that the client has written 'len' bytes into storage - // that had been provided by getWritePtr(). - void wroteBytes(uint32_t len); - - private: - // Data buffer - uint8_t* buffer_; - - // Allocated buffer size - uint32_t bufferSize_; - - // Where the write is at - uint32_t wPos_; - - // Where the reader is at - uint32_t rPos_; - - // Is this object the owner of the buffer? - bool owner_; - - // Don't forget to update constrctors, initCommon, and swap if - // you add new members. -}; - /** * TPipedTransport. This transport allows piping of a request from one * transport to another either when readEnd() or writeEnd(). The typical diff --git a/test/Benchmark.cpp b/test/Benchmark.cpp index 82ea77d9..af118eee 100644 --- a/test/Benchmark.cpp +++ b/test/Benchmark.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include "gen-cpp/DebugProtoTest_types.h" diff --git a/test/DenseProtoTest.cpp b/test/DenseProtoTest.cpp index c979e8f5..68b5df9c 100644 --- a/test/DenseProtoTest.cpp +++ b/test/DenseProtoTest.cpp @@ -21,7 +21,7 @@ g++ -Wall -g -I../lib/cpp/src -I/usr/local/include/boost-1_33_1 \ #include "gen-cpp/DebugProtoTest_types.h" #include "gen-cpp/OptionalRequiredTest_types.h" #include -#include +#include // Can't use memcmp here. GCC is too smart. diff --git a/test/JSONProtoTest.cpp b/test/JSONProtoTest.cpp index cc72b648..eb7f4162 100644 --- a/test/JSONProtoTest.cpp +++ b/test/JSONProtoTest.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include "gen-cpp/DebugProtoTest_types.h" diff --git a/test/Makefile.am b/test/Makefile.am index f2ff45f4..bf5a0d7d 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -40,7 +40,8 @@ TESTS = \ UnitTests_SOURCES = \ UnitTestMain.cpp \ - TMemoryBufferTest.cpp + TMemoryBufferTest.cpp \ + TBufferBaseTest.cpp UnitTests_LDADD = libtestgencpp.la diff --git a/test/OptionalRequiredTest.cpp b/test/OptionalRequiredTest.cpp index 1fbc1f8d..c3fbfd1c 100644 --- a/test/OptionalRequiredTest.cpp +++ b/test/OptionalRequiredTest.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include "gen-cpp/OptionalRequiredTest_types.h" using std::cout; diff --git a/test/TBufferBaseTest.cpp b/test/TBufferBaseTest.cpp new file mode 100644 index 00000000..acc3d4b4 --- /dev/null +++ b/test/TBufferBaseTest.cpp @@ -0,0 +1,589 @@ +#include +#include +#include +#include +#include + +using std::string; +using boost::shared_ptr; +using facebook::thrift::transport::TMemoryBuffer; +using facebook::thrift::transport::TBufferedTransport; +using facebook::thrift::transport::TFramedTransport; +using facebook::thrift::transport::test::TShortReadTransport; + +#define foreach BOOST_FOREACH + +// Shamelessly copied from ZlibTransport. TODO: refactor. +unsigned int dist[][5000] = { + { 1<<15 }, + + { + 5,13,9,1,8,9,11,13,18,48,24,13,21,13,5,11,35,2,4,20,17,72,27,14,15,4,7,26, + 12,1,14,9,2,16,29,41,7,24,4,27,14,4,1,4,25,3,6,34,10,8,50,2,14,13,55,29,3, + 43,53,49,14,4,10,32,27,48,1,3,1,11,5,17,16,51,17,30,15,11,9,2,2,11,52,12,2, + 13,94,1,19,1,38,2,8,43,8,33,7,30,8,17,22,2,15,14,12,34,2,12,6,37,29,74,3, + 165,16,11,17,5,14,3,10,7,37,11,24,7,1,3,12,37,8,9,34,17,12,8,21,13,37,1,4, + 30,14,78,4,15,2,40,37,17,12,36,82,14,4,1,4,7,17,11,16,88,77,2,3,15,3,34,11, + 5,79,22,34,8,4,4,40,22,24,28,9,13,3,34,27,9,16,39,16,39,13,2,4,3,41,26,10,4, + 33,4,7,12,5,6,3,10,30,8,21,16,58,19,9,0,47,7,13,11,19,15,7,53,57,2,13,28,22, + 3,16,9,25,33,12,40,7,12,64,7,14,24,44,9,2,14,11,2,58,1,26,30,11,9,5,24,7,9, + 94,2,10,21,5,5,4,5,6,179,9,18,2,7,13,31,41,17,4,36,3,21,6,26,8,15,18,44,27, + 11,9,25,7,0,14,2,12,20,23,13,2,163,9,5,15,65,2,14,6,8,98,11,15,14,34,2,3,10, + 22,9,92,7,10,32,67,13,3,4,35,8,2,1,5,0,26,381,7,27,8,2,16,93,4,19,5,8,25,9, + 31,14,4,21,5,3,9,22,56,4,18,3,11,18,6,4,3,40,12,16,110,8,35,14,1,18,40,9,12, + 14,3,11,7,57,13,18,116,53,19,22,7,16,11,5,8,21,16,1,75,21,20,1,28,2,6,1,7, + 19,38,5,6,9,9,4,1,7,55,36,62,5,4,4,24,15,1,12,35,48,20,5,17,1,5,26,15,4,54, + 13,5,5,15,5,19,32,29,31,7,6,40,7,80,11,18,8,128,48,6,12,84,13,4,7,2,13,9,16, + 17,3,254,1,4,181,8,44,7,6,24,27,9,23,14,34,16,22,25,10,3,3,4,4,12,2,12,6,7, + 13,58,13,6,11,19,53,11,66,18,19,10,4,13,2,5,49,58,1,67,7,21,64,14,11,14,8,3, + 26,33,91,31,20,7,9,42,39,4,3,55,11,10,0,7,4,75,8,12,0,27,3,8,9,0,12,12,23, + 28,23,20,4,13,30,2,22,20,19,30,6,22,2,6,4,24,7,19,55,86,5,33,2,161,6,7,1,62, + 13,3,72,12,12,9,7,12,10,5,10,29,1,5,22,13,13,5,2,12,3,7,14,18,2,3,46,21,17, + 15,19,3,27,5,16,45,31,10,8,17,18,18,3,7,24,6,55,9,3,6,12,10,12,8,91,9,4,4,4, + 27,29,16,5,7,22,43,28,11,14,8,11,28,109,55,71,40,3,8,22,26,15,44,3,25,29,5, + 3,32,17,12,3,29,27,25,15,11,8,40,39,38,17,3,9,11,2,32,11,6,20,48,75,27,3,7, + 54,12,95,12,7,24,23,2,13,8,15,16,5,12,4,17,7,19,88,2,6,13,115,45,12,21,2,86, + 74,9,7,5,16,32,16,2,21,18,6,34,5,18,260,7,12,16,44,19,92,31,7,8,2,9,0,0,15, + 8,38,4,8,20,18,2,83,3,3,4,9,5,3,10,3,5,29,15,7,11,8,48,17,23,2,17,4,11,22, + 21,64,8,8,4,19,95,0,17,28,9,11,20,71,5,11,18,12,13,45,49,4,1,33,32,23,13,5, + 52,2,2,16,3,4,7,12,2,1,12,6,24,1,22,155,21,3,45,4,12,44,26,5,40,36,9,9,8,20, + 35,31,3,2,32,50,10,8,37,2,75,35,22,15,192,8,11,23,1,4,29,6,8,8,5,12,18,32,4, + 7,12,2,0,0,9,5,48,11,35,3,1,123,6,29,8,11,8,23,51,16,6,63,12,2,5,4,14,2,15, + 7,14,3,2,7,17,32,8,8,10,1,23,62,2,49,6,49,47,23,3,20,7,11,39,10,24,6,15,5,5, + 11,8,16,36,8,13,20,3,10,44,7,52,7,10,36,6,15,10,5,11,4,14,19,17,10,12,3,6, + 23,4,13,94,70,7,36,7,38,7,28,8,4,15,3,19,4,33,39,21,109,4,80,6,40,4,432,4,4, + 7,8,3,31,8,28,37,34,10,2,21,5,22,0,7,36,14,12,6,24,1,21,5,9,2,29,20,54,113, + 13,31,39,27,6,0,27,4,5,2,43,7,8,57,8,62,7,9,12,22,90,30,6,19,7,10,20,6,5,58, + 32,30,41,4,10,25,13,3,8,7,10,2,9,6,151,44,16,12,16,20,8,3,18,11,17,4,10,45, + 15,8,56,38,52,25,40,14,4,17,15,8,2,19,7,8,26,30,2,3,180,8,26,17,38,35,5,16, + 28,5,15,56,13,14,18,9,15,83,27,3,9,4,11,8,27,27,44,10,12,8,3,48,14,7,9,4,4, + 8,4,5,9,122,8,14,12,19,17,21,4,29,63,21,17,10,12,18,47,10,10,53,4,18,16,4,8, + 118,9,5,12,9,11,9,3,12,32,3,23,2,15,3,3,30,3,17,235,15,22,9,299,14,17,1,5, + 16,8,3,7,3,13,2,7,6,4,8,66,2,13,6,15,16,47,3,36,5,7,10,24,1,9,9,8,13,16,26, + 12,7,24,21,18,49,23,39,10,41,4,13,4,27,11,12,12,19,4,147,8,10,9,40,21,2,83, + 10,5,6,11,25,9,50,57,40,12,12,21,1,3,24,23,9,3,9,13,2,3,12,57,8,11,13,15,26, + 15,10,47,36,4,25,1,5,8,5,4,0,12,49,5,19,4,6,16,14,6,10,69,10,33,29,7,8,61, + 12,4,0,3,7,6,3,16,29,27,38,4,21,0,24,3,2,1,19,16,22,2,8,138,11,7,7,3,12,22, + 3,16,5,7,3,53,9,10,32,14,5,7,3,6,22,9,59,26,8,7,58,5,16,11,55,7,4,11,146,91, + 8,13,18,14,6,8,8,31,26,22,6,11,30,11,30,15,18,31,3,48,17,7,6,4,9,2,25,3,35, + 13,13,7,8,4,31,10,8,10,4,3,45,10,23,2,7,259,17,21,13,14,3,26,3,8,27,4,18,9, + 66,7,12,5,8,17,4,23,55,41,51,2,32,26,66,4,21,14,12,65,16,22,17,5,14,2,29,24, + 7,3,36,2,43,53,86,5,28,4,58,13,49,121,6,2,73,2,1,47,4,2,27,10,35,28,27,10, + 17,10,56,7,10,14,28,20,24,40,7,4,7,3,10,11,32,6,6,3,15,11,54,573,2,3,6,2,3, + 14,64,4,16,12,16,42,10,26,4,6,11,69,18,27,2,2,17,22,9,13,22,11,6,1,15,49,3, + 14,1 + }, + + { + 11,11,11,15,47,1,3,1,23,5,8,18,3,23,15,21,1,7,19,10,26,1,17,11,31,21,41,18, + 34,4,9,58,19,3,3,36,5,18,13,3,14,4,9,10,4,19,56,15,3,5,3,11,27,9,4,10,13,4, + 11,6,9,2,18,3,10,19,11,4,53,4,2,2,3,4,58,16,3,0,5,30,2,11,93,10,2,14,10,6,2, + 115,2,25,16,22,38,101,4,18,13,2,145,51,45,15,14,15,13,20,7,24,5,13,14,30,40, + 10,4,107,12,24,14,39,12,6,13,20,7,7,11,5,18,18,45,22,6,39,3,2,1,51,9,11,4, + 13,9,38,44,8,11,9,15,19,9,23,17,17,17,13,9,9,1,10,4,18,6,2,9,5,27,32,72,8, + 37,9,4,10,30,17,20,15,17,66,10,4,73,35,37,6,4,16,117,45,13,4,75,5,24,65,10, + 4,9,4,13,46,5,26,29,10,4,4,52,3,13,18,63,6,14,9,24,277,9,88,2,48,27,123,14, + 61,7,5,10,8,7,90,3,10,3,3,48,17,13,10,18,33,2,19,36,6,21,1,16,12,5,6,2,16, + 15,29,88,28,2,15,6,11,4,6,11,3,3,4,18,9,53,5,4,3,33,8,9,8,6,7,36,9,62,14,2, + 1,10,1,16,7,32,7,23,20,11,10,23,2,1,0,9,16,40,2,81,5,22,8,5,4,37,51,37,10, + 19,57,11,2,92,31,6,39,10,13,16,8,20,6,9,3,10,18,25,23,12,30,6,2,26,7,64,18, + 6,30,12,13,27,7,10,5,3,33,24,99,4,23,4,1,27,7,27,49,8,20,16,3,4,13,9,22,67, + 28,3,10,16,3,2,10,4,8,1,8,19,3,85,6,21,1,9,16,2,30,10,33,12,4,9,3,1,60,38,6, + 24,32,3,14,3,40,8,34,115,5,9,27,5,96,3,40,6,15,5,8,22,112,5,5,25,17,58,2,7, + 36,21,52,1,3,95,12,21,4,11,8,59,24,5,21,4,9,15,8,7,21,3,26,5,11,6,7,17,65, + 14,11,10,2,17,5,12,22,4,4,2,21,8,112,3,34,63,35,2,25,1,2,15,65,23,0,3,5,15, + 26,27,9,5,48,11,15,4,9,5,33,20,15,1,18,19,11,24,40,10,21,74,6,6,32,30,40,5, + 4,7,44,10,25,46,16,12,5,40,7,18,5,18,9,12,8,4,25,5,6,36,4,43,8,9,12,35,17,4, + 8,9,11,27,5,10,17,40,8,12,4,18,9,18,12,20,25,39,42,1,24,13,22,15,7,112,35,3, + 7,17,33,2,5,5,19,8,4,12,24,14,13,2,1,13,6,5,19,11,7,57,0,19,6,117,48,14,8, + 10,51,17,12,14,2,5,8,9,15,4,48,53,13,22,4,25,12,11,19,45,5,2,6,54,22,9,15,9, + 13,2,7,11,29,82,16,46,4,26,14,26,40,22,4,26,6,18,13,4,4,20,3,3,7,12,17,8,9, + 23,6,20,7,25,23,19,5,15,6,23,15,11,19,11,3,17,59,8,18,41,4,54,23,44,75,13, + 20,6,11,2,3,1,13,10,3,7,12,3,4,7,8,30,6,6,7,3,32,9,5,28,6,114,42,13,36,27, + 59,6,93,13,74,8,69,140,3,1,17,48,105,6,11,5,15,1,10,10,14,8,53,0,8,24,60,2, + 6,35,2,12,32,47,16,17,75,2,5,4,37,28,10,5,9,57,4,59,5,12,13,7,90,5,11,5,24, + 22,13,30,1,2,10,9,6,19,3,18,47,2,5,7,9,35,15,3,6,1,21,14,14,18,14,9,12,8,73, + 6,19,3,32,9,14,17,17,5,55,23,6,16,28,3,11,48,4,6,6,6,12,16,30,10,30,27,51, + 18,29,2,3,15,1,76,0,16,33,4,27,3,62,4,10,2,4,8,15,9,41,26,22,2,4,20,4,49,0, + 8,1,57,13,12,39,3,63,10,19,34,35,2,7,8,29,72,4,10,0,77,8,6,7,9,15,21,9,4,1, + 20,23,1,9,18,9,15,36,4,7,6,15,5,7,7,40,2,9,22,2,3,20,4,12,34,13,6,18,15,1, + 38,20,12,7,16,3,19,85,12,16,18,16,2,17,1,13,8,6,12,15,97,17,12,9,3,21,15,12, + 23,44,81,26,30,2,5,17,6,6,0,22,42,19,6,19,41,14,36,7,3,56,7,9,3,2,6,9,69,3, + 15,4,30,28,29,7,9,15,17,17,6,1,6,153,9,33,5,12,14,16,28,3,8,7,14,12,4,6,36, + 9,24,13,13,4,2,9,15,19,9,53,7,13,4,150,17,9,2,6,12,7,3,5,58,19,58,28,8,14,3, + 20,3,0,32,56,7,5,4,27,1,68,4,29,13,5,58,2,9,65,41,27,16,15,12,14,2,10,9,24, + 3,2,9,2,2,3,14,32,10,22,3,13,11,4,6,39,17,0,10,5,5,10,35,16,19,14,1,8,63,19, + 14,8,56,10,2,12,6,12,6,7,16,2,9,9,12,20,73,25,13,21,17,24,5,32,8,12,25,8,14, + 16,5,23,3,7,6,3,11,24,6,30,4,21,13,28,4,6,29,15,5,17,6,26,8,15,8,3,7,7,50, + 11,30,6,2,28,56,16,24,25,23,24,89,31,31,12,7,22,4,10,17,3,3,8,11,13,5,3,27, + 1,12,1,14,8,10,29,2,5,2,2,20,10,0,31,10,21,1,48,3,5,43,4,5,18,13,5,18,25,34, + 18,3,5,22,16,3,4,20,3,9,3,25,6,6,44,21,3,12,7,5,42,3,2,14,4,36,5,3,45,51,15, + 9,11,28,9,7,6,6,12,26,5,14,10,11,42,55,13,21,4,28,6,7,23,27,11,1,41,36,0,32, + 15,26,2,3,23,32,11,2,15,7,29,26,144,33,20,12,7,21,10,7,11,65,46,10,13,20,32, + 4,4,5,19,2,19,15,49,41,1,75,10,11,25,1,2,45,11,8,27,18,10,60,28,29,12,30,19, + 16,4,24,11,19,27,17,49,18,7,40,13,19,22,8,55,12,11,3,6,5,11,8,10,22,5,9,9, + 25,7,17,7,64,1,24,2,12,17,44,4,12,27,21,11,10,7,47,5,9,13,12,38,27,21,7,29, + 7,1,17,3,3,5,48,62,10,3,11,17,15,15,6,3,8,10,8,18,19,13,3,9,7,6,44,9,10,4, + 43,8,6,6,14,20,38,24,2,4,5,5,7,5,9,39,8,44,40,9,19,7,3,15,25,2,37,18,15,9,5, + 8,32,10,5,18,4,7,46,20,17,23,4,11,16,18,31,11,3,11,1,14,1,25,4,27,13,13,39, + 14,6,6,35,6,16,13,11,122,21,15,20,24,10,5,152,15,39,5,20,16,9,14,7,53,6,3,8, + 19,63,32,6,2,3,20,1,19,5,13,42,15,4,6,68,31,46,11,38,10,24,5,5,8,9,12,3,35, + 46,26,16,2,8,4,74,16,44,4,5,1,16,4,14,23,16,69,15,42,31,14,7,7,6,97,14,40,1, + 8,7,34,9,39,19,13,15,10,21,18,10,5,15,38,7,5,12,7,20,15,4,11,6,14,5,17,7,39, + 35,36,18,20,26,22,4,2,36,21,64,0,5,9,10,6,4,1,7,3,1,3,3,4,10,20,90,2,22,48, + 16,23,2,33,40,1,21,21,17,20,8,8,12,4,83,14,48,4,21,3,9,27,5,11,40,15,9,3,16, + 17,9,11,4,24,31,17,3,4,2,11,1,8,4,8,6,41,17,4,13,3,7,17,8,27,5,13,6,10,7,13, + 12,18,13,60,18,3,8,1,12,125,2,7,16,2,11,2,4,7,26,5,9,14,14,16,8,14,7,14,6,9, + 13,9,6,4,26,35,49,36,55,3,9,6,40,26,23,31,19,41,2,10,31,6,54,5,69,16,7,8,16, + 1,5,7,4,22,7,7,5,4,48,11,13,3,98,4,11,19,4,2,14,7,34,7,10,3,2,12,7,6,2,5,118 + }, +}; + +uint8_t data[1<<15]; +string data_str; +void init_data() { + static bool initted = false; + if (initted) return; + initted = true; + + // Repeatability. Kind of. + std::srand(42); + for (int i = 0; i < (int)(sizeof(data)/sizeof(data[0])); ++i) { + data[i] = (uint8_t)rand(); + } + + data_str.assign((char*)data, sizeof(data)); +} + + +BOOST_AUTO_TEST_SUITE( TBufferBaseTest ) + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_GetBuffer ) { + init_data(); + + for (int d1 = 0; d1 < 3; d1++) { + TMemoryBuffer buffer(16); + int offset = 0; + int index = 0; + + while (offset < 1<<15) { + buffer.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + string output = buffer.getBufferAsString(); + BOOST_CHECK_EQUAL(data_str, output); + } +} + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read ) { + init_data(); + + for (int d1 = 0; d1 < 3; d1++) { + for (int d2 = 0; d2 < 3; d2++) { + TMemoryBuffer buffer(16); + uint8_t data_out[1<<15]; + int offset; + int index; + + offset = 0; + index = 0; + while (offset < 1<<15) { + buffer.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + offset = 0; + index = 0; + while (offset < 1<<15) { + unsigned int got = buffer.read(&data_out[offset], dist[d2][index]); + BOOST_CHECK_EQUAL(got, dist[d2][index]); + offset += dist[d2][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, sizeof(data))); + } + } +} + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_ReadString ) { + init_data(); + + for (int d1 = 0; d1 < 3; d1++) { + for (int d2 = 0; d2 < 3; d2++) { + TMemoryBuffer buffer(16); + string output; + int offset; + int index; + + offset = 0; + index = 0; + while (offset < 1<<15) { + buffer.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + offset = 0; + index = 0; + while (offset < 1<<15) { + unsigned int got = buffer.readAppendToString(output, dist[d2][index]); + BOOST_CHECK_EQUAL(got, dist[d2][index]); + offset += dist[d2][index]; + index++; + } + + BOOST_CHECK_EQUAL(output, data_str); + } + } +} + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Multi1 ) { + init_data(); + + // Do shorter writes and reads so we don't align to power-of-two boundaries. + + for (int d1 = 0; d1 < 3; d1++) { + for (int d2 = 0; d2 < 3; d2++) { + TMemoryBuffer buffer(16); + uint8_t data_out[1<<15]; + int offset; + int index; + + for (int iter = 0; iter < 6; iter++) { + offset = 0; + index = 0; + while (offset < (1<<15)-42) { + buffer.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + offset = 0; + index = 0; + while (offset < (1<<15)-42) { + buffer.read(&data_out[offset], dist[d2][index]); + offset += dist[d2][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, (1<<15)-42)); + + // Pull out the extra data. + buffer.read(data_out, 42); + } + } + } +} + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Multi2 ) { + init_data(); + + // Do shorter writes and reads so we don't align to power-of-two boundaries. + // Pull the buffer out of the loop so its state gets worked harder. + TMemoryBuffer buffer(16); + + for (int d1 = 0; d1 < 3; d1++) { + for (int d2 = 0; d2 < 3; d2++) { + uint8_t data_out[1<<15]; + int offset; + int index; + + for (int iter = 0; iter < 6; iter++) { + offset = 0; + index = 0; + while (offset < (1<<15)-42) { + buffer.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + offset = 0; + index = 0; + while (offset < (1<<15)-42) { + buffer.read(&data_out[offset], dist[d2][index]); + offset += dist[d2][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, (1<<15)-42)); + + // Pull out the extra data. + buffer.read(data_out, 42); + } + } + } +} + +BOOST_AUTO_TEST_CASE( test_MemoryBuffer_Write_Read_Incomplete ) { + init_data(); + + // Do shorter writes and reads so we don't align to power-of-two boundaries. + // Pull the buffer out of the loop so its state gets worked harder. + + for (int d1 = 0; d1 < 3; d1++) { + for (int d2 = 0; d2 < 3; d2++) { + TMemoryBuffer buffer(16); + uint8_t data_out[1<<13]; + + int write_offset = 0; + int write_index = 0; + unsigned int to_write = (1<<14)-42; + while (to_write > 0) { + int write_amt = std::min(dist[d1][write_index], to_write); + buffer.write(&data[write_offset], write_amt); + write_offset += write_amt; + write_index++; + to_write -= write_amt; + } + + int read_offset = 0; + int read_index = 0; + unsigned int to_read = (1<<13)-42; + while (to_read > 0) { + int read_amt = std::min(dist[d2][read_index], to_read); + int got = buffer.read(&data_out[read_offset], read_amt); + BOOST_CHECK_EQUAL(got, read_amt); + read_offset += read_amt; + read_index++; + to_read -= read_amt; + } + + BOOST_CHECK(!memcmp(data, data_out, (1<<13)-42)); + + int second_offset = write_offset; + int second_index = write_index-1; + unsigned int to_second = (1<<14)+42; + while (to_second > 0) { + int second_amt = std::min(dist[d1][second_index], to_second); + //printf("%d\n", second_amt); + buffer.write(&data[second_offset], second_amt); + second_offset += second_amt; + second_index++; + to_second -= second_amt; + } + + string output = buffer.getBufferAsString(); + BOOST_CHECK_EQUAL(data_str.substr((1<<13)-42), output); + } + } +} + +BOOST_AUTO_TEST_CASE( test_BufferedTransport_Write ) { + init_data(); + + int sizes[] = { + 12, 15, 16, 17, 20, + 501, 512, 523, + 2000, 2048, 2096, + 1<<14, 1<<17, + }; + + foreach (int size, sizes) { + for (int d1 = 0; d1 < 3; d1++) { + shared_ptr buffer(new TMemoryBuffer(16)); + TBufferedTransport trans(buffer, size); + + int offset = 0; + int index = 0; + while (offset < 1<<15) { + trans.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + trans.flush(); + + string output = buffer->getBufferAsString(); + BOOST_CHECK_EQUAL(data_str, output); + } + } +} + +BOOST_AUTO_TEST_CASE( test_BufferedTransport_Read_Full ) { + init_data(); + + int sizes[] = { + 12, 15, 16, 17, 20, + 501, 512, 523, + 2000, 2048, 2096, + 1<<14, 1<<17, + }; + + foreach (int size, sizes) { + for (int d1 = 0; d1 < 3; d1++) { + shared_ptr buffer(new TMemoryBuffer(data, sizeof(data))); + TBufferedTransport trans(buffer, size); + uint8_t data_out[1<<15]; + + int offset = 0; + int index = 0; + while (offset < 1<<15) { + // Note: this doesn't work with "read" because TBufferedTransport + // doesn't try loop over reads, so we get short reads. We don't + // check the return value, so that messes us up. + trans.readAll(&data_out[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, sizeof(data))); + } + } +} + +BOOST_AUTO_TEST_CASE( test_BufferedTransport_Read_Short ) { + init_data(); + + int sizes[] = { + 12, 15, 16, 17, 20, + 501, 512, 523, + 2000, 2048, 2096, + 1<<14, 1<<17, + }; + + foreach (int size, sizes) { + for (int d1 = 0; d1 < 3; d1++) { + shared_ptr buffer(new TMemoryBuffer(data, sizeof(data))); + shared_ptr tshort(new TShortReadTransport(buffer, 0.125)); + TBufferedTransport trans(buffer, size); + uint8_t data_out[1<<15]; + + int offset = 0; + int index = 0; + while (offset < 1<<15) { + // Note: this doesn't work with "read" because TBufferedTransport + // doesn't try loop over reads, so we get short reads. We don't + // check the return value, so that messes us up. + trans.readAll(&data_out[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, sizeof(data))); + } + } +} + +BOOST_AUTO_TEST_CASE( test_FramedTransport_Write ) { + init_data(); + + int sizes[] = { + 12, 15, 16, 17, 20, + 501, 512, 523, + 2000, 2048, 2096, + 1<<14, 1<<17, + }; + + foreach (int size, sizes) { + for (int d1 = 0; d1 < 3; d1++) { + shared_ptr buffer(new TMemoryBuffer(16)); + TFramedTransport trans(buffer, size); + + int offset = 0; + int index = 0; + while (offset < 1<<15) { + trans.write(&data[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + trans.flush(); + + int32_t frame_size = -1; + buffer->read(reinterpret_cast(&frame_size), sizeof(frame_size)); + frame_size = (int32_t)ntohl((uint32_t)frame_size); + BOOST_CHECK_EQUAL(frame_size, 1<<15); + BOOST_CHECK_EQUAL(data_str.size(), (unsigned int)frame_size); + string output = buffer->getBufferAsString(); + BOOST_CHECK_EQUAL(data_str, output); + } + } +} + +BOOST_AUTO_TEST_CASE( test_FramedTransport_Read ) { + init_data(); + + for (int d1 = 0; d1 < 3; d1++) { + uint8_t data_out[1<<15]; + shared_ptr buffer(new TMemoryBuffer()); + TFramedTransport trans(buffer); + int32_t length = sizeof(data); + length = (int32_t)htonl((uint32_t)length); + buffer->write(reinterpret_cast(&length), sizeof(length)); + buffer->write(data, sizeof(data)); + + int offset = 0; + int index = 0; + while (offset < 1<<15) { + // This should work with read because we have one huge frame. + trans.read(&data_out[offset], dist[d1][index]); + offset += dist[d1][index]; + index++; + } + + BOOST_CHECK(!memcmp(data, data_out, sizeof(data))); + } +} + +BOOST_AUTO_TEST_CASE( test_FramedTransport_Write_Read ) { + init_data(); + + int sizes[] = { + 12, 15, 16, 17, 20, + 501, 512, 523, + 2000, 2048, 2096, + 1<<14, 1<<17, + }; + + int probs[] = { 1, 2, 4, 8, 16, 32, }; + + foreach (int size, sizes) { + foreach (int prob, probs) { + for (int d1 = 0; d1 < 3; d1++) { + shared_ptr buffer(new TMemoryBuffer(16)); + TFramedTransport trans(buffer, size); + uint8_t data_out[1<<15]; + std::vector flush_sizes; + + int write_offset = 0; + int write_index = 0; + int flush_size = 0; + while (write_offset < 1<<15) { + trans.write(&data[write_offset], dist[d1][write_index]); + write_offset += dist[d1][write_index]; + flush_size += dist[d1][write_index]; + write_index++; + if (rand()%prob == 0) { + flush_sizes.push_back(flush_size); + flush_size = 0; + trans.flush(); + } + } + if (flush_size != 0) { + flush_sizes.push_back(flush_size); + flush_size = 0; + trans.flush(); + } + + int read_offset = 0; + int read_index = 0; + foreach (int fsize, flush_sizes) { + // We are exploiting an implementation detail of TFramedTransport. + // The read buffer starts empty and it will never do more than one + // readFrame per read, so we should always get exactly one frame. + int got = trans.read(&data_out[read_offset], 1<<15); + BOOST_CHECK_EQUAL(got, fsize); + read_offset += got; + read_index++; + } + + BOOST_CHECK_EQUAL((unsigned int)read_offset, sizeof(data)); + BOOST_CHECK(!memcmp(data, data_out, sizeof(data))); + } + } + } +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/test/TMemoryBufferTest.cpp b/test/TMemoryBufferTest.cpp index 970ec470..4614196d 100644 --- a/test/TMemoryBufferTest.cpp +++ b/test/TMemoryBufferTest.cpp @@ -2,7 +2,7 @@ #include #include #include -#include +#include #include #include "gen-cpp/ThriftTest_types.h" diff --git a/test/TPipedTransportTest.cpp b/test/TPipedTransportTest.cpp index c1b371da..06950ea4 100644 --- a/test/TPipedTransportTest.cpp +++ b/test/TPipedTransportTest.cpp @@ -2,6 +2,7 @@ #include #include #include +#include using namespace std; using boost::shared_ptr; using facebook::thrift::transport::TTransportException; diff --git a/test/ZlibTest.cpp b/test/ZlibTest.cpp index 1782c586..08ec1742 100644 --- a/test/ZlibTest.cpp +++ b/test/ZlibTest.cpp @@ -11,7 +11,7 @@ g++ -Wall -g -I../lib/cpp/src -I/usr/local/include/boost-1_33_1 \ #include #include #include -#include +#include #include -- 2.17.1