# blob: 219fbf97a966ec6cd3127f9d5c69f6aecbeed0bd
# Copyright (c) 2006- Facebook
# Distributed under the Apache Software License
#
# See accompanying file LICENSE or visit the Thrift site at:
# http://developers.facebook.com/thrift/
#
# Author: Mark Slee <mcslee@facebook.com>
#
require 'thrift/transport'
require 'socket'
module Thrift
# Client-side TCP transport backed by a ::Socket.
#
# When +timeout+ is nil or 0, reads and writes use plain blocking calls.
# Otherwise they wait on IO.select for at most +timeout+ seconds and raise a
# TransportException (TIMED_OUT) when the deadline passes.
class Socket < Transport
  # host::    host name or address to connect to
  # port::    TCP port to connect to
  # timeout:: seconds to wait for connect/read/write; nil waits indefinitely
  def initialize(host='localhost', port=9090, timeout=nil)
    @host = host
    @port = port
    @timeout = timeout
    @desc = "#{host}:#{port}"
    @handle = nil
  end

  attr_accessor :handle, :timeout

  # Resolves the host, connects without blocking and waits up to +timeout+
  # seconds for the connection to complete.
  #
  # Returns the connected ::Socket, which is also stored in +handle+.
  # Raises TransportException (NOT_OPEN) on any failure. A socket that was
  # created but never connected is closed and is NOT stored in +handle+, so
  # open? stays false and no descriptor is leaked.
  def open
    socket = nil
    begin
      addrinfo = ::Socket::getaddrinfo(@host, @port).first
      socket = ::Socket.new(addrinfo[4], ::Socket::SOCK_STREAM, 0)
      sockaddr = ::Socket.sockaddr_in(addrinfo[1], addrinfo[3])
      begin
        socket.connect_nonblock(sockaddr)
      rescue Errno::EINPROGRESS
        unless IO.select(nil, [ socket ], nil, @timeout)
          raise TransportException.new(TransportException::NOT_OPEN, "Connection timeout to #{@desc}")
        end
        begin
          # A second connect reports the outcome of the pending one:
          # EISCONN means success, anything else is the real error.
          socket.connect_nonblock(sockaddr)
        rescue Errno::EISCONN
        end
      end
      @handle = socket
    rescue StandardError => e
      # Release the half-open descriptor before reporting the failure.
      socket.close unless socket.nil? or socket.closed?
      raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}: #{e}")
    end
  end

  # True when a handle is present and has not been closed.
  def open?
    !@handle.nil? and !@handle.closed?
  end

  # Writes +str+ to the socket and returns the number of bytes written.
  #
  # Raises IOError when the transport is not open, TransportException
  # (TIMED_OUT) when a timeout is set and the data could not be written in
  # time, and TransportException (NOT_OPEN) on any other error, after closing
  # and dropping the handle.
  def write(str)
    raise IOError, "closed stream" unless open?
    begin
      if @timeout.nil? or @timeout == 0
        @handle.write(str)
      else
        # write_nonblock reports BYTES written, so the data must be measured
        # and sliced by byte; a binary-encoded copy makes length/[] byte-based.
        data = str
        if data.respond_to?(:force_encoding) and data.encoding != ::Encoding::BINARY
          data = data.dup.force_encoding(::Encoding::BINARY)
        end
        len = 0
        start = Time.now
        # Wait only for the time still left so the total never exceeds @timeout.
        while (remaining = @timeout - (Time.now - start)) > 0
          ready = IO.select(nil, [@handle], nil, remaining)
          writable = ready && ready[1]
          next if writable.nil? or writable.empty?
          begin
            len += @handle.write_nonblock(data[len..-1])
          rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
            # Spurious readiness or an interrupted call: retry until the deadline.
            next
          end
          break if len >= data.length
        end
        if len < data.length
          raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{data.length} bytes to #{@desc}")
        else
          len
        end
      end
    rescue TransportException => e
      # pass this on
      raise e
    rescue StandardError => e
      @handle.close unless @handle.closed?
      @handle = nil
      raise TransportException.new(TransportException::NOT_OPEN, e.message)
    end
  end

  # Reads up to +sz+ bytes and returns them.
  #
  # Raises IOError when the transport is not open, TransportException
  # (TIMED_OUT) when a timeout is set and nothing became readable in time,
  # TransportException (NOT_OPEN) on any other error (the handle is closed
  # and dropped), and TransportException (UNKNOWN) when no data was returned.
  def read(sz)
    raise IOError, "closed stream" unless open?
    begin
      if @timeout.nil? or @timeout == 0
        data = @handle.readpartial(sz)
      else
        # select may return before the timeout without the socket being
        # readable, so keep waiting, but only for the time still left.
        start = Time.now
        rd = nil # scoping
        loop do
          remaining = @timeout - (Time.now - start)
          break if remaining <= 0
          rd, = IO.select([@handle], nil, nil, remaining)
          break if rd and not rd.empty?
        end
        if rd.nil? or rd.empty?
          raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
        else
          data = @handle.readpartial(sz)
        end
      end
    rescue TransportException => e
      # don't let this get caught by the StandardError handler
      raise e
    rescue StandardError => e
      @handle.close unless @handle.closed?
      @handle = nil
      raise TransportException.new(TransportException::NOT_OPEN, e.message)
    end
    if (data.nil? or data.length == 0)
      raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")
    end
    data
  end

  # Closes the handle if it is still open and forgets it.
  def close
    @handle.close unless @handle.nil? or @handle.closed?
    @handle = nil
  end

  # Returns the underlying handle (nil when not open).
  def to_io
    @handle
  end
end
# Maps the legacy name TSocket onto Socket. deprecate_class! is defined
# outside this file; presumably it keeps the old constant usable while
# warning about it -- confirm against its definition.
deprecate_class! :TSocket => Socket
# Server-side TCP transport: wraps a TCPServer and returns a Socket transport
# for each connection it accepts.
class ServerSocket < ServerTransport
  # The underlying TCPServer; nil until #listen is called and after #close.
  attr_reader :handle

  # to_io returns the same object as #handle.
  alias to_io handle

  # call-seq:
  #   ServerSocket.new(port)
  #   ServerSocket.new(host, port)
  #
  # With a single argument, that argument is the port and the host is nil.
  def initialize(host_or_port, port = nil)
    @host = port ? host_or_port : nil
    @port = port || host_or_port
    @handle = nil
  end

  # Creates the TCPServer for the configured host and port.
  def listen
    @handle = TCPServer.new(@host, @port)
  end

  # Accepts one connection and returns it wrapped in a Socket transport.
  # Returns nil when there is no handle (listen has not been called).
  def accept
    return nil if @handle.nil?

    client = @handle.accept
    transport = Socket.new
    transport.handle = client
    transport
  end

  # Closes the server handle if it is still open and forgets it.
  def close
    @handle.close if !@handle.nil? && !@handle.closed?
    @handle = nil
  end

  # True when there is no handle or the handle has been closed.
  def closed?
    @handle.nil? || @handle.closed?
  end
end
# Maps the legacy name TServerSocket onto ServerSocket. deprecate_class! is
# defined outside this file; presumably it keeps the old constant usable
# while warning about it -- confirm against its definition.
deprecate_class! :TServerSocket => ServerSocket
end