-- fix read timeout handling in TSocket

Summary:
- turns out that EAGAIN can be returned both when there is a transmission timeout and when there
  is a lack of system resources.

This diff has a hacky fix for respecting a user specified read timeout.

Reviewed By: Steve Grimm, Marc, Slee

Test Plan:
- Tested by trying to crash an srp machine

Revert Plan: No.

Notes:
- Also added functionality to allow users to specify the max number of recv retries (in the case
  when EAGAIN is returned due to a lack of system resources)


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665121 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 32fc2e7..0777807 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -36,9 +36,6 @@
 // Mutex to protect syscalls to netdb
 static Monitor s_netdb_monitor;
 
-// TODO(mcslee): Make this an option to the socket class
-#define MAX_RECV_RETRIES 20
-  
 TSocket::TSocket(string host, int port) : 
   host_(host),
   port_(port),
@@ -48,7 +45,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -62,7 +60,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -76,7 +75,8 @@
   recvTimeout_(0),
   lingerOn_(1),
   lingerVal_(0),
-  noDelay_(1) {
+  noDelay_(1),
+  maxRecvRetries_(5) {
   recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
   recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
 }
@@ -235,23 +235,52 @@
     throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
   }
 
-  uint32_t retries = 0;
-  
- try_again:
+  int32_t retries = 0;
+
+  // EAGAIN can be signalled both when a timeout has occurred and when
+  // the system is out of resources (an awesome undocumented feature).
+  // The following is an approximation of the time interval under which
+  // EAGAIN is taken to indicate an out of resources error.
+  uint32_t eagainThresholdMicros = 0;
+  if (recvTimeout_) {
+    // if a readTimeout is specified along with a max number of recv retries, then 
+    // the threshold will ensure that the read timeout is not exceeded even in the
+    // case of resource errors
+    eagainThresholdMicros = (recvTimeout_*1000)/ ((maxRecvRetries_>0) ? maxRecvRetries_ : 2);
+  }
+
+ try_again:  
   // Read from the socket
+  struct timeval begin;
+  gettimeofday(&begin, NULL);
   int got = recv(socket_, buf, len, 0);
+  struct timeval end;
+  gettimeofday(&end, NULL);
+  uint32_t readElapsedMicros =  (((end.tv_sec - begin.tv_sec) * 1000 * 1000)
+                                 + (((uint64_t)(end.tv_usec - begin.tv_usec))));
   ++g_socket_syscalls;
-  
+
   // Check for error on read
   if (got < 0) {   
-    // If temporarily out of resources, sleep a bit and try again
-    if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-      usleep(50);
-      goto try_again;
+    if (errno == EAGAIN) {
+      // check if this is the lack of resources or timeout case
+      if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
+        if (retries++ < maxRecvRetries_) {
+          usleep(50);
+          goto try_again;
+        } else {
+          throw TTransportException(TTransportException::TIMED_OUT, 
+                                    "EAGAIN (unavailable resources)");
+        }
+      } else {
+        // infer that timeout has been hit
+        throw TTransportException(TTransportException::TIMED_OUT, 
+                                  "EAGAIN (timed out)");
+      }
     }
     
     // If interrupted, try again
-    if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+    if (errno == EINTR && retries++ < maxRecvRetries_) {
       goto try_again;
     }
     
@@ -407,4 +436,8 @@
   }
 }
 
+void TSocket::setMaxRecvRetries(int maxRecvRetries) {
+  maxRecvRetries_ = maxRecvRetries;
+}
+
 }}} // facebook::thrift::transport