From: Bryan Duxbury Date: Fri, 4 Mar 2011 01:25:17 +0000 (+0000) Subject: THRIFT-638. php: BufferedTransport + C extensions block until recv timeout is reached... X-Git-Tag: 0.7.0~162 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=c46f32ce181164ecd28de7c328e5cd704282a699;p=common%2Fthrift.git THRIFT-638. php: BufferedTransport + C extensions block until recv timeout is reached on last fread call This patch refactors TSocket to make use of stream_select() for timeout detection. Patch: Nicholas Telford git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076917 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php index 8297631b..f7130161 100644 --- a/lib/php/src/transport/TSocket.php +++ b/lib/php/src/transport/TSocket.php @@ -50,25 +50,40 @@ class TSocket extends TTransport { protected $port_ = '9090'; /** - * Send timeout in milliseconds + * Send timeout in seconds. + * + * Combined with sendTimeoutUsec this is used for send timeouts. * * @var int */ - private $sendTimeout_ = 100; + private $sendTimeoutSec_ = 0; /** - * Recv timeout in milliseconds + * Send timeout in microseconds. + * + * Combined with sendTimeoutSec this is used for send timeouts. * * @var int */ - private $recvTimeout_ = 750; + private $sendTimeoutUsec_ = 100000; /** - * Is send timeout set? + * Recv timeout in seconds * - * @var bool + * Combined with recvTimeoutUsec this is used for recv timeouts. + * + * @var int + */ + private $recvTimeoutSec_ = 0; + + /** + * Recv timeout in microseconds + * + * Combined with recvTimeoutSec this is used for recv timeouts. + * + * @var int */ - private $sendTimeoutSet_ = FALSE; + private $recvTimeoutUsec_ = 750000; /** * Persistent socket or plain? @@ -123,7 +138,9 @@ class TSocket extends TTransport { * @param int $timeout Timeout in milliseconds. */ public function setSendTimeout($timeout) { - $this->sendTimeout_ = $timeout; + $this->sendTimeoutSec_ = floor($timeout / 1000); + $this->sendTimeoutUsec_ = + ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000; } /** @@ -132,7 +149,9 @@ class TSocket extends TTransport { * @param int $timeout Timeout in milliseconds. */ public function setRecvTimeout($timeout) { - $this->recvTimeout_ = $timeout; + $this->recvTimeoutSec_ = floor($timeout / 1000); + $this->recvTimeoutUsec_ = + ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000; } /** @@ -192,13 +211,13 @@ class TSocket extends TTransport { $this->port_, $errno, $errstr, - $this->sendTimeout_/1000.0); + $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)); } else { $this->handle_ = @fsockopen($this->host_, $this->port_, $errno, $errstr, - $this->sendTimeout_/1000.0); + $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)); } // Connect failed? @@ -209,9 +228,6 @@ class TSocket extends TTransport { } throw new TException($error); } - - stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000); - $this->sendTimeoutSet_ = TRUE; } /** @@ -225,66 +241,30 @@ class TSocket extends TTransport { } /** - * Uses stream get contents to do the reading + * Read from the socket at most $len bytes. + * + * This method will not wait for all the requested data, it will return as + * soon as any data is received. * - * @param int $len How many bytes + * @param int $len Maximum number of bytes to read. * @return string Binary data */ - public function readAll($len) { - if ($this->sendTimeoutSet_) { - stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000); - $this->sendTimeoutSet_ = FALSE; - } - // This call does not obey stream_set_timeout values! - // $buf = @stream_get_contents($this->handle_, $len); - - $pre = null; - while (TRUE) { - $buf = @fread($this->handle_, $len); - if ($buf === FALSE) { - $md = stream_get_meta_data($this->handle_); - if (true === $md['timed_out'] && false === $md['blocked']) { - throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. - $this->host_.':'.$this->port_); - } else { + public function read($len) { + $null = null; + $read = array($this->handle_); + $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_); + + if ($readable > 0) { + $data = @stream_socket_recvfrom($this->handle_, $len); + if ($data === false) { throw new TTransportException('TSocket: Could not read '.$len.' bytes from '. $this->host_.':'.$this->port_); - } - } - else if (($sz = strlen($buf)) < $len) { - if((strlen($buf) == 0) && feof($this->handle_)){ + } elseif($data == '' && feof($this->handle_)) { throw new TTransportException('TSocket read 0 bytes'); - }; - - $md = stream_get_meta_data($this->handle_); - if (true === $md['timed_out'] && false === $md['blocked']) { - throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. - $this->host_.':'.$this->port_); - } else { - $pre .= $buf; - $len -= $sz; } - } else { - return $pre.$buf; - } - } - } - /** - * Read from the socket - * - * @param int $len How many bytes - * @return string Binary data - */ - public function read($len) { - if ($this->sendTimeoutSet_) { - stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000); - $this->sendTimeoutSet_ = FALSE; - } - $data = @fread($this->handle_, $len); - if ($data === FALSE) { - $md = stream_get_meta_data($this->handle_); - if (true === $md['timed_out'] && false === $md['blocked']) { + return $data; + } else if ($readable === 0) { throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '. $this->host_.':'.$this->port_); } else { @@ -292,12 +272,6 @@ class TSocket extends TTransport { $this->host_.':'.$this->port_); } } - elseif((strlen($data) == 0) && feof($this->handle_)) - { - throw new TTransportException('TSocket read 0 bytes'); - }; - return $data; - } /** * Write to the socket. @@ -305,15 +279,23 @@ class TSocket extends TTransport { * @param string $buf The data to write */ public function write($buf) { - if (!$this->sendTimeoutSet_) { - stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000); - $this->sendTimeoutSet_ = TRUE; - } + $null = null; + $write = array($this->handle_); + + // keep writing until all the data has been written while (strlen($buf) > 0) { - $got = @fwrite($this->handle_, $buf); - if ($got === 0 || $got === FALSE) { - $md = stream_get_meta_data($this->handle_); - if ($md['timed_out']) { + // wait for stream to become available for writing + $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_); + if ($writable > 0) { + // write buffer to stream + $written = @stream_socket_sendto($this->handle_, $buf); + if ($written === -1 || $written === false) { + throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '. + $this->host_.':'.$this->port_); + } + // determine how much of the buffer is left to write + $buf = substr($buf, $written); + } else if ($writable === 0) { throw new TTransportException('TSocket: timed out writing '.strlen($buf).' bytes from '. $this->host_.':'.$this->port_); } else { @@ -321,18 +303,18 @@ class TSocket extends TTransport { $this->host_.':'.$this->port_); } } - $buf = substr($buf, $got); } - } /** * Flush output to the socket. + * + * Since read(), readAll() and write() operate on the sockets directly, + * this is a no-op + * + * If you wish to have flushable buffering behaviour, wrap this TSocket + * in a TBufferedTransport. */ public function flush() { - $ret = fflush($this->handle_); - if ($ret === FALSE) { - throw new TException('TSocket: Could not flush: '. - $this->host_.':'.$this->port_); + // no-op } } -}