From 2905078d167144a5405119f14d4ce96fa6d0ae42 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Fri, 29 Sep 2006 00:12:30 +0000 Subject: [PATCH] Better socket timeout and options support for Thrift C++ Summary: Also compile without degugging symbols for the linked library Reviewed By: aditya git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664810 13f79535-47bb-0310-9956-ffa450edef68 --- README | 4 + lib/cpp/configure.ac | 4 + lib/cpp/src/concurrency/PosixThreadFactory.cc | 2 +- lib/cpp/src/transport/TServerSocket.cc | 49 ++++- lib/cpp/src/transport/TServerSocket.h | 8 +- lib/cpp/src/transport/TSocket.cc | 180 ++++++++++++++---- lib/cpp/src/transport/TSocket.h | 34 ++++ 7 files changed, 245 insertions(+), 36 deletions(-) diff --git a/README b/README index 27009935..b508f2bb 100644 --- a/README +++ b/README @@ -57,6 +57,10 @@ You may need to specify the locacation of the boost files explicitly: If you i ./configure --with-boost=/usr/local +Note that by default the thrift C++ library is built with no debugging symbols included. If you would like debugging symbols during development work, run: + + ./configure --CFLAGS='-g -O2' + Run ./configure --help to see other configuration options Make thrift diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac index d571ba02..2d30c592 100644 --- a/lib/cpp/configure.ac +++ b/lib/cpp/configure.ac @@ -80,4 +80,8 @@ AC_PROG_INSTALL AC_PROG_LIBTOOL +CFLAGS="-O2" + +CXXFLAGS="-O2" + AC_OUTPUT(Makefile) diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index 5df87ec2..130976c9 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -161,7 +161,7 @@ class PosixThreadFactory::Impl { int quanta = (HIGHEST - LOWEST) + 1; float stepsperquanta = (max_priority - min_priority) / quanta; - if(priority <= HIGHEST) { + if (priority <= HIGHEST) { return (int)(min_priority + stepsperquanta * priority); } else { // should never get here for priority increments. diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc index 75bd504d..8a14ea5c 100644 --- a/lib/cpp/src/transport/TServerSocket.cc +++ b/lib/cpp/src/transport/TServerSocket.cc @@ -1,5 +1,6 @@ #include #include +#include #include #include "TSocket.h" @@ -11,12 +12,31 @@ namespace facebook { namespace thrift { namespace transport { using namespace boost; TServerSocket::TServerSocket(int port) : - port_(port), serverSocket_(0), acceptBacklog_(1024) {} + port_(port), + serverSocket_(0), + acceptBacklog_(1024), + sendTimeout_(0), + recvTimeout_(0) {} + +TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) : + port_(port), + serverSocket_(0), + acceptBacklog_(1024), + sendTimeout_(sendTimeout), + recvTimeout_(recvTimeout) {} TServerSocket::~TServerSocket() { close(); } +void TServerSocket::setSendTimeout(int sendTimeout) { + sendTimeout_ = sendTimeout; +} + +void TServerSocket::setRecvTimeout(int recvTimeout) { + recvTimeout_ = recvTimeout; +} + void TServerSocket::listen() { serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); if (serverSocket_ == -1) { @@ -34,6 +54,16 @@ void TServerSocket::listen() { throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR"); } + // Defer accept + #ifdef TCP_DEFER_ACCEPT + if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT, + &one, sizeof(one))) { + perror("TServerSocket::listen() TCP_DEFER_ACCEPT"); + close(); + throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_DEFER_ACCEPT"); + } + #endif // #ifdef TCP_DEFER_ACCEPT + // Turn linger off, don't want to block on calls to close struct linger ling = {0, 0}; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, @@ -43,6 +73,14 @@ void TServerSocket::listen() { throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER"); } + // TCP Nodelay, speed over bandwidth + if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, + &one, sizeof(one))) { + close(); + perror("setsockopt TCP_NODELAY"); + throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_NODELAY"); + } + // Bind to a port struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); @@ -83,7 +121,14 @@ shared_ptr TServerSocket::acceptImpl() { throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno); } - return shared_ptr(new TSocket(clientSocket)); + shared_ptr client(new TSocket(clientSocket)); + if (sendTimeout_ > 0) { + client->setSendTimeout(sendTimeout_); + } + if (recvTimeout_ > 0) { + client->setRecvTimeout(recvTimeout_); + } + return client; } void TServerSocket::close() { diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h index 29cfdfef..496d540f 100644 --- a/lib/cpp/src/transport/TServerSocket.h +++ b/lib/cpp/src/transport/TServerSocket.h @@ -17,8 +17,13 @@ class TSocket; class TServerSocket : public TServerTransport { public: TServerSocket(int port); + TServerSocket(int port, int sendTimeout, int recvTimeout); + ~TServerSocket(); + void setSendTimeout(int sendTimeout); + void setRecvTimeout(int recvTimeout); + void listen(); void close(); @@ -26,10 +31,11 @@ class TServerSocket : public TServerTransport { shared_ptr acceptImpl(); private: - int port_; int serverSocket_; int acceptBacklog_; + int sendTimeout_; + int recvTimeout_; }; }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TSocket.cc b/lib/cpp/src/transport/TSocket.cc index eef76ef8..de3bea7b 100644 --- a/lib/cpp/src/transport/TSocket.cc +++ b/lib/cpp/src/transport/TSocket.cc @@ -6,14 +6,19 @@ #include #include #include +#include +#include +#include "concurrency/Monitor.h" #include "TSocket.h" #include "TTransportException.h" namespace facebook { namespace thrift { namespace transport { using namespace std; +using namespace facebook::thrift::concurrency; +// Global var to track total socket sys calls uint32_t g_socket_syscalls = 0; /** @@ -23,18 +28,35 @@ uint32_t g_socket_syscalls = 0; */ // Mutex to protect syscalls to netdb -pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER; +static Monitor s_netdb_monitor; // TODO(mcslee): Make this an option to the socket class #define MAX_RECV_RETRIES 20 - -TSocket::TSocket(string host, int port) : - host_(host), port_(port), socket_(0) {} - -TSocket::TSocket(int socket) { - socket_ = socket; + +TSocket::TSocket(string host, int port) : + host_(host), + port_(port), + socket_(0), + connTimeout_(0), + sendTimeout_(0), + recvTimeout_(0), + lingerOn_(1), + lingerVal_(0), + noDelay_(1) { } +TSocket::TSocket(int socket) : + host_(""), + port_(0), + socket_(socket), + connTimeout_(0), + sendTimeout_(0), + recvTimeout_(0), + lingerOn_(1), + lingerVal_(0), + noDelay_(1) { +} + TSocket::~TSocket() { close(); } @@ -51,25 +73,35 @@ void TSocket::open() { close(); throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno); } - + + // Send timeout + if (sendTimeout_ > 0) { + setSendTimeout(sendTimeout_); + } + + // Recv timeout + if (recvTimeout_ > 0) { + setRecvTimeout(recvTimeout_); + } + + // Linger + setLinger(lingerOn_, lingerVal_); + + // No delay + setNoDelay(noDelay_); + // Lookup the hostname struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port_); - /* - if (inet_pton(AF_INET, host_.c_str(), &addr.sin_addr) < 0) { - perror("TSocket::open() inet_pton"); - } - */ - { - // TODO(mcslee): Fix scope-locking here to protect hostname lookups - // scopelock sl(&netdb_mutex); + // Scope lock on host entry lookup + Synchronized s(s_netdb_monitor); struct hostent *host_entry = gethostbyname(host_.c_str()); if (host_entry == NULL) { - // perror("dns error: failed call to gethostbyname.\n"); + perror("TSocket: dns error: failed call to gethostbyname."); close(); throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed"); } @@ -79,18 +111,69 @@ void TSocket::open() { host_entry->h_addr_list[0], host_entry->h_length); } + + // Set the socket to be non blocking for connect if a timeout exists + int flags = fcntl(socket_, F_GETFL, 0); + if (connTimeout_ > 0) { + fcntl(socket_, F_SETFL, flags | O_NONBLOCK); + } else { + fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK); + } + + // Conn timeout + struct timeval c = {(int)(connTimeout_/1000), + (int)((connTimeout_%1000)*1000)}; // Connect the socket int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr)); - // Connect failed - if (ret < 0) { - perror("TSocket::open() connect"); + if (ret == 0) { + goto done; + } + + if (errno != EINPROGRESS) { close(); + char buff[1024]; + sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_); + perror(buff); throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); } - // Connection was successful + fd_set fds; + FD_ZERO(&fds); + FD_SET(socket_, &fds); + ret = select(socket_+1, NULL, &fds, NULL, &c); + + if (ret > 0) { + // Ensure connected + int val; + socklen_t lon; + lon = sizeof(int); + int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon); + if (ret2 == -1) { + close(); + perror("TSocket::open() getsockopt SO_ERROR"); + throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); + } + if (val == 0) { + goto done; + } + close(); + perror("TSocket::open() SO_ERROR was set"); + throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); + } else if (ret == 0) { + close(); + perror("TSocket::open() timeed out"); + throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); + } else { + close(); + perror("TSocket::open() select error"); + throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno); + } + + done: + // Set socket back to normal mode (blocking) + fcntl(socket_, F_SETFL, flags); } void TSocket::close() { @@ -167,12 +250,11 @@ void TSocket::write(const uint8_t* buf, uint32_t len) { while (sent < len) { int flags = 0; - - #if defined(MSG_NOSIGNAL) + #ifdef MSG_NOSIGNAL // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we // check for the EPIPE return condition and close the socket in that case flags |= MSG_NOSIGNAL; - #endif // defined(MSG_NOSIGNAL) + #endif // ifdef MSG_NOSIGNAL int b = send(socket_, buf + sent, len - sent, flags); ++g_socket_syscalls; @@ -207,29 +289,63 @@ void TSocket::write(const uint8_t* buf, uint32_t len) { } void TSocket::setLinger(bool on, int linger) { - // TODO(mcslee): Store these options so they can be set pre-connect + lingerOn_ = on; + lingerVal_ = linger; if (socket_ <= 0) { return; } - struct linger ling = {(on ? 1 : 0), linger}; - if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) { - close(); + struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_}; + int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l)); + if (ret == -1) { perror("TSocket::setLinger()"); } } void TSocket::setNoDelay(bool noDelay) { - // TODO(mcslee): Store these options so they can be set pre-connect + noDelay_ = noDelay; if (socket_ <= 0) { return; } // Set socket to NODELAY - int val = (noDelay ? 1 : 0); - if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) { - close(); + int v = noDelay_ ? 1 : 0; + int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)); + if (ret == -1) { perror("TSocket::setNoDelay()"); } } + +void TSocket::setConnTimeout(int ms) { + connTimeout_ = ms; +} + +void TSocket::setRecvTimeout(int ms) { + recvTimeout_ = ms; + if (socket_ <= 0) { + return; + } + + struct timeval r = {(int)(recvTimeout_/1000), + (int)((recvTimeout_%1000)*1000)}; + int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r)); + if (ret == -1) { + perror("TSocket::setRecvTimeout()"); + } +} + +void TSocket::setSendTimeout(int ms) { + sendTimeout_ = ms; + if (socket_ <= 0) { + return; + } + + struct timeval s = {(int)(sendTimeout_/1000), + (int)((sendTimeout_%1000)*1000)}; + int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s)); + if (ret == -1) { + perror("TSocket::setSendTimeout()"); + } +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index f1a065fb..b946b6ae 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -82,6 +82,22 @@ class TSocket : public TTransport { */ void setNoDelay(bool noDelay); + /** + * Set the connect timeout + */ + void setConnTimeout(int ms); + + /** + * Set the receive timeout + */ + void setRecvTimeout(int ms); + + /** + * Set the send timeout + */ + void setSendTimeout(int ms); + + private: /** * Constructor to create socket from raw UNIX handle. Never called directly @@ -97,6 +113,24 @@ class TSocket : public TTransport { /** Underlying UNIX socket handle */ int socket_; + + /** Connect timeout in ms */ + int connTimeout_; + + /** Send timeout in ms */ + int sendTimeout_; + + /** Recv timeout in ms */ + int recvTimeout_; + + /** Linger on */ + bool lingerOn_; + + /** Linger val */ + int lingerVal_; + + /** Nodelay */ + bool noDelay_; }; }}} // facebook::thrift::transport -- 2.17.1