THRIFT-926. cpp: Make TZlibTransport::flush() behave like other transports

Previously, TZlibTransport::flush() finished the zlib stream, so calling
write() after flush() would result in an error.  Now it just flushes the
data, without finishing the stream.  A new TZlibTransport::finish()
function has been added to finish the stream.

This breaks compatibility.  I'm not aware of anyone using this code outside
of Facebook, though.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005151 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index e5ddeee..7f95e38 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -36,6 +36,7 @@
 #include <transport/TBufferTransports.h>
 #include <transport/TFDTransport.h>
 #include <transport/TFileTransport.h>
+#include <transport/TZlibTransport.h>
 
 using namespace apache::thrift::transport;
 
@@ -178,6 +179,22 @@
   boost::shared_ptr<TMemoryBuffer> buf;
 };
 
+class CoupledZlibTransports : public CoupledTransports<TZlibTransport> {
+ public:
+  CoupledZlibTransports() :
+    buf(new TMemoryBuffer) {  // one in-memory buffer backs both transports
+    in = new TZlibTransport(buf, false);
+    out = new TZlibTransport(buf, false);  // wraps the same buf as `in`
+  }
+
+  ~CoupledZlibTransports() {  // in/out were allocated with plain new above
+    delete in;
+    delete out;
+  }
+
+  boost::shared_ptr<TMemoryBuffer> buf;
+};
+
 class CoupledFDTransports : public CoupledTransports<TFDTransport> {
  public:
   CoupledFDTransports() {
@@ -363,7 +380,16 @@
         read_size = rchunk_size - chunk_read;
       }
 
-      int bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
+      int bytes_read = -1;
+      try {
+        bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
+      } catch (TTransportException& e) {
+        BOOST_FAIL("read(pos=" << total_read << ", size=" << read_size <<
+                   ") threw exception \"" << e.what() <<
+                   "\"; written so far: " << total_written << " / " <<
+                   totalSize << " bytes");
+      }
+
       BOOST_REQUIRE_MESSAGE(bytes_read > 0,
                             "read(pos=" << total_read << ", size=" <<
                             read_size << ") returned " << bytes_read <<
@@ -449,6 +475,17 @@
     BUFFER_TESTS(CoupledBufferedTransports)
     BUFFER_TESTS(CoupledFramedTransports)
 
+    TEST_RW(CoupledZlibTransports, 1024*1024*10, 0, 0);
+    TEST_RW(CoupledZlibTransports, 1024*1024*10, rand4k, rand4k);
+    TEST_RW(CoupledZlibTransports, 1024*1024*5, 167, 163);
+    TEST_RW(CoupledZlibTransports, 1024*64, 1, 1);
+
+    TEST_RW(CoupledZlibTransports, 1024*1024*10, 0, 0, rand4k, rand4k);
+    TEST_RW(CoupledZlibTransports, 1024*1024*10,
+            rand4k, rand4k, rand4k, rand4k);
+    TEST_RW(CoupledZlibTransports, 1024*1024*5, 167, 163, rand4k, rand4k);
+    TEST_RW(CoupledZlibTransports, 1024*64, 1, 1, rand4k, rand4k);
+
     // TFDTransport tests
     // Since CoupledFDTransports tests with a pipe, writes will block
     // if there is too much outstanding unread data in the pipe.
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index e952e71..e2403d7 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -145,7 +145,7 @@
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
   shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
   zlib_trans->write(buf, buf_len);
-  zlib_trans->flush();
+  zlib_trans->finish();
 
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
   uint32_t got = zlib_trans->read(mirror.get(), buf_len);
@@ -164,7 +164,7 @@
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
   shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
   zlib_trans->write(buf, buf_len);
-  zlib_trans->flush();
+  zlib_trans->finish();
   string tmp_buf;
   membuf->appendBufferToString(tmp_buf);
   zlib_trans.reset(new TZlibTransport(membuf, false,
@@ -184,7 +184,7 @@
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
   shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
   zlib_trans->write(buf, buf_len);
-  zlib_trans->flush();
+  zlib_trans->finish();
   string tmp_buf;
   membuf->appendBufferToString(tmp_buf);
   tmp_buf.erase(tmp_buf.length() - 1);
@@ -222,7 +222,7 @@
     tot += write_len;
   }
 
-  zlib_trans->flush();
+  zlib_trans->finish();
 
   tot = 0;
   boost::shared_array<uint8_t> mirror(new uint8_t[buf_len]);
@@ -246,7 +246,7 @@
   shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
   shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
   zlib_trans->write(buf, buf_len);
-  zlib_trans->flush();
+  zlib_trans->finish();
   string tmp_buf;
   membuf->appendBufferToString(tmp_buf);
   // Modify a byte at the end of the buffer (part of the checksum).
@@ -279,6 +279,54 @@
   }
 }
 
+void test_write_after_flush(const uint8_t* buf, uint32_t buf_len) {
+  // Checks that write(), flush() and finish() are all rejected after finish().
+  shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
+  shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+  zlib_trans->write(buf, buf_len);
+
+  // Finish the stream.  NOTE(review): the name says "flush"; finish() is meant.
+  zlib_trans->finish();
+
+  // write() after finish() must throw a TTransportException of type BAD_ARGS
+  try {
+    uint8_t write_buf[] = "a";
+    zlib_trans->write(write_buf, 1);
+    BOOST_ERROR("write() after finish() did not raise an exception");
+  } catch (TTransportException& ex) {
+    BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+  }
+
+  // flush() after finish() must throw BAD_ARGS as well
+  try {
+    zlib_trans->flush();
+    BOOST_ERROR("flush() after finish() did not raise an exception");
+  } catch (TTransportException& ex) {
+    BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+  }
+
+  // a second finish() must also throw BAD_ARGS
+  try {
+    zlib_trans->finish();
+    BOOST_ERROR("finish() after finish() did not raise an exception");
+  } catch (TTransportException& ex) {
+    BOOST_CHECK_EQUAL(ex.getType(), TTransportException::BAD_ARGS);
+  }
+}
+
+void test_no_write() {
+  // Verify that constructing and destroying a TZlibTransport without ever
+  // writing to it leaves the underlying transport with nothing to read.
+  shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
+  {
+    // The inner scope makes the TZlibTransport's destructor run at the
+    // closing brace, i.e. before membuf is inspected below.
+    TZlibTransport w_zlib_trans(membuf, false);
+  }
+
+  BOOST_CHECK_EQUAL(membuf->available_read(), 0);
+}
+
 /*
  * Initialization
  */
@@ -301,6 +349,7 @@
   ADD_TEST_CASE(suite, name, test_separate_checksum, buf, buf_len);
   ADD_TEST_CASE(suite, name, test_incomplete_checksum, buf, buf_len);
   ADD_TEST_CASE(suite, name, test_invalid_checksum, buf, buf_len);
+  ADD_TEST_CASE(suite, name, test_write_after_flush, buf, buf_len);
 
   shared_ptr<SizeGenerator> size_32k(new ConstantSizeGenerator(1<<15));
   shared_ptr<SizeGenerator> size_lognormal(new LogNormalSizeGenerator(20, 30));
@@ -397,5 +446,7 @@
   add_tests(suite, gen_compressible_buffer(buf_len), buf_len, "compressible");
   add_tests(suite, gen_random_buffer(buf_len), buf_len, "random");
 
+  suite->add(BOOST_TEST_CASE(test_no_write));
+
   return suite;
 }