From e4489d3e698a2378038227f37d49212b52acf8b1 Mon Sep 17 00:00:00 2001 From: Kevin Clark Date: Wed, 18 Jun 2008 01:16:58 +0000 Subject: [PATCH] Add synchronization around shared resources in NonblockingServer git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669006 13f79535-47bb-0310-9956-ffa450edef68 --- lib/rb/lib/thrift/server/nonblockingserver.rb | 62 ++++++++++++++----- 1 file changed, 47 insertions(+), 15 deletions(-) diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb index 54dad04d..920175ba 100644 --- a/lib/rb/lib/thrift/server/nonblockingserver.rb +++ b/lib/rb/lib/thrift/server/nonblockingserver.rb @@ -1,5 +1,5 @@ require 'thrift/server' - +require 'sync' # thrift/server already imports fastthread/thread module Thrift @@ -12,6 +12,11 @@ module Thrift # # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods class NonblockingServer < ThreadPoolServer + def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20) + super + @sync = Sync.new + end + def serve @server_thread = Thread.current @serverTransport.listen @@ -27,7 +32,11 @@ module Thrift thread_group = ThreadGroup.new loop do break if @shutdown - rd, = select([@serverTransport.handle, *connections.keys]) + handles = [@serverTransport.handle] + @sync.synchronize(Sync::SH) do + handles.concat connections.keys + end + rd, = select(handles) next if rd.nil? rd.each do |socket| if socket == @serverTransport.handle @@ -35,16 +44,25 @@ module Thrift buffer = '' outtrans = @transportFactory.get_transport(client) outprot = @protocolFactory.get_protocol(outtrans) - connections[client.handle] = [client, buffer, outtrans, outprot] + @sync.synchronize(Sync::EX) do + connections[client.handle] = [client, buffer, outtrans, outprot] + end else - client, buffer, outtrans, outprot = connections[socket] + client, buffer, outtrans, outprot = nil # for scope + @sync.synchronize(Sync::SH) do + client, buffer, outtrans, outprot = connections[socket] + end if socket.eof? client.close - connections.delete(socket) + @sync.synchronize(Sync::EX) do + connections.delete(socket) + end else buffer << client.read(4096, true) if has_full_frame?(buffer) - running_connections[socket] = connections.delete(socket) + @sync.synchronize(Sync::EX) do + running_connections[socket] = connections.delete(socket) + end @thread_q.push :token t = Thread.new(Thread.current) do |master| begin @@ -54,17 +72,27 @@ module Thrift @processor.process(inprot, outprot) if @shutdown client.close - running_connections.delete(socket) + @sync.synchronize(Sync::EX) do + running_connections.delete(socket) + end else - swapping_connections[socket] = running_connections.delete(socket) - master.wakeup + @sync.synchronize(Sync::EX) do + swapping_connections[socket] = running_connections.delete(socket) + end end rescue => e outtrans.close @exception_q.push e ensure - running_connections.delete(socket) - connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket + should_wakeup = false + @sync.synchronize(Sync::EX) do + running_connections.delete(socket) + if swapping_connections.include? socket + connections[socket] = swapping_connections.delete(socket) + should_wakeup = true + end + end + master.wakeup if should_wakeup intrans.close @thread_q.pop end @@ -77,9 +105,13 @@ module Thrift end if @shutdown @serverTransport.close - connections.merge! running_connections - connections.merge! swapping_connections - connections.values.each do |client, buffer, outtrans, outprot| + handles = [] + @sync.synchronize(Sync::SH) do + handles = connections + handles.merge! running_connections + handles.merge! swapping_connections + end + handles.values.each do |client, buffer, outtrans, outprot| # can't close completely or we'll break active messages # but lets at least stop accepting input client.handle.close_read @@ -97,7 +129,7 @@ module Thrift end thread_group.list.each { |t| t.kill } if @shutdown_kill # now kill connections completely if they still exists - connections.values.each do |client, buffer, outtrans, outprot| + handles.values.each do |client, buffer, outtrans, outprot| client.close end end -- 2.17.1