From: David Reiss Date: Tue, 29 Apr 2008 00:29:41 +0000 (+0000) Subject: Persistent conns in TSocketPool X-Git-Tag: 0.2.0~804 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=1997f10e282e5d188b65b45190ceb5a7e1f95072;p=common%2Fthrift.git Persistent conns in TSocketPool Summary: Added support for persistent conns in TSocketPool Also, added some util functions in TNonblockingServer Reviewed By: mcslee Test Plan: Ran a test search cluster with these changes - open was only called once (I put fprintfs in open and close), after which the socket was reused Revert: OK TracCamp Project: Thrift DiffCamp Revision: 11425 git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665668 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 7c6bc7fa..0c3c6d54 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -64,6 +64,9 @@ class TNonblockingServer : public TServer { // Event struct, for use with eventBase_ struct event serverEvent_; + // Number of TConnection object we've created + size_t numTConnections_; + /** * This is a stack of all the objects that have been created but that * are NOT currently in use. When we close a connection, we place it on this @@ -82,7 +85,8 @@ class TNonblockingServer : public TServer { port_(port), frameResponses_(true), threadPoolProcessing_(false), - eventBase_(NULL) {} + eventBase_(NULL), + numTConnections_(0) {} TNonblockingServer(boost::shared_ptr processor, boost::shared_ptr protocolFactory, @@ -93,7 +97,8 @@ class TNonblockingServer : public TServer { port_(port), frameResponses_(true), threadManager_(threadManager), - eventBase_(NULL) { + eventBase_(NULL), + numTConnections_(0) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); setInputProtocolFactory(protocolFactory); @@ -113,7 +118,8 @@ class TNonblockingServer : public TServer { port_(port), frameResponses_(true), threadManager_(threadManager), - eventBase_(NULL) { + eventBase_(NULL), + numTConnections_(0) { setInputTransportFactory(inputTransportFactory); setOutputTransportFactory(outputTransportFactory); setInputProtocolFactory(inputProtocolFactory); @@ -128,6 +134,10 @@ class TNonblockingServer : public TServer { threadPoolProcessing_ = (threadManager != NULL); } + boost::shared_ptr getThreadManager() { + return threadManager_; + } + bool isThreadPoolProcessing() const { return threadPoolProcessing_; } @@ -148,6 +158,18 @@ class TNonblockingServer : public TServer { return eventBase_; } + void incrementNumConnections(size_t incr=1) { + numTConnections_ += incr; + } + + size_t getNumConnections() { + return numTConnections_; + } + + size_t getNumIdleConnections() { + return connectionStack_.size(); + } + TConnection* createConnection(int socket, short flags); void returnConnection(TConnection* connection); @@ -163,7 +185,6 @@ class TNonblockingServer : public TServer { void registerEvents(event_base* base); void serve(); - }; /** @@ -304,6 +325,11 @@ class TConnection { outputTransport_ = boost::shared_ptr(new TMemoryBuffer()); init(socket, eventFlags, s); + server_->incrementNumConnections(); + } + + ~TConnection() { + server_->incrementNumConnections(-1); } // Initialize diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp index e22a4b8b..d5a65012 100644 --- a/lib/cpp/src/transport/TSocketPool.cpp +++ b/lib/cpp/src/transport/TSocketPool.cpp @@ -23,6 +23,7 @@ using boost::shared_ptr; TSocketPoolServer::TSocketPoolServer() : host_(""), port_(0), + socket_(-1), lastFailTime_(0), consecutiveFailures_(0) {} @@ -32,6 +33,7 @@ TSocketPoolServer::TSocketPoolServer() TSocketPoolServer::TSocketPoolServer(const string &host, int port) : host_(host), port_(port), + socket_(-1), lastFailTime_(0), consecutiveFailures_(0) {} @@ -100,7 +102,12 @@ TSocketPool::TSocketPool(const string& host, int port) : TSocket(), } TSocketPool::~TSocketPool() { - close(); + vector< shared_ptr >::const_iterator iter = servers_.begin(); + vector< shared_ptr >::const_iterator iterEnd = servers_.end(); + for (; iter != iterEnd; ++iter) { + setCurrentServer(*iter); + TSocketPool::close(); + } } void TSocketPool::addServer(const string& host, int port) { @@ -136,6 +143,13 @@ void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) { alwaysTryLast_ = alwaysTryLast; } +void TSocketPool::setCurrentServer(const shared_ptr &server) { + currentServer_ = server; + host_ = server->host_; + port_ = server->port_; + socket_ = server->socket_; +} + /* TODO: without apc we ignore a lot of functionality from the php version */ void TSocketPool::open() { if (randomize_) { @@ -145,16 +159,21 @@ void TSocketPool::open() { unsigned int numServers = servers_.size(); for (unsigned int i = 0; i < numServers; ++i) { - TSocketPoolServer &server = *(servers_[i]); - bool retryIntervalPassed = (server.lastFailTime_ == 0); + shared_ptr &server = servers_[i]; + bool retryIntervalPassed = (server->lastFailTime_ == 0); bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false; - host_ = server.host_; - port_ = server.port_; + // Impersonate the server socket + setCurrentServer(server); + + if (isOpen()) { + // already open means we're done + return; + } - if (server.lastFailTime_ > 0) { + if (server->lastFailTime_ > 0) { // The server was marked as down, so check if enough time has elapsed to retry - int elapsedTime = time(NULL) - server.lastFailTime_; + int elapsedTime = time(NULL) - server->lastFailTime_; if (elapsedTime > retryInterval_) { retryIntervalPassed = true; } @@ -165,23 +184,28 @@ void TSocketPool::open() { try { TSocket::open(); + // Copy over the opened socket so that we can keep it persistent + server->socket_ = socket_; + // reset lastFailTime_ is required - if (server.lastFailTime_) { - server.lastFailTime_ = 0; + if (server->lastFailTime_) { + server->lastFailTime_ = 0; } // success return; } catch (TException e) { + string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what(); + GlobalOutput(errStr.c_str()); // connection failed } } - ++server.consecutiveFailures_; - if (server.consecutiveFailures_ > maxConsecutiveFailures_) { + ++server->consecutiveFailures_; + if (server->consecutiveFailures_ > maxConsecutiveFailures_) { // Mark server as down - server.consecutiveFailures_ = 0; - server.lastFailTime_ = time(NULL); + server->consecutiveFailures_ = 0; + server->lastFailTime_ = time(NULL); } } } @@ -190,4 +214,11 @@ void TSocketPool::open() { throw TTransportException(TTransportException::NOT_OPEN); } +void TSocketPool::close() { + if (isOpen()) { + TSocket::close(); + currentServer_->socket_ = -1; + } +} + }}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TSocketPool.h b/lib/cpp/src/transport/TSocketPool.h index 6c3b0f75..a455acaf 100644 --- a/lib/cpp/src/transport/TSocketPool.h +++ b/lib/cpp/src/transport/TSocketPool.h @@ -36,6 +36,9 @@ class TSocketPoolServer { // Port to connect on int port_; + // Socket for the server + int socket_; + // Last time connecting to this server failed int lastFailTime_; @@ -138,11 +141,21 @@ class TSocketPool : public TSocket { */ void open(); + /* + * Closes the UNIX socket + */ + void close(); + protected: + void setCurrentServer(const boost::shared_ptr &server); + /** List of servers to connect to */ std::vector< boost::shared_ptr > servers_; + /** Current server */ + boost::shared_ptr currentServer_; + /** How many times to retry each host in connect */ int numRetries_;