From a0e11597163def6727896a77490899681c1eb6d6 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:10:27 +0000 Subject: [PATCH] THRIFT-926. cpp: remove "standalone" distinction in TZlibTransport Now that TZlibTransport::flush() behaves the same way as other transports, there is no need to distinguish between RPC and standalone behavior for TZlibTransport. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005152 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/transport/TZlibTransport.cpp | 116 +++++++++++++++-------- lib/cpp/src/transport/TZlibTransport.h | 44 ++++++--- lib/cpp/test/TransportTest.cpp | 4 +- lib/cpp/test/ZlibTest.cpp | 16 ++-- 4 files changed, 117 insertions(+), 63 deletions(-) diff --git a/lib/cpp/src/transport/TZlibTransport.cpp b/lib/cpp/src/transport/TZlibTransport.cpp index 89537425..ae28bb6d 100644 --- a/lib/cpp/src/transport/TZlibTransport.cpp +++ b/lib/cpp/src/transport/TZlibTransport.cpp @@ -161,31 +161,41 @@ uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) { rstream_->avail_out = urbuf_size_; urpos_ = 0; - // If we don't have any more compressed data available, - // read some from the underlying transport. - if (rstream_->avail_in == 0) { - uint32_t got = transport_->read(crbuf_, crbuf_size_); - if (got == 0) { - return len - need; - } - rstream_->next_in = crbuf_; - rstream_->avail_in = got; + // Call inflate() to uncompress some more data + if (!readFromZlib()) { + // no data available from underlying transport + return len - need; } - // We have some compressed data now. Uncompress it. - int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH); + // Okay. The read buffer should have whatever we can give it now. + // Loop back to the start and try to give some more. + } +} - if (zlib_rv == Z_STREAM_END) { - if (standalone_) { - input_ended_ = true; - } - } else { - checkZlibRv(zlib_rv, rstream_->msg); +bool TZlibTransport::readFromZlib() { + assert(!input_ended_); + + // If we don't have any more compressed data available, + // read some from the underlying transport. + if (rstream_->avail_in == 0) { + uint32_t got = transport_->read(crbuf_, crbuf_size_); + if (got == 0) { + return false; } + rstream_->next_in = crbuf_; + rstream_->avail_in = got; + } - // Okay. The read buffer should have whatever we can give it now. - // Loop back to the start and try to give some more. + // We have some compressed data now. Uncompress it. + int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH); + + if (zlib_rv == Z_STREAM_END) { + input_ended_ = true; + } else { + checkZlibRv(zlib_rv, rstream_->msg); } + + return true; } @@ -315,30 +325,60 @@ void TZlibTransport::consume(uint32_t len) { } void TZlibTransport::verifyChecksum() { - if (!standalone_) { + // If zlib has already reported the end of the stream, + // it has verified the checksum. + if (input_ended_) { + return; + } + + // This should only be called when reading is complete. + // If the caller still has unread data, throw an exception. + if (readAvail() > 0) { throw TTransportException( - TTransportException::BAD_ARGS, - "TZLibTransport can only verify checksums for standalone objects."); + TTransportException::CORRUPTED_DATA, + "verifyChecksum() called before end of zlib stream"); } - if (!input_ended_) { - // This should only be called when reading is complete, - // but it's possible that the whole checksum has not been fed to zlib yet. - // We try to read an extra byte here to force zlib to finish the stream. - // It might not always be easy to "unread" this byte, - // but we throw an exception if we get it, which is not really - // a recoverable error, so it doesn't matter. - uint8_t buf[1]; - uint32_t got = this->read(buf, sizeof(buf)); - if (got || !input_ended_) { - throw TTransportException( - TTransportException::CORRUPTED_DATA, - "Zlib stream not complete."); - } + // Reset the rstream fields, in case avail_out is 0. + // (Since readAvail() is 0, we know there is no unread data in urbuf_) + rstream_->next_out = urbuf_; + rstream_->avail_out = urbuf_size_; + urpos_ = 0; + + // Call inflate() + // This will throw an exception if the checksum is bad. + bool performed_inflate = readFromZlib(); + if (!performed_inflate) { + // We needed to read from the underlying transport, and the read() call + // returned 0. + // + // Not all TTransport implementations behave the same way here, so we'll + // end up with different behavior depending on the underlying transport. + // + // For some transports (e.g., TFDTransport), read() blocks if no more data + // is available. They only return 0 if EOF has been reached, or if the + // remote endpoint has closed the connection. For those transports, + // verifyChecksum() will block until the checksum becomes available. + // + // Other transport types (e.g., TMemoryBuffer) always return 0 immediately + // if no more data is available. For those transport types, verifyChecksum + // will raise the following exception if the checksum is not available from + // the underlying transport yet. + throw TTransportException(TTransportException::CORRUPTED_DATA, + "checksum not available yet in " + "verifyChecksum()"); + } + + // If input_ended_ is true now, the checksum has been verified + if (input_ended_) { + return; } - // If the checksum had been bad, we would have gotten an error while - // inflating. + // The caller invoked us before the actual end of the data stream + assert(rstream_->avail_out < urbuf_size_); + throw TTransportException(TTransportException::CORRUPTED_DATA, + "verifyChecksum() called before end of " + "zlib stream"); } diff --git a/lib/cpp/src/transport/TZlibTransport.h b/lib/cpp/src/transport/TZlibTransport.h index 0f9815ec..45afe799 100644 --- a/lib/cpp/src/transport/TZlibTransport.h +++ b/lib/cpp/src/transport/TZlibTransport.h @@ -76,8 +76,6 @@ class TZlibTransport : public TVirtualTransport { /** * @param transport The transport to read compressed data from * and write compressed data to. - * @param use_for_rpc True if this object will be used for RPC, - * false if this is a standalone object. * @param urbuf_size Uncompressed buffer size for reading. * @param crbuf_size Compressed buffer size for reading. * @param uwbuf_size Uncompressed buffer size for writing. @@ -86,13 +84,11 @@ class TZlibTransport : public TVirtualTransport { * TODO(dreiss): Write a constructor that isn't a pain. */ TZlibTransport(boost::shared_ptr transport, - bool use_for_rpc, int urbuf_size = DEFAULT_URBUF_SIZE, int crbuf_size = DEFAULT_CRBUF_SIZE, int uwbuf_size = DEFAULT_UWBUF_SIZE, int cwbuf_size = DEFAULT_CWBUF_SIZE) : transport_(transport), - standalone_(!use_for_rpc), urpos_(0), uwpos_(0), input_ended_(false), @@ -108,13 +104,6 @@ class TZlibTransport : public TVirtualTransport { rstream_(NULL), wstream_(NULL) { - - if (!standalone_) { - throw TTransportException( - TTransportException::BAD_ARGS, - "TZLibTransport has not been tested for RPC."); - } - if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) { // Have to copy this into a local because of a linking issue. int minimum = MIN_DIRECT_DEFLATE_SIZE; @@ -206,22 +195,47 @@ class TZlibTransport : public TVirtualTransport { inline int readAvail(); void flushToTransport(int flush); void flushToZlib(const uint8_t* buf, int len, int flush); + bool readFromZlib(); + + private: + // Deprecated constructor signature. + // + // This used to be the constructor signature. If you are getting a compile + // error because you are trying to use this constructor, you need to update + // your code as follows: + // - Remove the use_for_rpc argument in the constructur. + // There is no longer any distinction between RPC and standalone zlib + // transports. (Previously, only standalone was allowed, anyway.) + // - Replace TZlibTransport::flush() calls with TZlibTransport::finish() + // in your code. Previously, flush() used to finish the zlib stream. + // Now flush() only flushes out pending data, so more writes can be + // performed after a flush(). The finish() method can be used to finalize + // the zlib stream. + // + // If we don't declare this constructor, old code written as + // TZlibTransport(trans, false) still compiles but behaves incorrectly. + // The second bool argument is converted to an integer and used as the + // urbuf_size. + TZlibTransport(boost::shared_ptr transport, + bool use_for_rpc, + int urbuf_size = DEFAULT_URBUF_SIZE, + int crbuf_size = DEFAULT_CRBUF_SIZE, + int uwbuf_size = DEFAULT_UWBUF_SIZE, + int cwbuf_size = DEFAULT_CWBUF_SIZE); + protected: // Writes smaller than this are buffered up. // Larger (or equal) writes are dumped straight to zlib. static const int MIN_DIRECT_DEFLATE_SIZE = 32; boost::shared_ptr transport_; - bool standalone_; int urpos_; int uwpos_; - /// True iff zlib has reached the end of a stream. - /// This is only ever true in standalone protcol objects. + /// True iff zlib has reached the end of the input stream. bool input_ended_; /// True iff we have finished the output stream. - /// This is only ever true in standalone protcol objects. bool output_finished_; int urbuf_size_; diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp index 7f95e383..a932643c 100644 --- a/lib/cpp/test/TransportTest.cpp +++ b/lib/cpp/test/TransportTest.cpp @@ -183,8 +183,8 @@ class CoupledZlibTransports : public CoupledTransports { public: CoupledZlibTransports() : buf(new TMemoryBuffer) { - in = new TZlibTransport(buf, false); - out = new TZlibTransport(buf, false); + in = new TZlibTransport(buf); + out = new TZlibTransport(buf); } ~CoupledZlibTransports() { diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp index e2403d7e..1e9f1872 100644 --- a/lib/cpp/test/ZlibTest.cpp +++ b/lib/cpp/test/ZlibTest.cpp @@ -143,7 +143,7 @@ uint8_t* gen_random_buffer(uint32_t buf_len) { void test_write_then_read(const uint8_t* buf, uint32_t buf_len) { shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); zlib_trans->write(buf, buf_len); zlib_trans->finish(); @@ -162,12 +162,12 @@ void test_separate_checksum(const uint8_t* buf, uint32_t buf_len) { // the stream was not complete. I'm about to go fix that. // It worked. Awesome. shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); zlib_trans->write(buf, buf_len); zlib_trans->finish(); string tmp_buf; membuf->appendBufferToString(tmp_buf); - zlib_trans.reset(new TZlibTransport(membuf, false, + zlib_trans.reset(new TZlibTransport(membuf, TZlibTransport::DEFAULT_URBUF_SIZE, tmp_buf.length()-1)); @@ -182,7 +182,7 @@ void test_incomplete_checksum(const uint8_t* buf, uint32_t buf_len) { // Make sure we still get that "not complete" error if // it really isn't complete. shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); zlib_trans->write(buf, buf_len); zlib_trans->finish(); string tmp_buf; @@ -209,7 +209,7 @@ void test_read_write_mix(const uint8_t* buf, uint32_t buf_len, const shared_ptr& read_gen) { // Try it with a mix of read/write sizes. shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); unsigned int tot; tot = 0; @@ -244,7 +244,7 @@ void test_read_write_mix(const uint8_t* buf, uint32_t buf_len, void test_invalid_checksum(const uint8_t* buf, uint32_t buf_len) { // Verify checksum checking. shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); zlib_trans->write(buf, buf_len); zlib_trans->finish(); string tmp_buf; @@ -282,7 +282,7 @@ void test_invalid_checksum(const uint8_t* buf, uint32_t buf_len) { void test_write_after_flush(const uint8_t* buf, uint32_t buf_len) { // write some data shared_ptr membuf(new TMemoryBuffer()); - shared_ptr zlib_trans(new TZlibTransport(membuf, false)); + shared_ptr zlib_trans(new TZlibTransport(membuf)); zlib_trans->write(buf, buf_len); // call finish() @@ -321,7 +321,7 @@ void test_no_write() { { // Create a TZlibTransport object, and immediately destroy it // when it goes out of scope. - TZlibTransport w_zlib_trans(membuf, false); + TZlibTransport w_zlib_trans(membuf); } BOOST_CHECK_EQUAL(membuf->available_read(), 0); -- 2.17.1