From: Kevin Clark Date: Wed, 18 Jun 2008 01:17:44 +0000 (+0000) Subject: rb: Completely rewrite Thrift::NonblockingServer X-Git-Tag: 0.2.0~539 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=e45bf59417f0e2594708d26e16b66ccba9d8bfee;p=common%2Fthrift.git rb: Completely rewrite Thrift::NonblockingServer It now has a much better and cleaner architecture, a proper persistent thread pool, a dedicated acceptor thread, and no concurrency issues git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669012 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb index 920175ba..283d5f59 100644 --- a/lib/rb/lib/thrift/server/nonblockingserver.rb +++ b/lib/rb/lib/thrift/server/nonblockingserver.rb @@ -1,161 +1,269 @@ require 'thrift/server' -require 'sync' -# thrift/server already imports fastthread/thread +require 'logger' +require 'thread' module Thrift # this class expects to always use a FramedTransport for reading messages - #-- - # this isn't very pretty, but we're working around the fact that FramedTransport - # and the processors are all written in a synchronous manner. - # So lets read data off the wire ourselves, check if we have a full frame, and - # only then hand it to the transport to parse - # - # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods - class NonblockingServer < ThreadPoolServer - def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20) - super - @sync = Sync.new + class NonblockingServer < Server + def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20, logger = nil) + super(processor, serverTransport, transportFactory, protocolFactory) + @num_threads = num + if logger.nil? + @logger = Logger.new(STDERR) + @logger.level = Logger::WARN + else + @logger = logger + end + @shutdown_semaphore = Mutex.new end def serve - @server_thread = Thread.current + @logger.info "Starting #{self}" @serverTransport.listen + @io_manager = start_io_manager begin - connections = {} - running_connections = {} - # the swapping_connections stuff is to ensure the thread doesn't - # put the connection back into the regular list, then have the server - # thread process it again, then have the first thread remove it from - # the running_connections list - swapping_connections = {} - thread_group = ThreadGroup.new loop do - break if @shutdown - handles = [@serverTransport.handle] - @sync.synchronize(Sync::SH) do - handles.concat connections.keys + socket = @serverTransport.accept + @logger.debug "Accepted socket: #{socket.inspect}" + @io_manager.add_connection socket + end + rescue IOError => e + # we must be shutting down + @logger.info "#{self} is shutting down, goodbye" + end + ensure + @serverTransport.close + @io_manager.ensure_closed unless @io_manager.nil? + end + + def shutdown(timeout = 0, block = true) + @shutdown_semaphore.synchronize do + return if @is_shutdown + @is_shutdown = true + end + # nonblocking is intended for calling from within a Handler + # but we can't change the order of operations here, so lets thread + shutdown_proc = lambda do + @io_manager.shutdown(timeout) + @serverTransport.close # this will break the accept loop + end + if block + shutdown_proc.call + else + Thread.new &shutdown_proc + end + end + + private + + def start_io_manager + iom = IOManager.new(@processor, @serverTransport, @transportFactory, @protocolFactory, @num_threads, @logger) + iom.spawn + iom + end + + class IOManager # :nodoc: + def initialize(processor, serverTransport, transportFactory, protocolFactory, num, logger) + @processor = processor + @serverTransport = serverTransport + @transportFactory = transportFactory + @protocolFactory = protocolFactory + @num_threads = num + @logger = logger + @connections = [] + @buffers = Hash.new { |h,k| h[k] = '' } + @signal_queue = Queue.new + @signal_pipes = IO.pipe + @signal_pipes[1].sync = true + @worker_queue = Queue.new + @shutdown_queue = Queue.new + end + + def add_connection(socket) + signal [:connection, socket] + end + + def spawn + @iom_thread = Thread.new do + @logger.debug "Starting #{self}" + run + end + end + + def shutdown(timeout = 0) + @logger.debug "#{self} is shutting down workers" + @worker_queue.clear + @num_threads.times { @worker_queue.push [:shutdown] } + signal [:shutdown, timeout] + @shutdown_queue.pop + @signal_pipes[0].close + @signal_pipes[1].close + @logger.debug "#{self} is shutting down, goodbye" + end + + def ensure_closed + kill_worker_threads if @worker_threads + @iom_thread.kill + end + + private + + def run + spin_worker_threads + + loop do + rd, = select([@signal_pipes[0], *@connections]) + if rd.delete @signal_pipes[0] + break if read_signals == :shutdown end - rd, = select(handles) - next if rd.nil? - rd.each do |socket| - if socket == @serverTransport.handle - client = @serverTransport.accept - buffer = '' - outtrans = @transportFactory.get_transport(client) - outprot = @protocolFactory.get_protocol(outtrans) - @sync.synchronize(Sync::EX) do - connections[client.handle] = [client, buffer, outtrans, outprot] - end + rd.each do |fd| + if fd.handle.eof? + remove_connection fd else - client, buffer, outtrans, outprot = nil # for scope - @sync.synchronize(Sync::SH) do - client, buffer, outtrans, outprot = connections[socket] - end - if socket.eof? - client.close - @sync.synchronize(Sync::EX) do - connections.delete(socket) - end - else - buffer << client.read(4096, true) - if has_full_frame?(buffer) - @sync.synchronize(Sync::EX) do - running_connections[socket] = connections.delete(socket) - end - @thread_q.push :token - t = Thread.new(Thread.current) do |master| - begin - membuf = MemoryBuffer.new(buffer) - intrans = @transportFactory.get_transport(membuf) - inprot = @protocolFactory.get_protocol(intrans) - @processor.process(inprot, outprot) - if @shutdown - client.close - @sync.synchronize(Sync::EX) do - running_connections.delete(socket) - end - else - @sync.synchronize(Sync::EX) do - swapping_connections[socket] = running_connections.delete(socket) - end - end - rescue => e - outtrans.close - @exception_q.push e - ensure - should_wakeup = false - @sync.synchronize(Sync::EX) do - running_connections.delete(socket) - if swapping_connections.include? socket - connections[socket] = swapping_connections.delete(socket) - should_wakeup = true - end - end - master.wakeup if should_wakeup - intrans.close - @thread_q.pop - end - end - thread_group.add t - end - end + read_connection fd end end end - if @shutdown - @serverTransport.close - handles = [] - @sync.synchronize(Sync::SH) do - handles = connections - handles.merge! running_connections - handles.merge! swapping_connections - end - handles.values.each do |client, buffer, outtrans, outprot| - # can't close completely or we'll break active messages - # but lets at least stop accepting input - client.handle.close_read - end - start = Time.now.to_f - until thread_group.list.empty? - if @shutdown_timeout - now = Time.now.to_f - cur_timeout = @shutdown_timeout - (now - start) - break if cur_timeout <= 0 - thread_group.list.first.join(cur_timeout) - else - thread_group.list.first.join + join_worker_threads(@shutdown_timeout) + ensure + @shutdown_queue.push :shutdown + end + + def read_connection(fd) + buffer = '' + begin + buffer << fd.read_nonblock(4096) while true + rescue Errno::EAGAIN, EOFError + @buffers[fd] << buffer + end + frame = slice_frame!(@buffers[fd]) + if frame + @worker_queue.push [:frame, fd, frame] + end + end + + def spin_worker_threads + @logger.debug "#{self} is spinning up worker threads" + @worker_threads = [] + @num_threads.times do + @worker_threads << spin_thread + end + end + + def spin_thread + Worker.new(@processor, @transportFactory, @protocolFactory, @logger, @worker_queue).spawn + end + + def signal(msg) + @signal_queue << msg + @signal_pipes[1].write " " + end + + def read_signals + # clear the signal pipe + begin + @signal_pipes[0].read_nonblock(1024) while true + rescue Errno::EAGAIN + end + # now read the signals + begin + loop do + signal, obj = @signal_queue.pop(true) + case signal + when :connection + @connections << obj + when :shutdown + @shutdown_timeout = obj + return :shutdown end end - thread_group.list.each { |t| t.kill } if @shutdown_kill - # now kill connections completely if they still exists - handles.values.each do |client, buffer, outtrans, outprot| - client.close + rescue ThreadError + # out of signals + end + end + + def remove_connection(fd) + # don't explicitly close it, a thread may still be writing to it + @connections.delete fd + @buffers.delete fd + end + + def join_worker_threads(shutdown_timeout) + start = Time.now + @worker_threads.each do |t| + if shutdown_timeout > 0 + timeout = Time.now - (start + shutdown_timeout) + break if timeout <= 0 + t.join(timeout) + else + t.join end end - ensure - @serverTransport.close + kill_worker_threads end - end - # Stop accepting new messages and wait for active messages to finish - # If the given timeout passes without the active messages finishing, - # control will exit from #serve and leave the remaining threads active. - # If you pass true for kill, the remaining threads will be reaped instead. - # A false timeout means wait indefinitely - def shutdown(timeout = nil, kill = false) - @shutdown_timeout = timeout - @shutdown_kill = kill - @shutdown = true - @server_thread.wakeup - end + def kill_worker_threads + @worker_threads.each do |t| + t.kill if t.status + end + @worker_threads.clear + end - private + def slice_frame!(buf) + if buf.length >= 4 + size = buf.unpack('N').first + if buf.length >= size + 4 + buf.slice!(0, size + 4) + else + nil + end + else + nil + end + end + + class Worker # :nodoc: + def initialize(processor, transportFactory, protocolFactory, logger, queue) + @processor = processor + @transportFactory = transportFactory + @protocolFactory = protocolFactory + @logger = logger + @queue = queue + end - def has_full_frame?(buf) - return no unless buf.length >= 4 - size = buf.unpack('N').first - size + 4 <= buf.length + def spawn + Thread.new do + @logger.debug "#{self} is spawning" + run + end + end + + private + + def run + loop do + cmd, *args = @queue.pop + case cmd + when :shutdown + @logger.debug "#{self} is shutting down, goodbye" + break + when :frame + fd, frame = args + begin + otrans = @transportFactory.get_transport(fd) + oprot = @protocolFactory.get_protocol(otrans) + membuf = MemoryBuffer.new(frame) + itrans = @transportFactory.get_transport(membuf) + iprot = @protocolFactory.get_protocol(itrans) + @processor.process(iprot, oprot) + rescue => e + @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}" + end + end + end + end + end end end -end +end \ No newline at end of file diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb index 773fc039..bf390bd3 100644 --- a/lib/rb/spec/nonblockingserver_spec.rb +++ b/lib/rb/spec/nonblockingserver_spec.rb @@ -35,139 +35,189 @@ class ThriftNonblockingServerSpec < Spec::ExampleGroup end def shutdown - @server.shutdown + @server.shutdown(0, false) end end - before(:each) do - @port = 43251 - handler = Handler.new - processor = NonblockingService::Processor.new(handler) - @transport = ServerSocket.new('localhost', @port) - transportFactory = FramedTransportFactory.new - @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5) - handler.server = @server - @server_thread = Thread.new do - begin - @server.serve - rescue => e - p e - puts e.backtrace * "\n" - raise e - end + class SpecTransport < Transport + def initialize(transport, queue) + @transport = transport + @queue = queue + @flushed = false end - Thread.pass - @clients = [] - end + def open? + @transport.open? + end - after(:each) do - @clients.each { |client, trans| trans.close } - @server_thread.kill - @transport.close - end + def open + @transport.open + end + + def close + @transport.close + end + + def read(sz) + @transport.read(sz) + end + + def write(buf,sz=nil) + @transport.write(buf, sz) + end - def setup_client - transport = FramedTransport.new(Socket.new('localhost', @port)) - protocol = BinaryProtocol.new(transport) - client = NonblockingService::Client.new(protocol) - transport.open - @clients << [client, transport] - client + def flush + @queue.push :flushed unless @flushed or @queue.nil? + @flushed = true + @transport.flush + end end - def setup_client_thread(result) - queue = Queue.new - Thread.new do - client = setup_client - while (msg = queue.pop) - case msg - when :block - result << client.block - when :unblock - client.unblock - when :hello - result << client.greeting(true) # ignore result - when :sleep - client.sleep(0.5) - result << :slept - when :shutdown - client.shutdown - when :exit - result << :done - break + describe Thrift::NonblockingServer do + before(:each) do + @port = 43251 + handler = Handler.new + processor = NonblockingService::Processor.new(handler) + @transport = ServerSocket.new('localhost', @port) + transportFactory = FramedTransportFactory.new + logger = Logger.new(STDERR) + logger.level = Logger::WARN + @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger) + handler.server = @server + @server_thread = Thread.new(Thread.current) do |master_thread| + begin + @server.serve + rescue => e + p e + puts e.backtrace * "\n" + master_thread.raise e end end - @clients.each { |c,t| t.close and break if c == client } #close the transport + Thread.pass + + @clients = [] + @catch_exceptions = false end - queue - end - it "should handle basic message passing" do - client = setup_client - client.greeting(true).should == Hello.new - client.greeting(false).should == Hello.new(:greeting => 'Aloha!') - end + after(:each) do + @clients.each { |client, trans| trans.close } + # @server.shutdown(1) + @server_thread.kill + @transport.close + end - it "should handle concurrent clients" do - queue = Queue.new - 4.times { Thread.new { queue.push setup_client.block } } - setup_client.unblock - 4.times { queue.pop.should be_true } - end + def setup_client(queue = nil) + transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue) + protocol = BinaryProtocol.new(transport) + client = NonblockingService::Client.new(protocol) + transport.open + @clients << [client, transport] + client + end - it "should handle messages from more than 5 long-lived connections" do - queues = [] - result = Queue.new - 7.times do |i| - queues << setup_client_thread(result) - Thread.pass if i == 4 # give the server time to accept connections - end - client = setup_client - # block 4 connections - 4.times { |i| queues[i] << :block } - queues[4] << :hello - queues[5] << :hello - queues[6] << :hello - 3.times { result.pop.should == Hello.new } - client.greeting(true).should == Hello.new - queues[5] << :unblock - 4.times { result.pop.should be_true } - queues[2] << :hello - result.pop.should == Hello.new - client.greeting(false).should == Hello.new(:greeting => 'Aloha!') - 7.times { queues.shift << :exit } - client.greeting(true).should == Hello.new - end + def setup_client_thread(result) + queue = Queue.new + Thread.new do + begin + client = setup_client + while (msg = queue.pop) + case msg + when :block + result << client.block + when :unblock + client.unblock + when :hello + result << client.greeting(true) # ignore result + when :sleep + client.sleep(0.5) + result << :slept + when :shutdown + client.shutdown + when :exit + result << :done + break + end + end + @clients.each { |c,t| t.close and break if c == client } #close the transport + rescue => e + raise e unless @catch_exceptions + end + end + queue + end - it "should shut down when asked" do - @server.shutdown - @server_thread.join(2).should be_an_instance_of(Thread) - end + it "should handle basic message passing" do + client = setup_client + client.greeting(true).should == Hello.new + client.greeting(false).should == Hello.new(:greeting => 'Aloha!') + end - it "should continue processing active messages when shutting down" do - result = Queue.new - client = setup_client_thread(result) - client << :sleep - sleep 0.1 # give the server time to start processing the client's message - @server.shutdown - @server_thread.join(2).should be_an_instance_of(Thread) - result.pop.should == :slept - end + it "should handle concurrent clients" do + queue = Queue.new + trans_queue = Queue.new + 4.times { Thread.new { queue.push setup_client(trans_queue).block } } + 4.times { trans_queue.pop } + setup_client.unblock + 4.times { queue.pop.should be_true } + end - it "should kill active messages when they don't expire while shutting down" do - result = Queue.new - client = setup_client_thread(result) - client << :block - sleep 0.1 # start processing the client's message - @server.shutdown(1, true) - @server_thread.join(3).should_not be_nil - end + it "should handle messages from more than 5 long-lived connections" do + queues = [] + result = Queue.new + 7.times do |i| + queues << setup_client_thread(result) + Thread.pass if i == 4 # give the server time to accept connections + end + client = setup_client + # block 4 connections + 4.times { |i| queues[i] << :block } + queues[4] << :hello + queues[5] << :hello + queues[6] << :hello + 3.times { result.pop.should == Hello.new } + client.greeting(true).should == Hello.new + queues[5] << :unblock + 4.times { result.pop.should be_true } + queues[2] << :hello + result.pop.should == Hello.new + client.greeting(false).should == Hello.new(:greeting => 'Aloha!') + 7.times { queues.shift << :exit } + client.greeting(true).should == Hello.new + end - it "should allow shutting down in response to a message" do - client = setup_client - client.greeting(true).should == Hello.new - client.shutdown - @server_thread.join(2).should_not be_nil + it "should shut down when asked" do + # connect first to ensure it's running + client = setup_client + client.greeting(false) # force a message pass + @server.shutdown + @server_thread.join(2).should be_an_instance_of(Thread) + end + + it "should continue processing active messages when shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << :sleep + sleep 0.1 # give the server time to start processing the client's message + @server.shutdown + @server_thread.join(2).should be_an_instance_of(Thread) + result.pop.should == :slept + end + + it "should kill active messages when they don't expire while shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << :block + sleep 0.1 # start processing the client's message + @server.shutdown(1) + @catch_exceptions = true + @server_thread.join(3).should_not be_nil + end + + it "should allow shutting down in response to a message" do + client = setup_client + client.greeting(true).should == Hello.new + client.shutdown + @server_thread.join(2).should_not be_nil + end end end