|  | // Copyright (c) 2007- Facebook | 
|  | // Distributed under the Thrift Software License | 
|  | // | 
|  | // See accompanying file LICENSE or visit the Thrift site at: | 
|  | // http://developers.facebook.com/thrift/ | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <iostream> | 
|  |  | 
|  | #include "TSocketPool.h" | 
|  |  | 
|  | namespace facebook { namespace thrift { namespace transport { | 
|  |  | 
|  | using namespace std; | 
|  |  | 
|  | using boost::shared_ptr; | 
|  |  | 
|  | /** | 
|  | * TSocketPoolServer implementation | 
|  | * | 
|  | * @author Akhil Wable <akhil@facebook.com> | 
|  | */ | 
|  | TSocketPoolServer::TSocketPoolServer() | 
|  | : host_(""), | 
|  | port_(0), | 
|  | socket_(-1), | 
|  | lastFailTime_(0), | 
|  | consecutiveFailures_(0) {} | 
|  |  | 
|  | /** | 
|  | * Constructor for TSocketPool server | 
|  | */ | 
|  | TSocketPoolServer::TSocketPoolServer(const string &host, int port) | 
|  | : host_(host), | 
|  | port_(port), | 
|  | socket_(-1), | 
|  | lastFailTime_(0), | 
|  | consecutiveFailures_(0) {} | 
|  |  | 
|  | /** | 
|  | * TSocketPool implementation. | 
|  | * | 
|  | * @author Jason Sobel <jsobel@facebook.com> | 
|  | */ | 
|  |  | 
|  | TSocketPool::TSocketPool() : TSocket(), | 
|  | numRetries_(1), | 
|  | retryInterval_(60), | 
|  | maxConsecutiveFailures_(1), | 
|  | randomize_(true), | 
|  | alwaysTryLast_(true) { | 
|  | } | 
|  |  | 
|  | TSocketPool::TSocketPool(const vector<string> &hosts, | 
|  | const vector<int> &ports) : TSocket(), | 
|  | numRetries_(1), | 
|  | retryInterval_(60), | 
|  | maxConsecutiveFailures_(1), | 
|  | randomize_(true), | 
|  | alwaysTryLast_(true) | 
|  | { | 
|  | if (hosts.size() != ports.size()) { | 
|  | GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size"); | 
|  | throw TTransportException(TTransportException::BAD_ARGS); | 
|  | } | 
|  |  | 
|  | for (unsigned int i = 0; i < hosts.size(); ++i) { | 
|  | addServer(hosts[i], ports[i]); | 
|  | } | 
|  | } | 
|  |  | 
|  | TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(), | 
|  | numRetries_(1), | 
|  | retryInterval_(60), | 
|  | maxConsecutiveFailures_(1), | 
|  | randomize_(true), | 
|  | alwaysTryLast_(true) | 
|  | { | 
|  | for (unsigned i = 0; i < servers.size(); ++i) { | 
|  | addServer(servers[i].first, servers[i].second); | 
|  | } | 
|  | } | 
|  |  | 
|  | TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(), | 
|  | servers_(servers), | 
|  | numRetries_(1), | 
|  | retryInterval_(60), | 
|  | maxConsecutiveFailures_(1), | 
|  | randomize_(true), | 
|  | alwaysTryLast_(true) | 
|  | { | 
|  | } | 
|  |  | 
|  | TSocketPool::TSocketPool(const string& host, int port) : TSocket(), | 
|  | numRetries_(1), | 
|  | retryInterval_(60), | 
|  | maxConsecutiveFailures_(1), | 
|  | randomize_(true), | 
|  | alwaysTryLast_(true) | 
|  | { | 
|  | addServer(host, port); | 
|  | } | 
|  |  | 
|  | TSocketPool::~TSocketPool() { | 
|  | vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin(); | 
|  | vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end(); | 
|  | for (; iter != iterEnd; ++iter) { | 
|  | setCurrentServer(*iter); | 
|  | TSocketPool::close(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void TSocketPool::addServer(const string& host, int port) { | 
|  | servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port))); | 
|  | } | 
|  |  | 
|  | void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) { | 
|  | servers_ = servers; | 
|  | } | 
|  |  | 
|  | void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) { | 
|  | servers = servers_; | 
|  | } | 
|  |  | 
|  | void TSocketPool::setNumRetries(int numRetries) { | 
|  | numRetries_ = numRetries; | 
|  | } | 
|  |  | 
|  | void TSocketPool::setRetryInterval(int retryInterval) { | 
|  | retryInterval_ = retryInterval; | 
|  | } | 
|  |  | 
|  |  | 
|  | void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) { | 
|  | maxConsecutiveFailures_ = maxConsecutiveFailures; | 
|  | } | 
|  |  | 
|  | void TSocketPool::setRandomize(bool randomize) { | 
|  | randomize_ = randomize; | 
|  | } | 
|  |  | 
|  | void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) { | 
|  | alwaysTryLast_ = alwaysTryLast; | 
|  | } | 
|  |  | 
|  | void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) { | 
|  | currentServer_ = server; | 
|  | host_ = server->host_; | 
|  | port_ = server->port_; | 
|  | socket_ = server->socket_; | 
|  | } | 
|  |  | 
|  | /* TODO: without apc we ignore a lot of functionality from the php version */ | 
|  | void TSocketPool::open() { | 
|  | if (randomize_) { | 
|  | random_shuffle(servers_.begin(), servers_.end()); | 
|  | } | 
|  |  | 
|  | unsigned int numServers = servers_.size(); | 
|  | for (unsigned int i = 0; i < numServers; ++i) { | 
|  |  | 
|  | shared_ptr<TSocketPoolServer> &server = servers_[i]; | 
|  | bool retryIntervalPassed = (server->lastFailTime_ == 0); | 
|  | bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false; | 
|  |  | 
|  | // Impersonate the server socket | 
|  | setCurrentServer(server); | 
|  |  | 
|  | if (isOpen()) { | 
|  | // already open means we're done | 
|  | return; | 
|  | } | 
|  |  | 
|  | if (server->lastFailTime_ > 0) { | 
|  | // The server was marked as down, so check if enough time has elapsed to retry | 
|  | int elapsedTime = time(NULL) - server->lastFailTime_; | 
|  | if (elapsedTime > retryInterval_) { | 
|  | retryIntervalPassed = true; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (retryIntervalPassed || isLastServer) { | 
|  | for (int j = 0; j < numRetries_; ++j) { | 
|  | try { | 
|  | TSocket::open(); | 
|  |  | 
|  | // Copy over the opened socket so that we can keep it persistent | 
|  | server->socket_ = socket_; | 
|  |  | 
|  | // reset lastFailTime_ is required | 
|  | if (server->lastFailTime_) { | 
|  | server->lastFailTime_ = 0; | 
|  | } | 
|  |  | 
|  | // success | 
|  | return; | 
|  | } catch (TException e) { | 
|  | string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what(); | 
|  | GlobalOutput(errStr.c_str()); | 
|  | // connection failed | 
|  | } | 
|  | } | 
|  |  | 
|  | ++server->consecutiveFailures_; | 
|  | if (server->consecutiveFailures_ > maxConsecutiveFailures_) { | 
|  | // Mark server as down | 
|  | server->consecutiveFailures_ = 0; | 
|  | server->lastFailTime_ = time(NULL); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | GlobalOutput("TSocketPool::open: all connections failed"); | 
|  | throw TTransportException(TTransportException::NOT_OPEN); | 
|  | } | 
|  |  | 
|  | void TSocketPool::close() { | 
|  | if (isOpen()) { | 
|  | TSocket::close(); | 
|  | currentServer_->socket_ = -1; | 
|  | } | 
|  | } | 
|  |  | 
|  | }}} // facebook::thrift::transport |