Thrift: Reverting TZlibTransport
Summary:
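The Red Hat dev boxes don't have the zlib development packages installed,
so TZlibTransport (which includes <zlib.h>) is being reverted.
The revert also removes the CORRUPTED_DATA and INTERNAL_ERROR codes from
TTransportException and changes the access-specifier indentation in Thrift.h.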
Blame Rev: 59856
Reviewed By: mcslee
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665263 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index ad8d46c..0b13bae 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -26,8 +26,8 @@
namespace facebook { namespace thrift {
-class TOutput {
- public:
+class TOutput{
+public:
TOutput() : f_(&perrorTimeWrapper) {}
inline void setOutputFunction(void (*function)(const char *)){
@@ -47,7 +47,7 @@
fprintf(stderr, "%s ", dbgtime);
perror(msg);
}
- private:
+private:
void (*f_)(const char *);
};
@@ -58,7 +58,7 @@
}
class TException : public std::exception {
- public:
+public:
TException() {}
TException(const std::string& message) :
@@ -74,13 +74,13 @@
}
}
- protected:
+protected:
std::string message_;
};
class TApplicationException : public TException {
- public:
+public:
/**
* Error codes for the various types of exceptions.
@@ -134,7 +134,7 @@
uint32_t read(protocol::TProtocol* iprot);
uint32_t write(protocol::TProtocol* oprot) const;
- protected:
+protected:
/**
* Error code
*/
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index 9b75826..bb8b8a9 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -34,9 +34,7 @@
TIMED_OUT = 3,
END_OF_FILE = 4,
INTERRUPTED = 5,
- BAD_ARGS = 6,
- CORRUPTED_DATA = 7,
- INTERNAL_ERROR = 8,
+ BAD_ARGS = 6
};
TTransportException() :
diff --git a/lib/cpp/src/transport/TZlibTransport.cpp b/lib/cpp/src/transport/TZlibTransport.cpp
deleted file mode 100644
index 59e3a36..0000000
--- a/lib/cpp/src/transport/TZlibTransport.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-// Copyright (c) 2006- Facebook
-// Distributed under the Thrift Software License
-//
-// See accompanying file LICENSE or visit the Thrift site at:
-// http://developers.facebook.com/thrift/
-
-#include <cassert>
-#include <algorithm>
-#include <transport/TZlibTransport.h>
-#include <zlib.h>
-
-using std::string;
-
-namespace facebook { namespace thrift { namespace transport {
-
-// Don't call this outside of the constructor.
-void TZlibTransport::initZlib() {
- int rv;
- bool r_init = false;
- try {
- rstream_ = new z_stream;
- wstream_ = new z_stream;
-
- rstream_->zalloc = Z_NULL;
- wstream_->zalloc = Z_NULL;
- rstream_->zfree = Z_NULL;
- wstream_->zfree = Z_NULL;
- rstream_->opaque = Z_NULL;
- wstream_->opaque = Z_NULL;
-
- rstream_->next_in = crbuf_;
- wstream_->next_in = uwbuf_;
- rstream_->next_out = urbuf_;
- wstream_->next_out = cwbuf_;
- rstream_->avail_in = 0;
- wstream_->avail_in = 0;
- rstream_->avail_out = urbuf_size_;
- wstream_->avail_out = cwbuf_size_;
-
- rv = inflateInit(rstream_);
- checkZlibRv(rv, rstream_->msg);
-
- // Have to set this flag so we know whether to de-initialize.
- r_init = true;
-
- rv = deflateInit(wstream_, Z_DEFAULT_COMPRESSION);
- checkZlibRv(rv, wstream_->msg);
- }
-
- catch (...) {
- if (r_init) {
- rv = inflateEnd(rstream_);
- checkZlibRvNothrow(rv, rstream_->msg);
- }
- // There is no way we can get here if wstream_ was initialized.
-
- throw;
- }
-}
-
-inline void TZlibTransport::checkZlibRv(int status, const char* message) {
- if (status != Z_OK) {
- throw TZlibTransportException(status, message);
- }
-}
-
-inline void TZlibTransport::checkZlibRvNothrow(int status, const char* message) {
- if (status != Z_OK) {
- string output = "TZlibTransport: zlib failure in destructor: " +
- TZlibTransportException::errorMessage(status, message);
- GlobalOutput(output.c_str());
- }
-}
-
-TZlibTransport::~TZlibTransport() {
- int rv;
- rv = inflateEnd(rstream_);
- checkZlibRvNothrow(rv, rstream_->msg);
- rv = deflateEnd(wstream_);
- checkZlibRvNothrow(rv, wstream_->msg);
-
- delete[] urbuf_;
- delete[] crbuf_;
- delete[] uwbuf_;
- delete[] cwbuf_;
- delete rstream_;
- delete wstream_;
-}
-
-bool TZlibTransport::isOpen() {
- return (readAvail() > 0) || transport_->isOpen();
-}
-
-// READING STRATEGY
-//
-// We have two buffers for reading: one containing the compressed data (crbuf_)
-// and one containing the uncompressed data (urbuf_). When read is called,
-// we repeat the following steps until we have satisfied the request:
-// - Copy data from urbuf_ into the caller's buffer.
-// - If we had enough, return.
-// - If urbuf_ is empty, read some data into it from the underlying transport.
-// - Inflate data from crbuf_ into urbuf_.
-//
-// In standalone objects, we set input_ended_ to true when inflate returns
-// Z_STREAM_END. This allows to make sure that a checksum was verified.
-
-inline int TZlibTransport::readAvail() {
- return urbuf_size_ - rstream_->avail_out - urpos_;
-}
-
-uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) {
- int need = len;
-
- // TODO(dreiss): Skip urbuf on big reads.
-
- while (true) {
- // Copy out whatever we have available, then give them the min of
- // what we have and what they want, then advance indices.
- int give = std::min(readAvail(), need);
- memcpy(buf, urbuf_ + urpos_, give);
- need -= give;
- buf += give;
- urpos_ += give;
-
- // If they were satisfied, we are done.
- if (need == 0) {
- return len;
- }
-
- // If we get to this point, we need to get some more data.
-
- // If zlib has reported the end of a stream, we can't really do any more.
- if (input_ended_) {
- return len - need;
- }
-
- // The uncompressed read buffer is empty, so reset the stream fields.
- rstream_->next_out = urbuf_;
- rstream_->avail_out = urbuf_size_;
- urpos_ = 0;
-
- // If we don't have any more compressed data available,
- // read some from the underlying transport.
- if (rstream_->avail_in == 0) {
- uint32_t got = transport_->read(crbuf_, crbuf_size_);
- if (got == 0) {
- return len - need;
- }
- rstream_->next_in = crbuf_;
- rstream_->avail_in = got;
- }
-
- // We have some compressed data now. Uncompress it.
- int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
-
- if (zlib_rv == Z_STREAM_END) {
- if (standalone_) {
- input_ended_ = true;
- }
- } else {
- checkZlibRv(zlib_rv, rstream_->msg);
- }
-
- // Okay. The read buffer should have whatever we can give it now.
- // Loop back to the start and try to give some more.
- }
-}
-
-
-// WRITING STRATEGY
-//
-// We buffer up small writes before sending them to zlib, so our logic is:
-// - Is the write big?
-// - Send the buffer to zlib.
-// - Send this data to zlib.
-// - Is the write small?
-// - Is there insufficient space in the buffer for it?
-// - Send the buffer to zlib.
-// - Copy the data to the buffer.
-//
-// We have two buffers for writing also: the uncompressed buffer (mentioned
-// above) and the compressed buffer. When sending data to zlib we loop over
-// the following until the source (uncompressed buffer or big write) is empty:
-// - Is there no more space in the compressed buffer?
-// - Write the compressed buffer to the underlying transport.
-// - Deflate from the source into the compressed buffer.
-
-void TZlibTransport::write(const uint8_t* buf, uint32_t len) {
- // zlib's "deflate" function has enough logic in it that I think
- // we're better off (performance-wise) buffering up small writes.
- if ((int)len > MIN_DIRECT_DEFLATE_SIZE) {
- flushToZlib(uwbuf_, uwpos_);
- uwpos_ = 0;
- flushToZlib(buf, len);
- } else if (len > 0) {
- if (uwbuf_size_ - uwpos_ < (int)len) {
- flushToZlib(uwbuf_, uwpos_);
- uwpos_ = 0;
- }
- memcpy(uwbuf_ + uwpos_, buf, len);
- uwpos_ += len;
- }
-}
-
-void TZlibTransport::flush() {
- flushToZlib(uwbuf_, uwpos_, true);
- assert((int)wstream_->avail_out != cwbuf_size_);
- transport_->write(cwbuf_, cwbuf_size_ - wstream_->avail_out);
- transport_->flush();
-}
-
-void TZlibTransport::flushToZlib(const uint8_t* buf, int len, bool finish) {
- int flush = (finish ? Z_FINISH : Z_NO_FLUSH);
-
- wstream_->next_in = const_cast<uint8_t*>(buf);
- wstream_->avail_in = len;
-
- while (wstream_->avail_in > 0 || finish) {
- // If our ouput buffer is full, flush to the underlying transport.
- if (wstream_->avail_out == 0) {
- transport_->write(cwbuf_, cwbuf_size_);
- wstream_->next_out = cwbuf_;
- wstream_->avail_out = cwbuf_size_;
- }
-
- int zlib_rv = deflate(wstream_, flush);
-
- if (finish && zlib_rv == Z_STREAM_END) {
- assert(wstream_->avail_in == 0);
- break;
- }
-
- checkZlibRv(zlib_rv, wstream_->msg);
- }
-}
-
-bool TZlibTransport::borrow(uint8_t* buf, uint32_t len) {
- // Don't try to be clever with shifting buffers.
- // If we have enough data, give it, otherwise
- // let the protcol use its slow path.
- if (readAvail() >= (int)len) {
- memcpy(buf, urbuf_ + urpos_, len);
- return true;
- }
- return false;
-}
-
-void TZlibTransport::consume(uint32_t len) {
- if (readAvail() >= (int)len) {
- urpos_ += len;
- } else {
- throw TTransportException(TTransportException::BAD_ARGS,
- "consume did not follow a borrow.");
- }
-}
-
-void TZlibTransport::verifyChecksum() {
- if (!standalone_) {
- throw TTransportException(
- TTransportException::BAD_ARGS,
- "TZLibTransport can only verify checksums for standalone objects.");
- }
-
- if (!input_ended_) {
- // This should only be called when reading is complete,
- // but it's possible that the whole checksum has not been fed to zlib yet.
- // We try to read an extra byte here to force zlib to finish the stream.
- // It might not always be easy to "unread" this byte,
- // but we throw an exception if we get it, which is not really
- // a recoverable error, so it doesn't matter.
- uint8_t buf[1];
- uint32_t got = this->read(buf, sizeof(buf));
- if (got || !input_ended_) {
- throw TTransportException(
- TTransportException::CORRUPTED_DATA,
- "Zlib stream not complete.");
- }
- }
-
- // If the checksum had been bad, we would have gotten an error while
- // inflating.
-}
-
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TZlibTransport.h b/lib/cpp/src/transport/TZlibTransport.h
deleted file mode 100644
index cc2522d..0000000
--- a/lib/cpp/src/transport/TZlibTransport.h
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright (c) 2006- Facebook
-// Distributed under the Thrift Software License
-//
-// See accompanying file LICENSE or visit the Thrift site at:
-// http://developers.facebook.com/thrift/
-
-#ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
-
-#include <boost/lexical_cast.hpp>
-#include <transport/TTransport.h>
-
-struct z_stream_s;
-
-namespace facebook { namespace thrift { namespace transport {
-
-class TZlibTransportException : public TTransportException {
- public:
- TZlibTransportException(int status, const char* msg) :
- TTransportException(TTransportException::INTERNAL_ERROR,
- errorMessage(status, msg)),
- zlib_status_(status),
- zlib_msg_(msg == NULL ? "(null)" : msg) {}
-
- virtual ~TZlibTransportException() throw() {}
-
- int getZlibStatus() { return zlib_status_; }
- std::string getZlibMessage() { return zlib_msg_; }
-
- static std::string errorMessage(int status, const char* msg) {
- std::string rv = "zlib error: ";
- if (msg) {
- rv += msg;
- } else {
- rv += "(no message)";
- }
- rv += " (status = ";
- rv += boost::lexical_cast<std::string>(status);
- rv += ")";
- return rv;
- }
-
- int zlib_status_;
- std::string zlib_msg_;
-};
-
-/**
- * This transport uses zlib's compressed format on the "far" side.
- *
- * There are two kinds of TZlibTransport objects:
- * - Standalone objects are used to encode self-contained chunks of data
- * (like structures). They include checksums.
- * - Non-standalone transports are used for RPC. They are not implemented yet.
- *
- * TODO(dreiss): Don't do an extra copy of the compressed data if
- * the underlying transport is TBuffered or TMemory.
- *
- * @author David Reiss <dreiss@facebook.com>
- */
-class TZlibTransport : public TTransport {
- public:
-
- /**
- * @param transport The transport to read compressed data from
- * and write compressed data to.
- * @param use_for_rpc True if this object will be used for RPC,
- * false if this is a standalone object.
- * @param urbuf_size Uncompressed buffer size for reading.
- * @param crbuf_size Compressed buffer size for reading.
- * @param uwbuf_size Uncompressed buffer size for writing.
- * @param cwbuf_size Compressed buffer size for writing.
- *
- * TODO(dreiss): Write a constructor that isn't a pain.
- */
- TZlibTransport(boost::shared_ptr<TTransport> transport,
- bool use_for_rpc,
- int urbuf_size = DEFAULT_URBUF_SIZE,
- int crbuf_size = DEFAULT_CRBUF_SIZE,
- int uwbuf_size = DEFAULT_UWBUF_SIZE,
- int cwbuf_size = DEFAULT_CWBUF_SIZE) :
- transport_(transport),
- standalone_(!use_for_rpc),
- urpos_(0),
- uwpos_(0),
- input_ended_(false),
- output_flushed_(false),
- urbuf_size_(urbuf_size),
- crbuf_size_(crbuf_size),
- uwbuf_size_(uwbuf_size),
- cwbuf_size_(cwbuf_size),
- urbuf_(NULL),
- crbuf_(NULL),
- uwbuf_(NULL),
- cwbuf_(NULL),
- rstream_(NULL),
- wstream_(NULL)
- {
-
- if (!standalone_) {
- throw TTransportException(
- TTransportException::BAD_ARGS,
- "TZLibTransport has not been tested for RPC.");
- }
-
- if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
- // Have to copy this into a local because of a linking issue.
- int minimum = MIN_DIRECT_DEFLATE_SIZE;
- throw TTransportException(
- TTransportException::BAD_ARGS,
- "TZLibTransport: uncompressed write buffer must be at least"
- + boost::lexical_cast<std::string>(minimum) + ".");
- }
-
- try {
- urbuf_ = new uint8_t[urbuf_size];
- crbuf_ = new uint8_t[crbuf_size];
- uwbuf_ = new uint8_t[uwbuf_size];
- cwbuf_ = new uint8_t[cwbuf_size];
-
- // Don't call this outside of the constructor.
- initZlib();
-
- } catch (...) {
- delete[] urbuf_;
- delete[] crbuf_;
- delete[] uwbuf_;
- delete[] cwbuf_;
- throw;
- }
- }
-
- // Don't call this outside of the constructor.
- void initZlib();
-
- ~TZlibTransport();
-
- bool isOpen();
-
- void open() {
- transport_->open();
- }
-
- void close() {
- transport_->close();
- }
-
- uint32_t read(uint8_t* buf, uint32_t len);
-
- void write(const uint8_t* buf, uint32_t len);
-
- void flush();
-
- bool borrow(uint8_t* buf, uint32_t len);
-
- void consume(uint32_t len);
-
- void verifyChecksum();
-
- /**
- * TODO(someone_smart): Choose smart defaults.
- */
- static const int DEFAULT_URBUF_SIZE = 128;
- static const int DEFAULT_CRBUF_SIZE = 1024;
- static const int DEFAULT_UWBUF_SIZE = 128;
- static const int DEFAULT_CWBUF_SIZE = 1024;
-
- protected:
-
- inline void checkZlibRv(int status, const char* msg);
- inline void checkZlibRvNothrow(int status, const char* msg);
- inline int readAvail();
- void flushToZlib(const uint8_t* buf, int len, bool finish = false);
-
- // Writes smaller than this are buffered up.
- // Larger (or equal) writes are dumped straight to zlib.
- static const int MIN_DIRECT_DEFLATE_SIZE = 32;
-
- boost::shared_ptr<TTransport> transport_;
- bool standalone_;
-
- int urpos_;
- int uwpos_;
-
- /// True iff zlib has reached the end of a stream.
- /// This is only ever true in standalone protcol objects.
- bool input_ended_;
- /// True iff we have flushed the output stream.
- /// This is only ever true in standalone protcol objects.
- bool output_flushed_;
-
- int urbuf_size_;
- int crbuf_size_;
- int uwbuf_size_;
- int cwbuf_size_;
-
- uint8_t* urbuf_;
- uint8_t* crbuf_;
- uint8_t* uwbuf_;
- uint8_t* cwbuf_;
-
- struct z_stream_s* rstream_;
- struct z_stream_s* wstream_;
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_