require 'thrift/server'
-
+require 'sync'
# thrift/server already imports fastthread/thread
module Thrift
#
# we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
class NonblockingServer < ThreadPoolServer
+ def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20)
+ super
+ @sync = Sync.new
+ end
+
def serve
@server_thread = Thread.current
@serverTransport.listen
thread_group = ThreadGroup.new
loop do
break if @shutdown
- rd, = select([@serverTransport.handle, *connections.keys])
+ handles = [@serverTransport.handle]
+ @sync.synchronize(Sync::SH) do
+ handles.concat connections.keys
+ end
+ rd, = select(handles)
next if rd.nil?
rd.each do |socket|
if socket == @serverTransport.handle
buffer = ''
outtrans = @transportFactory.get_transport(client)
outprot = @protocolFactory.get_protocol(outtrans)
- connections[client.handle] = [client, buffer, outtrans, outprot]
+ @sync.synchronize(Sync::EX) do
+ connections[client.handle] = [client, buffer, outtrans, outprot]
+ end
else
- client, buffer, outtrans, outprot = connections[socket]
+ client, buffer, outtrans, outprot = nil # for scope
+ @sync.synchronize(Sync::SH) do
+ client, buffer, outtrans, outprot = connections[socket]
+ end
if socket.eof?
client.close
- connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ connections.delete(socket)
+ end
else
buffer << client.read(4096, true)
if has_full_frame?(buffer)
- running_connections[socket] = connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ running_connections[socket] = connections.delete(socket)
+ end
@thread_q.push :token
t = Thread.new(Thread.current) do |master|
begin
@processor.process(inprot, outprot)
if @shutdown
client.close
- running_connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ running_connections.delete(socket)
+ end
else
- swapping_connections[socket] = running_connections.delete(socket)
- master.wakeup
+ @sync.synchronize(Sync::EX) do
+ swapping_connections[socket] = running_connections.delete(socket)
+ end
end
rescue => e
outtrans.close
@exception_q.push e
ensure
- running_connections.delete(socket)
- connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
+ should_wakeup = false
+ @sync.synchronize(Sync::EX) do
+ running_connections.delete(socket)
+ if swapping_connections.include? socket
+ connections[socket] = swapping_connections.delete(socket)
+ should_wakeup = true
+ end
+ end
+ master.wakeup if should_wakeup
intrans.close
@thread_q.pop
end
end
if @shutdown
@serverTransport.close
- connections.merge! running_connections
- connections.merge! swapping_connections
- connections.values.each do |client, buffer, outtrans, outprot|
+ handles = []
+ @sync.synchronize(Sync::SH) do
+ handles = connections
+ handles.merge! running_connections
+ handles.merge! swapping_connections
+ end
+ handles.values.each do |client, buffer, outtrans, outprot|
# can't close completely or we'll break active messages
# but lets at least stop accepting input
client.handle.close_read
end
thread_group.list.each { |t| t.kill } if @shutdown_kill
# now kill connections completely if they still exists
- connections.values.each do |client, buffer, outtrans, outprot|
+ handles.values.each do |client, buffer, outtrans, outprot|
client.close
end
end