THRIFT-926. cpp: Make TZlibTransport::flush() behave like other transports
Previously, TZlibTransport::flush() finished the zlib stream, so calling
write() after flush() would result in an error. Now it just flushes the
data, without finishing the stream. A new TZlibTransport::finish()
function has been added to finish the stream.
This breaks compatibility. I'm aware of anyone using this code outside
of Facebook, though.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005151 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index e5ddeee..7f95e38 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -36,6 +36,7 @@
#include <transport/TBufferTransports.h>
#include <transport/TFDTransport.h>
#include <transport/TFileTransport.h>
+#include <transport/TZlibTransport.h>
using namespace apache::thrift::transport;
@@ -178,6 +179,22 @@
boost::shared_ptr<TMemoryBuffer> buf;
};
+class CoupledZlibTransports : public CoupledTransports<TZlibTransport> {
+ public:
+ CoupledZlibTransports() :
+ buf(new TMemoryBuffer) {
+ in = new TZlibTransport(buf, false);
+ out = new TZlibTransport(buf, false);
+ }
+
+ ~CoupledZlibTransports() {
+ delete in;
+ delete out;
+ }
+
+ boost::shared_ptr<TMemoryBuffer> buf;
+};
+
class CoupledFDTransports : public CoupledTransports<TFDTransport> {
public:
CoupledFDTransports() {
@@ -363,7 +380,16 @@
read_size = rchunk_size - chunk_read;
}
- int bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
+ int bytes_read = -1;
+ try {
+ bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
+ } catch (TTransportException& e) {
+ BOOST_FAIL("read(pos=" << total_read << ", size=" << read_size <<
+ ") threw exception \"" << e.what() <<
+ "\"; written so far: " << total_written << " / " <<
+ totalSize << " bytes");
+ }
+
BOOST_REQUIRE_MESSAGE(bytes_read > 0,
"read(pos=" << total_read << ", size=" <<
read_size << ") returned " << bytes_read <<
@@ -449,6 +475,17 @@
BUFFER_TESTS(CoupledBufferedTransports)
BUFFER_TESTS(CoupledFramedTransports)
+ TEST_RW(CoupledZlibTransports, 1024*1024*10, 0, 0);
+ TEST_RW(CoupledZlibTransports, 1024*1024*10, rand4k, rand4k);
+ TEST_RW(CoupledZlibTransports, 1024*1024*5, 167, 163);
+ TEST_RW(CoupledZlibTransports, 1024*64, 1, 1);
+
+ TEST_RW(CoupledZlibTransports, 1024*1024*10, 0, 0, rand4k, rand4k);
+ TEST_RW(CoupledZlibTransports, 1024*1024*10,
+ rand4k, rand4k, rand4k, rand4k);
+ TEST_RW(CoupledZlibTransports, 1024*1024*5, 167, 163, rand4k, rand4k);
+ TEST_RW(CoupledZlibTransports, 1024*64, 1, 1, rand4k, rand4k);
+
// TFDTransport tests
// Since CoupledFDTransports tests with a pipe, writes will block
// if there is too much outstanding unread data in the pipe.
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index e952e71..e2403d7 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -145,7 +145,7 @@
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
zlib_trans->write(buf, buf_len);
- zlib_trans->flush();
+ zlib_trans->finish();
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
uint32_t got = zlib_trans->read(mirror.get(), buf_len);
@@ -164,7 +164,7 @@
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
zlib_trans->write(buf, buf_len);
- zlib_trans->flush();
+ zlib_trans->finish();
string tmp_buf;
membuf->appendBufferToString(tmp_buf);
zlib_trans.reset(new TZlibTransport(membuf, false,
@@ -184,7 +184,7 @@
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
zlib_trans->write(buf, buf_len);
- zlib_trans->flush();
+ zlib_trans->finish();
string tmp_buf;
membuf->appendBufferToString(tmp_buf);
tmp_buf.erase(tmp_buf.length() - 1);
@@ -222,7 +222,7 @@
tot += write_len;
}
- zlib_trans->flush();
+ zlib_trans->finish();
tot = 0;
boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
@@ -246,7 +246,7 @@
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
zlib_trans->write(buf, buf_len);
- zlib_trans->flush();
+ zlib_trans->finish();
string tmp_buf;
membuf->appendBufferToString(tmp_buf);
// Modify a byte at the end of the buffer (part of the checksum).
@@ -279,6 +279,54 @@
}
}
+void test_write_after_flush(const uint8_t* buf, uint32_t buf_len) {
+ // write some data
+ shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ zlib_trans->write(buf, buf_len);
+
+ // call finish()
+ zlib_trans->finish();
+
+ // make sure write() throws an error
+ try {
+ uint8_t write_buf[] = "a";
+ zlib_trans->write(write_buf, 1);
+ BOOST_ERROR("write() after finish() did not raise an exception");
+ } catch (TTransportException& ex) {
+ BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+ }
+
+ // make sure flush() throws an error
+ try {
+ zlib_trans->flush();
+ BOOST_ERROR("flush() after finish() did not raise an exception");
+ } catch (TTransportException& ex) {
+ BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+ }
+
+ // make sure finish() throws an error
+ try {
+ zlib_trans->finish();
+ BOOST_ERROR("finish() after finish() did not raise an exception");
+ } catch (TTransportException& ex) {
+ BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+ }
+}
+
+void test_no_write() {
+ // Verify that no data is written to the underlying transport if we
+ // never write data to the TZlibTransport.
+ shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
+ {
+ // Create a TZlibTransport object, and immediately destroy it
+ // when it goes out of scope.
+ TZlibTransport w_zlib_trans(membuf, false);
+ }
+
+ BOOST_CHECK_EQUAL(membuf->available_read(), 0);
+}
+
/*
* Initialization
*/
@@ -301,6 +349,7 @@
ADD_TEST_CASE(suite, name, test_separate_checksum, buf, buf_len);
ADD_TEST_CASE(suite, name, test_incomplete_checksum, buf, buf_len);
ADD_TEST_CASE(suite, name, test_invalid_checksum, buf, buf_len);
+ ADD_TEST_CASE(suite, name, test_write_after_flush, buf, buf_len);
shared_ptr<SizeGenerator> size_32k(new ConstantSizeGenerator(1<<15));
shared_ptr<SizeGenerator> size_lognormal(new LogNormalSizeGenerator(20, 30));
@@ -397,5 +446,7 @@
add_tests(suite, gen_compressible_buffer(buf_len), buf_len, "compressible");
add_tests(suite, gen_random_buffer(buf_len), buf_len, "random");
+ suite->add(BOOST_TEST_CASE(test_no_write));
+
return suite;
}