From: Aditya Agarwal Date: Wed, 23 May 2007 02:14:58 +0000 (+0000) Subject: -- fix read timeout handling in TSocket X-Git-Tag: 0.2.0~1351 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=e04475b5aaa2bba920a2c1b4021a1a447f947dd9;p=common%2Fthrift.git -- fix read timeout handling in TSocket Summary: - turns out that EAGAIN can be returned both when there is a transmission timeout and when there is a lack of system resources. This diff has a hacky fix for respecting a user specified read timeout. Reviewed By: Steve Grimm, Marc, Slee Test Plan: - Tested by trying to crash an srp machine Revert Plan: No. Notes: - Also added functionality to allow users to specify the max number of recv retries (in the case when EAGAIN is returned due to a lack of system resources) git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665121 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h index 735cd878..575a3edd 100644 --- a/lib/cpp/src/concurrency/Exception.h +++ b/lib/cpp/src/concurrency/Exception.h @@ -20,9 +20,19 @@ class InvalidArgumentException : public facebook::thrift::TException {}; class IllegalStateException : public facebook::thrift::TException {}; -class TimedOutException : public facebook::thrift::TException {}; +class TimedOutException : public facebook::thrift::TException { +public: + TimedOutException():TException("TimedOutException"){}; + TimedOutException(const std::string& message ) : + TException(message) {} +}; -class TooManyPendingTasksException : public facebook::thrift::TException {}; +class TooManyPendingTasksException : public facebook::thrift::TException { +public: + TooManyPendingTasksException():TException("TooManyPendingTasksException"){}; + TooManyPendingTasksException(const std::string& message ) : + TException(message) {} +}; class SystemResourceException : public facebook::thrift::TException { public: diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 32fc2e77..0777807b 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -36,9 +36,6 @@ uint32_t g_socket_syscalls = 0; // Mutex to protect syscalls to netdb static Monitor s_netdb_monitor; -// TODO(mcslee): Make this an option to the socket class -#define MAX_RECV_RETRIES 20 - TSocket::TSocket(string host, int port) : host_(host), port_(port), @@ -48,7 +45,8 @@ TSocket::TSocket(string host, int port) : recvTimeout_(0), lingerOn_(1), lingerVal_(0), - noDelay_(1) { + noDelay_(1), + maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); } @@ -62,7 +60,8 @@ TSocket::TSocket() : recvTimeout_(0), lingerOn_(1), lingerVal_(0), - noDelay_(1) { + noDelay_(1), + maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); } @@ -76,7 +75,8 @@ TSocket::TSocket(int socket) : recvTimeout_(0), lingerOn_(1), lingerVal_(0), - noDelay_(1) { + noDelay_(1), + maxRecvRetries_(5) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); } @@ -235,23 +235,52 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket"); } - uint32_t retries = 0; - - try_again: + int32_t retries = 0; + + // EAGAIN can be signalled both when a timeout has occurred and when + // the system is out of resources (an awesome undocumented feature). + // The following is an approximation of the time interval under which + // EAGAIN is taken to indicate an out of resources error. + uint32_t eagainThresholdMicros = 0; + if (recvTimeout_) { + // if a readTimeout is specified along with a max number of recv retries, then + // the threshold will ensure that the read timeout is not exceeded even in the + // case of resource errors + eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2); + } + + try_again: // Read from the socket + struct timeval begin; + gettimeofday(&begin, NULL); int got = recv(socket_, buf, len, 0); + struct timeval end; + gettimeofday(&end, NULL); + uint32_t readElapsedMicros = (((end.tv_sec - begin.tv_sec) * 1000 * 1000) + + (((uint64_t)(end.tv_usec - begin.tv_usec)))); ++g_socket_syscalls; - + // Check for error on read if (got < 0) { - // If temporarily out of resources, sleep a bit and try again - if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) { - usleep(50); - goto try_again; + if (errno == EAGAIN) { + // check if this is the lack of resources or timeout case + if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { + if (retries++ < maxRecvRetries_) { + usleep(50); + goto try_again; + } else { + throw TTransportException(TTransportException::TIMED_OUT, + "EAGAIN (unavailable resources)"); + } + } else { + // infer that timeout has been hit + throw TTransportException(TTransportException::TIMED_OUT, + "EAGAIN (timed out)"); + } } // If interrupted, try again - if (errno == EINTR && retries++ < MAX_RECV_RETRIES) { + if (errno == EINTR && retries++ < maxRecvRetries_) { goto try_again; } @@ -407,4 +436,8 @@ void TSocket::setSendTimeout(int ms) { } } +void TSocket::setMaxRecvRetries(int maxRecvRetries) { + maxRecvRetries_ = maxRecvRetries; +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index b00f6ffb..515172d5 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -19,6 +19,7 @@ namespace facebook { namespace thrift { namespace transport { * TCP Socket implementation of the TTransport interface. * * @author Mark Slee + * @author Aditya Agarwal */ class TSocket : public TTransport { /** @@ -130,6 +131,12 @@ class TSocket : public TTransport { */ void setSendTimeout(int ms); + /** + * Set the max number of recv retries in case of an EAGAIN + * error + */ + void setMaxRecvRetries(int maxRecvRetries); + protected: /** * Constructor to create socket from raw UNIX handle. Never called directly @@ -164,6 +171,9 @@ class TSocket : public TTransport { /** Nodelay */ bool noDelay_; + /** Recv EGAIN retries */ + int maxRecvRetries_; + /** Recv timeout timeval */ struct timeval recvTimeval_; };