using namespace std;
+/**
+ * TSocketPoolServer implementation
+ *
+ * @author Akhil Wable <akhil@facebook.com>
+ */
+TSocketPoolServer::TSocketPoolServer()
+ : host_(""),
+ port_(0),
+ lastFailTime_(0),
+ consecutiveFailures_(0) {}
+
+/**
+ * Constructor for TSocketPool server
+ */
+TSocketPoolServer::TSocketPoolServer(const std::string &host, int port)
+ : host_(host),
+ port_(port),
+ lastFailTime_(0),
+ consecutiveFailures_(0) {}
+
/**
* TSocketPool implementation.
*
}
TSocketPool::TSocketPool(const vector<pair<string, int> > servers) : TSocket(),
- servers_(servers),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
+ for (unsigned i = 0; i < servers.size(); ++i) {
+ addServer(servers[i].first, servers[i].second);
+ }
}
TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
}
void TSocketPool::addServer(const string& host, int port) {
- servers_.push_back(pair<string, int>(host, port));
+ servers_.push_back(TSocketPoolServer(host, port));
}
void TSocketPool::setNumRetries(int numRetries) {
std::random_shuffle(servers_.begin(), servers_.end());
}
- for (unsigned int i = 0; i < servers_.size(); ++i) {
- host_ = servers_[i].first;
- port_ = servers_[i].second;
+ unsigned int numServers = servers_.size();
+ for (unsigned int i = 0; i < numServers; ++i) {
+
+ TSocketPoolServer &server = servers_[i];
+ bool retryIntervalPassed = (server.lastFailTime_ == 0);
+ bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
- for (int j = 0; j < numRetries_; ++j) {
- try {
- TSocket::open();
+ if (server.lastFailTime_ > 0) {
+ // The server was marked as down, so check if enough time has elapsed to retry
+ int elapsedTime = time(NULL) - server.lastFailTime_;
+ if (elapsedTime > retryInterval_) {
+ retryIntervalPassed = true;
+ }
+ }
- // success
- return;
- } catch (TException e) {
- // connection failed
+ if (retryIntervalPassed || isLastServer) {
+ for (int j = 0; j < numRetries_; ++j) {
+ try {
+ TSocket::open();
+
+ // reset lastFailTime_ is required
+ if (server.lastFailTime_) {
+ server.lastFailTime_ = 0;
+ }
+
+ // success
+ return;
+ } catch (TException e) {
+ // connection failed
+ }
}
}
+
+ ++server.consecutiveFailures_;
+ if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+ // Mark server as down
+ server.consecutiveFailures_ = 0;
+ server.lastFailTime_ = time(NULL);
+ }
}
GlobalOutput("TSocketPool::open: all connections failed");
namespace facebook { namespace thrift { namespace transport {
+ /**
+ * Class to hold server information for TSocketPool
+ *
+ * @author Akhil Wable <akhil@facebook.com>
+ */
+class TSocketPoolServer {
+
+ public:
+ /**
+ * Default constructor for server info
+ */
+ TSocketPoolServer();
+
+ /**
+ * Constructor for TSocketPool server
+ */
+ TSocketPoolServer(const std::string &host, int port);
+
+ // Host name
+ std::string host_;
+
+ // Port to connect on
+ int port_;
+
+ // Last time connecting to this server failed
+ int lastFailTime_;
+
+ // Number of consecutive times connecting to this server failed
+ int consecutiveFailures_;
+};
+
/**
* TCP Socket implementation of the TTransport interface.
*
protected:
/** List of servers to connect to */
- std::vector<std::pair<std::string, int> > servers_;
+ std::vector<TSocketPoolServer> servers_;
/** How many times to retry each host in connect */
int numRetries_;