From 022b224ebdd932bb8e4fe54beea02589c3661b8b Mon Sep 17 00:00:00 2001 From: Kevin Clark Date: Thu, 5 Mar 2009 21:05:37 +0000 Subject: [PATCH] THRIFT-96. cpp: TSocket.peek fails on FreeBSD Author: Alexander Shigin git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750585 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/transport/TBufferTransports.h | 152 ++++++++++------------ lib/cpp/src/transport/TSocket.cpp | 32 ++++- 2 files changed, 92 insertions(+), 92 deletions(-) diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h index 9d9510d6..4e3f187a 100644 --- a/lib/cpp/src/transport/TBufferTransports.h +++ b/lib/cpp/src/transport/TBufferTransports.h @@ -156,6 +156,64 @@ class TBufferBase : public TTransport { }; +/** + * Base class for all transport which wraps transport to new one. + */ +class TUnderlyingTransport : public TBufferBase { + public: + static const int DEFAULT_BUFFER_SIZE = 512; + + virtual bool peek() { + return (rBase_ < rBound_) || transport_->peek(); + } + + void open() { + transport_->open(); + } + + bool isOpen() { + return transport_->isOpen(); + } + + void close() { + flush(); + transport_->close(); + } + + boost::shared_ptr getUnderlyingTransport() { + return transport_; + } + + protected: + boost::shared_ptr transport_; + + uint32_t rBufSize_; + uint32_t wBufSize_; + boost::scoped_array rBuf_; + boost::scoped_array wBuf_; + + TUnderlyingTransport(boost::shared_ptr transport, uint32_t sz) + : transport_(transport) + , rBufSize_(sz) + , wBufSize_(sz) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) {} + + TUnderlyingTransport(boost::shared_ptr transport) + : transport_(transport) + , rBufSize_(DEFAULT_BUFFER_SIZE) + , wBufSize_(DEFAULT_BUFFER_SIZE) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) {} + + TUnderlyingTransport(boost::shared_ptr transport, uint32_t rsz, uint32_t wsz) + : transport_(transport) + , rBufSize_(rsz) + , wBufSize_(wsz) + , rBuf_(new uint8_t[rBufSize_]) + , wBuf_(new uint8_t[wBufSize_]) {} +}; + /** * Buffered transport. For reads it will read more data than is requested * and will serve future data out of a local buffer. For writes, data is @@ -164,64 +222,37 @@ class TBufferBase : public TTransport { * @author Mark Slee * @author David Reiss */ -class TBufferedTransport : public TBufferBase { +class TBufferedTransport : public TUnderlyingTransport { public: - static const int DEFAULT_BUFFER_SIZE = 512; - /// Use default buffer sizes. TBufferedTransport(boost::shared_ptr transport) - : transport_(transport) - , rBufSize_(DEFAULT_BUFFER_SIZE) - , wBufSize_(DEFAULT_BUFFER_SIZE) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) + : TUnderlyingTransport(transport) { initPointers(); } /// Use specified buffer sizes. TBufferedTransport(boost::shared_ptr transport, uint32_t sz) - : transport_(transport) - , rBufSize_(sz) - , wBufSize_(sz) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) + : TUnderlyingTransport(transport, sz) { initPointers(); } /// Use specified read and write buffer sizes. TBufferedTransport(boost::shared_ptr transport, uint32_t rsz, uint32_t wsz) - : transport_(transport) - , rBufSize_(rsz) - , wBufSize_(wsz) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) + : TUnderlyingTransport(transport, rsz, wsz) { initPointers(); } - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { + virtual bool peek() { + /* shigin: see THRIFT-96 discussion */ if (rBase_ == rBound_) { setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); } return (rBound_ > rBase_); } - - void close() { - flush(); - transport_->close(); - } - virtual uint32_t readSlow(uint8_t* buf, uint32_t len); virtual void writeSlow(const uint8_t* buf, uint32_t len); @@ -242,23 +273,12 @@ class TBufferedTransport : public TBufferBase { */ virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); - boost::shared_ptr getUnderlyingTransport() { - return transport_; - } - protected: void initPointers() { setReadBuffer(rBuf_.get(), 0); setWriteBuffer(wBuf_.get(), wBufSize_); // Write size never changes. } - - boost::shared_ptr transport_; - - uint32_t rBufSize_; - uint32_t wBufSize_; - boost::scoped_array rBuf_; - boost::scoped_array wBuf_; }; @@ -292,49 +312,22 @@ class TBufferedTransportFactory : public TTransportFactory { * @author Mark Slee * @author David Reiss */ -class TFramedTransport : public TBufferBase { +class TFramedTransport : public TUnderlyingTransport { public: - static const int DEFAULT_BUFFER_SIZE = 512; - /// Use default buffer sizes. TFramedTransport(boost::shared_ptr transport) - : transport_(transport) - , rBufSize_(0) - , wBufSize_(DEFAULT_BUFFER_SIZE) - , rBuf_() - , wBuf_(new uint8_t[wBufSize_]) + : TUnderlyingTransport(transport) { initPointers(); } TFramedTransport(boost::shared_ptr transport, uint32_t sz) - : transport_(transport) - , rBufSize_(0) - , wBufSize_(sz) - , rBuf_() - , wBuf_(new uint8_t[wBufSize_]) + : TUnderlyingTransport(transport, sz) { initPointers(); } - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - return (rBase_ < rBound_) || transport_->peek(); - } - - void close() { - flush(); - transport_->close(); - } - virtual uint32_t readSlow(uint8_t* buf, uint32_t len); virtual void writeSlow(const uint8_t* buf, uint32_t len); @@ -343,10 +336,6 @@ class TFramedTransport : public TBufferBase { const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); - boost::shared_ptr getUnderlyingTransport() { - return transport_; - } - protected: /** * Reads a frame of input from the underlying stream. @@ -361,13 +350,6 @@ class TFramedTransport : public TBufferBase { int32_t pad = 0; this->write((uint8_t*)&pad, sizeof(pad)); } - - boost::shared_ptr transport_; - - uint32_t rBufSize_; - uint32_t wBufSize_; - boost::scoped_array rBuf_; - boost::scoped_array wBuf_; }; /** diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 04cd81e1..1696053d 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -96,6 +96,17 @@ bool TSocket::peek() { int r = recv(socket_, &buf, 1, MSG_PEEK); if (r == -1) { int errno_copy = errno; + #ifdef __FreeBSD__ + /* shigin: + * freebsd returns -1 and ECONNRESET if socket was closed by + * the other side + */ + if (errno_copy == ECONNRESET) + { + close(); + return false; + } + #endif GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy); throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy); } @@ -284,6 +295,7 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { struct timeval begin; gettimeofday(&begin, NULL); int got = recv(socket_, buf, len, 0); + int errno_copy = errno; //gettimeofday can change errno struct timeval end; gettimeofday(&end, NULL); uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000) @@ -292,7 +304,7 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { // Check for error on read if (got < 0) { - if (errno == EAGAIN) { + if (errno_copy == EAGAIN) { // check if this is the lack of resources or timeout case if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { if (retries++ < maxRecvRetries_) { @@ -310,31 +322,37 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { } // If interrupted, try again - if (errno == EINTR && retries++ < maxRecvRetries_) { + if (errno_copy == EINTR && retries++ < maxRecvRetries_) { goto try_again; } // Now it's not a try again case, but a real probblez - int errno_copy = errno; // Copy errno because we're allocating memory. GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy); // If we disconnect with no linger time - if (errno == ECONNRESET) { + if (errno_copy == ECONNRESET) { + #ifdef __FreeBSD__ + /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with + * ECONNRESET if peer performed shutdown + */ + close(); + return 0; + #else throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET"); + #endif } // This ish isn't open - if (errno == ENOTCONN) { + if (errno_copy == ENOTCONN) { throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN"); } // Timed out! - if (errno == ETIMEDOUT) { + if (errno_copy == ETIMEDOUT) { throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT"); } // Some other error, whatevz - errno_copy = errno; throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); } -- 2.17.1