From ade2c83d11abe21523607b768b68df1e1ff8bbaf Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Fri, 8 Sep 2006 03:41:50 +0000 Subject: [PATCH] Thrift PHP TSocketPool client Summary: Client that connects to one of an arbitrary pool of servers Reviewed By: aditya git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664795 13f79535-47bb-0310-9956-ffa450edef68 --- lib/java/src/server/TSimpleServer.java | 4 + lib/java/src/server/TThreadPoolServer.java | 4 + lib/php/Makefile.am | 3 +- lib/php/src/protocol/TBinaryProtocol.php | 10 +- lib/php/src/transport/TBufferedTransport.php | 1 - lib/php/src/transport/TSocket.php | 133 ++++++++-- lib/php/src/transport/TSocketPool.php | 265 +++++++++++++++++++ test/php/TestClient.php | 6 +- 8 files changed, 405 insertions(+), 21 deletions(-) create mode 100644 lib/php/src/transport/TSocketPool.php diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java index 76a57628..05156c3f 100644 --- a/lib/java/src/server/TSimpleServer.java +++ b/lib/java/src/server/TSimpleServer.java @@ -35,8 +35,12 @@ public class TSimpleServer extends TServer { io = transportFactory_.getIOTransports(client); while (processor_.process(io[0], io[1])); } + } catch (TTransportException ttx) { + // Client died, just move on } catch (TException tx) { tx.printStackTrace(); + } catch (Exception x) { + x.printStackTrace(); } if (io != null) { diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java index 2f5be8d6..7201cd3b 100644 --- a/lib/java/src/server/TThreadPoolServer.java +++ b/lib/java/src/server/TThreadPoolServer.java @@ -101,8 +101,12 @@ public class TThreadPoolServer extends TServer { try { io = transportFactory_.getIOTransports(client_); while (processor_.process(io[0], io[1])) {} + } catch (TTransportException ttx) { + // Assume the client died and continue silently } catch (TException tx) { tx.printStackTrace(); + } catch (Exception x) { + x.printStackTrace(); } if (io != null) { diff --git a/lib/php/Makefile.am b/lib/php/Makefile.am index 23208d9f..c7cd1069 100644 --- a/lib/php/Makefile.am +++ b/lib/php/Makefile.am @@ -7,7 +7,8 @@ protocol_SCRIPTS = src/protocol/TProtocol.php \ transport_SCRIPTS = src/transport/TTransport.php \ src/transport/TBufferedTransport.php \ src/transport/TChunkedTransport.php \ - src/transport/TSocket.php + src/transport/TSocket.php \ + src/transport/TSocketPool.php thriftdir = $(prefix)/php/thrift diff --git a/lib/php/src/protocol/TBinaryProtocol.php b/lib/php/src/protocol/TBinaryProtocol.php index 2b1384f6..4123f5f8 100644 --- a/lib/php/src/protocol/TBinaryProtocol.php +++ b/lib/php/src/protocol/TBinaryProtocol.php @@ -145,7 +145,9 @@ class TBinaryProtocol extends TProtocol { public function writeString($out, $value) { $len = strlen($value); $result = $this->writeI32($out, $len); - $out->write($value, $len); + if ($len) { + $out->write($value, $len); + } return $result + $len; } @@ -317,7 +319,11 @@ class TBinaryProtocol extends TProtocol { public function readString($in, &$value) { $result = $this->readI32($in, $len); - $value = $in->readAll($len); + if ($len) { + $value = $in->readAll($len); + } else { + $value = ''; + } return $result + $len; } } diff --git a/lib/php/src/transport/TBufferedTransport.php b/lib/php/src/transport/TBufferedTransport.php index dad96ff5..3c66135f 100644 --- a/lib/php/src/transport/TBufferedTransport.php +++ b/lib/php/src/transport/TBufferedTransport.php @@ -105,4 +105,3 @@ class TBufferedTransport extends TTransport { } ?> - diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php index 0a3b090e..74ef01fe 100644 --- a/lib/php/src/transport/TSocket.php +++ b/lib/php/src/transport/TSocket.php @@ -20,21 +20,49 @@ class TSocket extends TTransport { * * @var string */ - private $host_ = 'localhost'; + protected $host_ = 'localhost'; /** * Remote port * * @var int */ - private $port_ = '9090'; + protected $port_ = '9090'; + + /** + * Send timeout in milliseconds + * + * @var int + */ + private $sendTimeout_ = 100; + + /** + * Recv timeout in milliseconds + * + * @var int + */ + private $recvTimeout_ = 750; + + /** + * Is send timeout set? + * + * @var bool + */ + private $sendTimeoutSet_ = FALSE; /** * Persistent socket or plain? * * @var bool */ - private $persist_ = false; + private $persist_ = FALSE; + + /** + * Debugging on? + * + * @var bool + */ + private $debug_ = FALSE; /** * Socket constructor @@ -43,12 +71,39 @@ class TSocket extends TTransport { * @param int $port Remote port * @param bool $persist Whether to use a persistent socket */ - public function __construct($host='localhost', $port=9090, $persist=false) { + public function __construct($host='localhost', $port=9090, $persist=FALSE) { $this->host_ = $host; $this->port_ = $port; $this->persist_ = $persist; } + /** + * Sets the send timeout. + * + * @param int $timeout + */ + public function setSendTimeout($timeout) { + $this->sendTimeout_ = $timeout; + } + + /** + * Sets the receive timeout. + * + * @param int $timeout + */ + public function setRecvTimeout($timeout) { + $this->recvTimeout_ = $timeout; + } + + /** + * Sets debugging output on or off + * + * @param bool $debug + */ + public function setDebug($debug) { + $this->debug_ = $debug; + } + /** * Tests whether this is open * @@ -63,37 +118,73 @@ class TSocket extends TTransport { */ public function open() { if ($this->persist_) { - $this->handle_ = pfsockopen($this->host_, $this->port_); + $this->handle_ = pfsockopen($this->host_, + $this->port_, + $errno, + $errstr, + $this->sendTimeout_/1000.0); } else { - $this->handle_ = fsockopen($this->host_, $this->port_); + $this->handle_ = fsockopen($this->host_, + $this->port_, + $errno, + $errstr, + $this->sendTimeout_/1000.0); } - if ($this->handle_ === FALSE) { - throw new Exception('TSocket: Could not connect to '. - $this->host_.':'.$this->port_); + + // Connect failed? + if ($this->handle_ === FALSE) { + $error = 'TSocket: Could not connect to '.$this->host_.':'.$this->port_; + if ($this->debug_) { + error_log($error); + } + throw new Exception($error); } + + stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000); + $this->sendTimeoutSet_ = TRUE; } /** - * Closes the socket + * Closes the socket. */ public function close() { if (!$this->persist_) { - fclose($this->handle_); + @fclose($this->handle_); + $this->handle_ = null; } } /** * Uses stream get contents to do the reading + * + * @param int $len How many bytes + * @return string Binary data */ public function readAll($len) { - return stream_get_contents($this->handle_, $len); + if ($this->sendTimeoutSet_) { + stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000); + $this->sendTimeoutSet_ = FALSE; + } + $buf = @stream_get_contents($this->handle_, $len); + if ($buf === FALSE || strlen($buf) !== $len) { + throw new Exception('TSocket: Could not read '.$len.' bytes from '. + $this->host_.':'.$this->port_); + } + return $buf; } /** * Read from the socket + * + * @param int $len How many bytes + * @return string Binary data */ public function read($len) { - $data = fread($this->handle_, 1); + if ($this->sendTimeoutSet_) { + stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000); + $this->sendTimeoutSet_ = FALSE; + } + $data = @fread($this->handle_, 1); if ($data === FALSE) { throw new Exception('TSocket: Could not read '.$len.' bytes from '. $this->host_.':'.$this->port_); @@ -103,11 +194,17 @@ class TSocket extends TTransport { /** * Write to the socket. + * + * @param string $buf The data to write */ public function write($buf) { + if (!$this->sendTimeoutSet_) { + stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000); + $this->sendTimeoutSet_ = TRUE; + } while (!empty($buf)) { - $got = fwrite($this->handle_, $buf); - if ($got == false) { + $got = @fwrite($this->handle_, $buf); + if ($got === 0 || $got === FALSE) { throw new Exception('TSocket: Could not write '.strlen($buf).' bytes '. $this->host_.':'.$this->port_); } @@ -119,7 +216,11 @@ class TSocket extends TTransport { * Flush output to the socket. */ public function flush() { - fflush($this->handle_); + $ret = fflush($this->handle_); + if ($ret === FALSE) { + throw new Exception('TSocket: Could not flush: '. + $this->host_.':'.$this->port_); + } } } diff --git a/lib/php/src/transport/TSocketPool.php b/lib/php/src/transport/TSocketPool.php new file mode 100644 index 00000000..58f237a2 --- /dev/null +++ b/lib/php/src/transport/TSocketPool.php @@ -0,0 +1,265 @@ + + */ +class TSocketPool extends TSocket { + + /** + * Remote hostname + * + * @var array + */ + private $hosts_ = array('localhost'); + + /** + * Remote ports + * + * @var array + */ + private $ports_ = array('9090'); + + /** + * How many times to retry each host in connect + * + * @var int + */ + private $numRetries_ = 1; + + /** + * Retry interval in seconds, how long to not try a host if it has been + * marked as down. + * + * @var int + */ + private $retryInterval_ = 60; + + /** + * Max consecutive failures before marking a host down. + * + * @var int + */ + private $maxConsecutiveFailures_ = 1; + + /** + * Try hosts in order? or Randomized? + * + * @var bool + */ + private $randomize_ = TRUE; + + /** + * Always try last host, even if marked down? + * + * @var bool + */ + private $alwaysTryLast_ = TRUE; + + /** + * Socket pool constructor + * + * @param array $hosts List of remote hostnames + * @param mixed $ports Array of remote ports, or a single common port + * @param bool $persist Whether to use a persistent socket + */ + public function __construct($hosts=array('localhost'), + $ports=array(9090), + $persist=FALSE) { + parent::__construct(null, 0, $persist); + $this->hosts_ = $hosts; + + // Ports may be an array or a single port + if (is_array($ports)) { + $this->ports_ = $ports; + } else { + $this->ports_ = array(); + $num = count($hosts); + for ($i = 0; $i < $num; ++$i) { + $this->ports_ []= $ports; + } + } + } + + /** + * Sets how many time to keep retrying a host in the connect function. + * + * @param int $numRetries + */ + public function setNumRetries($numRetries) { + $this->numRetries_ = $numRetries; + } + + /** + * Sets how long to wait until retrying a host if it was marked down + * + * @param int $numRetries + */ + public function setRetryInterval($retryInterval) { + $this->retryInterval_ = $retryInterval; + } + + /** + * Sets how many time to keep retrying a host before marking it as down. + * + * @param int $numRetries + */ + public function setMaxConsecutiveFailures($maxConsecutiveFailures) { + $this->maxConsecutiveFailures_ = $maxConsecutiveFailures; + } + + /** + * Turns randomization in connect order on or off. + * + * @param bool $randomize + */ + public function setRandomize($randomize) { + $this->randomize_ = $randomize; + } + + /** + * Whether to always try the last server. + * + * @param bool $alwaysTryLast + */ + public function setAlwaysTryLast($alwaysTryLast) { + $this->alwaysTryLast_ = $alwaysTryLast; + } + + + /** + * Connects the socket by iterating through all the servers in the pool + * and trying to find one that works. + */ + public function open() { + $numServers = count($this->hosts_); + + // Check if a random server from the pool should be hit + if ($this->randomize_) { + $startingPoint = mt_rand(0, $numServers-1); + } else { + $startingPoint = 0; + } + $i = $startingPoint; + + do { + $host = $this->hosts_[$i]; + $port = $this->ports_[$i]; + + // Check APC cache for a record of this server being down + $failtimeKey = 'thrift_failtime:'.$host_.':'.$port.'~'; + + // Cache miss? Assume it's OK + $lastFailtime = apc_fetch($failtimeKey); + if ($lastFailtime === FALSE) { + $lastFailtime = 0; + } + + $retryIntervalPassed = FALSE; + + // Cache hit...make sure enough the retry interval has elapsed + if ($lastFailtime > 0) { + $elapsed = time() - $lastFailtime; + if ($elapsed > $retryInterval) { + $retryIntervalPassed = TRUE; + if ($this->debug_) { + error_log('TSocketPool: retryInterval '. + '('.$this->retryInterval_.') '. + 'has passed for host '.$host.':'.$port); + } + } + } + + // Only connect if not in the middle of a fail interval, OR if this + // is the LAST server we are trying, just hammer away on it + $isLastServer = FALSE; + if ($alwaysTryLast) { + $isLastServer = + ( (($i+1) % $numServers) == $startingPoint ) ? TRUE : FALSE; + } + + if (($lastFailtime === 0) || + ($isLastServer) || + ($lastFailtime > 0 && $retryIntervalPassed)) { + + // Set underlying TSocket params to this one + $this->host_ = $host; + $this->port_ = $port; + + for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) { + try { + parent::open(); + + // Only clear the failure counts if required to do so + if ($lastFailtime > 0) { + apc_store($failtimeKey, 0); + } + // Successful connection, return now + return; + + } catch (Exception $x) { + // Connection failed + } + } + + // Mark failure of this host in the cache + $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~'; + + // Ignore cache misses + $consecfails = apc_fetch($consecfailsKey); + if ($consecfails === FALSE) { + $consecfails = 0; + } + + // Increment by one + $consecfails++; + + // Log and cache this failure + if ($consecfails >= $this->maxConsecutiveFailures_) { + if ($this->debug_) { + error_log('TSocketPool: marking '.$host.':'.$port. + ' as down for '.$this->retryInterval.' seconds '. + 'after '.$consecfails.' failed connect attempts.'); + } + // Store the failure time + apc_store($failtimeKey, time()); + + // Clear the count of consecutive failures + apc_store($consecfailsKey, 0); + } else { + apc_store($consecfailsKey, $consecfails); + } + } + $i = ($i + 1) % $numServers; + + } while ($i != $startingPoint); + + // Holy shit we failed them all. The system is totally ill! + $error = 'TSocketPool: All hosts in pool are down. '; + $hostlist = implode(',', $this->hosts_); + $error .= '('.$hostlist.':'.$this->port_.')'; + if ($this->debug_) { + error_log($error); + } + throw new Exception($error); + } +} + +?> diff --git a/test/php/TestClient.php b/test/php/TestClient.php index 27a66eac..235abbe2 100644 --- a/test/php/TestClient.php +++ b/test/php/TestClient.php @@ -17,7 +17,7 @@ require_once $GLOBALS['THRIFT_ROOT'].'/Thrift.php'; require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php'; /** Include the socket layer */ -require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php'; +require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocketPool.php'; /** Include the socket layer */ require_once $GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php'; @@ -36,7 +36,11 @@ if ($argc > 2) { $host = $argv[1]; } +$hosts = array('localhost', '8.2.3.5'); + $socket = new TSocket($host, $port); +$socket = new TSocketPool($hosts, $port); +$socket->setDebug(TRUE); if ($MODE == 'inline') { $transport = $socket; -- 2.17.1