From: Kevin Clark Date: Fri, 18 Jul 2008 21:49:50 +0000 (+0000) Subject: rb: Add optional timeout argument to Thrift::Socket [THRIFT-74] X-Git-Tag: 0.2.0~468 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=5ae0bab24791b48caab35a820be19567134f1256;p=common%2Fthrift.git rb: Add optional timeout argument to Thrift::Socket [THRIFT-74] Socket.new and UNIXSocket.new both now have a new optional argument: timeout. There's also a timeout field accessor. This timeout is used when reading or writing. Author: Kevin Ballard git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@678053 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb index c7ed521b..8f58352e 100644 --- a/lib/rb/lib/thrift/transport/socket.rb +++ b/lib/rb/lib/thrift/transport/socket.rb @@ -11,14 +11,15 @@ require 'socket' module Thrift class Socket < Transport - def initialize(host='localhost', port=9090) + def initialize(host='localhost', port=9090, timeout=nil) @host = host @port = port + @timeout = timeout @desc = "#{host}:#{port}" @handle = nil end - attr_accessor :handle + attr_accessor :handle, :timeout def open begin @@ -35,11 +36,31 @@ module Thrift def write(str) raise IOError, "closed stream" unless open? begin - @handle.write(str) - rescue StandardError + if @timeout.nil? or @timeout == 0 + @handle.write(str) + else + len = 0 + start = Time.now + while Time.now - start < @timeout + rd, wr, = IO.select(nil, [@handle], nil, @timeout) + if wr and not wr.empty? + len += @handle.write_nonblock(str[len..-1]) + break if len >= str.length + end + end + if len < str.length + raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}") + else + len + end + end + rescue TransportException => e + # pass this on + raise e + rescue StandardError => e @handle.close @handle = nil - raise TransportException.new(TransportException::NOT_OPEN) + raise TransportException.new(TransportException::NOT_OPEN, e.message) end end @@ -47,7 +68,26 @@ module Thrift raise IOError, "closed stream" unless open? begin - data = @handle.readpartial(sz) + if @timeout.nil? or @timeout == 0 + data = @handle.readpartial(sz) + else + # it's possible to interrupt select for something other than the timeout + # so we need to ensure we've waited long enough + start = Time.now + rd = nil # scoping + loop do + rd, = IO.select([@handle], nil, nil, @timeout) + break if (rd and not rd.empty?) or Time.now - start >= @timeout + end + if rd.nil? or rd.empty? + raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}") + else + data = @handle.readpartial(sz) + end + end + rescue TransportException => e + # don't let this get caught by the StandardError handler + raise e rescue StandardError => e @handle.close unless @handle.closed? @handle = nil diff --git a/lib/rb/lib/thrift/transport/unixsocket.rb b/lib/rb/lib/thrift/transport/unixsocket.rb index b24e7dfb..2bf95e9e 100644 --- a/lib/rb/lib/thrift/transport/unixsocket.rb +++ b/lib/rb/lib/thrift/transport/unixsocket.rb @@ -3,8 +3,9 @@ require 'socket' module Thrift class UNIXSocket < Socket - def initialize(path) + def initialize(path, timeout=nil) @path = path + @timeout = timeout @desc = @path # for read()'s error @handle = nil end diff --git a/lib/rb/spec/socket_spec.rb b/lib/rb/spec/socket_spec.rb index 3fede966..c5866ee7 100644 --- a/lib/rb/spec/socket_spec.rb +++ b/lib/rb/spec/socket_spec.rb @@ -28,6 +28,11 @@ class ThriftSocketSpec < Spec::ExampleGroup TCPSocket.should_receive(:new).with('my.domain', 1234) Socket.new('my.domain', 1234).open end + + it "should accept an optional timeout" do + TCPSocket.stub!(:new) + Socket.new('localhost', 8080, 5).timeout.should == 5 + end end describe ServerSocket do diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb index b32ab44a..2d17fd3c 100644 --- a/lib/rb/spec/socket_spec_shared.rb +++ b/lib/rb/spec/socket_spec_shared.rb @@ -51,4 +51,35 @@ shared_examples_for "a socket" do lambda { @socket.write("fail") }.should raise_error(IOError, "closed stream") lambda { @socket.read(10) }.should raise_error(IOError, "closed stream") end + + it "should support the timeout accessor for read" do + @socket.timeout = 3 + @socket.open + IO.should_receive(:select).with([@handle], nil, nil, 3).and_return([[@handle], [], []]) + @handle.should_receive(:readpartial).with(17).and_return("test data") + @socket.read(17).should == "test data" + end + + it "should support the timeout accessor for write" do + @socket.timeout = 3 + @socket.open + IO.should_receive(:select).with(nil, [@handle], nil, 3).twice.and_return([[], [@handle], []]) + @handle.should_receive(:write_nonblock).with("test data").and_return(4) + @handle.should_receive(:write_nonblock).with(" data").and_return(5) + @socket.write("test data").should == 9 + end + + it "should raise an error when read times out" do + @socket.timeout = 0.5 + @socket.open + IO.should_receive(:select).with([@handle], nil, nil, 0.5).at_least(1).times.and_return(nil) + lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT } + end + + it "should raise an error when write times out" do + @socket.timeout = 0.5 + @socket.open + IO.should_receive(:select).with(nil, [@handle], nil, 0.5).any_number_of_times.and_return(nil) + lambda { @socket.write("test data") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT } + end end diff --git a/lib/rb/spec/unixsocket_spec.rb b/lib/rb/spec/unixsocket_spec.rb index ff5c96c6..222a1c79 100644 --- a/lib/rb/spec/unixsocket_spec.rb +++ b/lib/rb/spec/unixsocket_spec.rb @@ -20,6 +20,11 @@ class ThriftUNIXSocketSpec < Spec::ExampleGroup ::UNIXSocket.should_receive(:new).and_raise(StandardError) lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN } end + + it "should accept an optional timeout" do + ::UNIXSocket.stub!(:new) + UNIXSocket.new(@path, 5).timeout.should == 5 + end end describe UNIXServerSocket do