From: Kevin Clark Date: Wed, 18 Jun 2008 01:16:02 +0000 (+0000) Subject: Implement NonblockingServer and add specs X-Git-Tag: 0.2.0~552 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=e0fdddea44ac97988c7aaf775961a02619eacd22;p=common%2Fthrift.git Implement NonblockingServer and add specs git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@668999 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb new file mode 100644 index 00000000..54dad04d --- /dev/null +++ b/lib/rb/lib/thrift/server/nonblockingserver.rb @@ -0,0 +1,129 @@ +require 'thrift/server' + +# thrift/server already imports fastthread/thread + +module Thrift + # this class expects to always use a FramedTransport for reading messages + #-- + # this isn't very pretty, but we're working around the fact that FramedTransport + # and the processors are all written in a synchronous manner. + # So lets read data off the wire ourselves, check if we have a full frame, and + # only then hand it to the transport to parse + # + # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods + class NonblockingServer < ThreadPoolServer + def serve + @server_thread = Thread.current + @serverTransport.listen + + begin + connections = {} + running_connections = {} + # the swapping_connections stuff is to ensure the thread doesn't + # put the connection back into the regular list, then have the server + # thread process it again, then have the first thread remove it from + # the running_connections list + swapping_connections = {} + thread_group = ThreadGroup.new + loop do + break if @shutdown + rd, = select([@serverTransport.handle, *connections.keys]) + next if rd.nil? + rd.each do |socket| + if socket == @serverTransport.handle + client = @serverTransport.accept + buffer = '' + outtrans = @transportFactory.get_transport(client) + outprot = @protocolFactory.get_protocol(outtrans) + connections[client.handle] = [client, buffer, outtrans, outprot] + else + client, buffer, outtrans, outprot = connections[socket] + if socket.eof? + client.close + connections.delete(socket) + else + buffer << client.read(4096, true) + if has_full_frame?(buffer) + running_connections[socket] = connections.delete(socket) + @thread_q.push :token + t = Thread.new(Thread.current) do |master| + begin + membuf = MemoryBuffer.new(buffer) + intrans = @transportFactory.get_transport(membuf) + inprot = @protocolFactory.get_protocol(intrans) + @processor.process(inprot, outprot) + if @shutdown + client.close + running_connections.delete(socket) + else + swapping_connections[socket] = running_connections.delete(socket) + master.wakeup + end + rescue => e + outtrans.close + @exception_q.push e + ensure + running_connections.delete(socket) + connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket + intrans.close + @thread_q.pop + end + end + thread_group.add t + end + end + end + end + end + if @shutdown + @serverTransport.close + connections.merge! running_connections + connections.merge! swapping_connections + connections.values.each do |client, buffer, outtrans, outprot| + # can't close completely or we'll break active messages + # but lets at least stop accepting input + client.handle.close_read + end + start = Time.now.to_f + until thread_group.list.empty? + if @shutdown_timeout + now = Time.now.to_f + cur_timeout = @shutdown_timeout - (now - start) + break if cur_timeout <= 0 + thread_group.list.first.join(cur_timeout) + else + thread_group.list.first.join + end + end + thread_group.list.each { |t| t.kill } if @shutdown_kill + # now kill connections completely if they still exists + connections.values.each do |client, buffer, outtrans, outprot| + client.close + end + end + ensure + @serverTransport.close + end + end + + # Stop accepting new messages and wait for active messages to finish + # If the given timeout passes without the active messages finishing, + # control will exit from #serve and leave the remaining threads active. + # If you pass true for kill, the remaining threads will be reaped instead. + # A false timeout means wait indefinitely + def shutdown(timeout = nil, kill = false) + @shutdown_timeout = timeout + @shutdown_kill = kill + @shutdown = true + @server_thread.wakeup + end + + private + + def has_full_frame?(buf) + return no unless buf.length >= 4 + size = buf.unpack('N').first + size + 4 <= buf.length + end + end +end diff --git a/lib/rb/spec/ThriftSpec.thrift b/lib/rb/spec/ThriftSpec.thrift index 101caffe..1012e513 100644 --- a/lib/rb/spec/ThriftSpec.thrift +++ b/lib/rb/spec/ThriftSpec.thrift @@ -16,3 +16,11 @@ struct Foo { struct BoolStruct { 1: bool yesno = 1 } + +service NonblockingService { + Hello greeting(1:bool english) + bool block() + async void unblock() + async void shutdown() + void sleep(1:double seconds) +} diff --git a/lib/rb/spec/gen-rb/NonblockingService.rb b/lib/rb/spec/gen-rb/NonblockingService.rb new file mode 100644 index 00000000..46e7c60f --- /dev/null +++ b/lib/rb/spec/gen-rb/NonblockingService.rb @@ -0,0 +1,192 @@ +# +# Autogenerated by Thrift +# +# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING +# + +require 'thrift/protocol' +require 'thrift' +require 'ThriftSpec_types' + + module SpecNamespace + module NonblockingService + class Client + include Thrift::Client + + def greeting(english) + send_greeting(english) + return recv_greeting() + end + + def send_greeting(english) + send_message('greeting', Greeting_args, :english => english) + end + + def recv_greeting() + result = receive_message(Greeting_result) + return result.success unless result.success.nil? + raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'greeting failed: unknown result') + end + + def block() + send_block() + return recv_block() + end + + def send_block() + send_message('block', Block_args) + end + + def recv_block() + result = receive_message(Block_result) + return result.success unless result.success.nil? + raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'block failed: unknown result') + end + + def unblock() + send_unblock() + end + + def send_unblock() + send_message('unblock', Unblock_args) + end + def shutdown() + send_shutdown() + end + + def send_shutdown() + send_message('shutdown', Shutdown_args) + end + def sleep(seconds) + send_sleep(seconds) + recv_sleep() + end + + def send_sleep(seconds) + send_message('sleep', Sleep_args, :seconds => seconds) + end + + def recv_sleep() + result = receive_message(Sleep_result) + return + end + + end + + class Processor + include Thrift::Processor + + def process_greeting(seqid, iprot, oprot) + args = read_args(iprot, Greeting_args) + result = Greeting_result.new() + result.success = @handler.greeting(args.english) + write_result(result, oprot, 'greeting', seqid) + end + + def process_block(seqid, iprot, oprot) + args = read_args(iprot, Block_args) + result = Block_result.new() + result.success = @handler.block() + write_result(result, oprot, 'block', seqid) + end + + def process_unblock(seqid, iprot, oprot) + args = read_args(iprot, Unblock_args) + @handler.unblock() + return + end + + def process_shutdown(seqid, iprot, oprot) + args = read_args(iprot, Shutdown_args) + @handler.shutdown() + return + end + + def process_sleep(seqid, iprot, oprot) + args = read_args(iprot, Sleep_args) + result = Sleep_result.new() + @handler.sleep(args.seconds) + write_result(result, oprot, 'sleep', seqid) + end + + end + + # HELPER FUNCTIONS AND STRUCTURES + + class Greeting_args + include Thrift::Struct + attr_accessor :english + FIELDS = { + 1 => {:type => Thrift::Types::BOOL, :name => 'english'} + } + end + + class Greeting_result + include Thrift::Struct + attr_accessor :success + FIELDS = { + 0 => {:type => Thrift::Types::STRUCT, :name => 'success', :class => Hello} + } + end + + class Block_args + include Thrift::Struct + FIELDS = { + + } + end + + class Block_result + include Thrift::Struct + attr_accessor :success + FIELDS = { + 0 => {:type => Thrift::Types::BOOL, :name => 'success'} + } + end + + class Unblock_args + include Thrift::Struct + FIELDS = { + + } + end + + class Unblock_result + include Thrift::Struct + FIELDS = { + + } + end + + class Shutdown_args + include Thrift::Struct + FIELDS = { + + } + end + + class Shutdown_result + include Thrift::Struct + FIELDS = { + + } + end + + class Sleep_args + include Thrift::Struct + attr_accessor :seconds + FIELDS = { + 1 => {:type => Thrift::Types::DOUBLE, :name => 'seconds'} + } + end + + class Sleep_result + include Thrift::Struct + FIELDS = { + + } + end + + end + + end diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb new file mode 100644 index 00000000..58504139 --- /dev/null +++ b/lib/rb/spec/nonblockingserver_spec.rb @@ -0,0 +1,166 @@ +require File.dirname(__FILE__) + '/spec_helper' +require 'thrift/server/nonblockingserver' +$:.unshift File.dirname(__FILE__) + '/gen-rb' +require 'NonblockingService' + +class ThriftNonblockingServerSpec < Spec::ExampleGroup + include Thrift + include SpecNamespace + + class Handler + def initialize + @queue = Queue.new + end + + attr_accessor :server + + def greeting(english) + if english + SpecNamespace::Hello.new + else + SpecNamespace::Hello.new(:greeting => "Aloha!") + end + end + + def block + @queue.pop + end + + def unblock + @queue.num_waiting.times { @queue.push true } + end + + def sleep(time) + Kernel.sleep time + end + + def shutdown + @server.shutdown + end + end + + before(:each) do + @port = 43251 + handler = Handler.new + processor = NonblockingService::Processor.new(handler) + @transport = ServerSocket.new('localhost', @port) + transportFactory = FramedTransportFactory.new + @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5) + handler.server = @server + @server_thread = Thread.new do + begin + @server.serve + rescue => e + p e + puts e.backtrace * "\n" + raise e + end + end + Thread.pass + + @clients = [] + end + + after(:each) do + @clients.each { |client, trans| trans.close } + @server_thread.kill + @transport.close + end + + def setup_client + transport = FramedTransport.new(Socket.new('localhost', @port)) + protocol = BinaryProtocol.new(transport) + client = NonblockingService::Client.new(protocol) + transport.open + @clients << [client, transport] + client + end + + def setup_client_thread(result) + queue = Queue.new + Thread.new do + client = setup_client + while (msg = queue.pop) + case msg + when :block + result << client.block + when :unblock + client.unblock + when :hello + result << client.greeting(true) # ignore result + when :sleep + client.sleep(0.5) + result << :slept + when :shutdown + client.shutdown + when :exit + result << :done + break + end + end + @clients.each { |c,t| t.close and break if c == client } #close the transport + end + queue + end + + it "should handle basic message passing" do + client = setup_client + client.greeting(true).should == Hello.new + client.greeting(false).should == Hello.new(:greeting => 'Aloha!') + end + + it "should handle concurrent clients" do + queue = Queue.new + 4.times { Thread.new { queue.push setup_client.block } } + setup_client.unblock + 4.times { queue.pop.should be_true } + end + + it "should handle messages from more than 5 long-lived connections" do + queues = [] + result = Queue.new + 7.times do |i| + queues << setup_client_thread(result) + Thread.pass if i == 4 # give the server time to accept connections + end + client = setup_client + # block 4 connections + 4.times { |i| queues[i] << :block } + queues[4] << :hello + queues[5] << :hello + queues[6] << :hello + 3.times { result.pop.should == Hello.new } + client.greeting(true).should == Hello.new + queues[5] << :unblock + 4.times { result.pop.should be_true } + queues[2] << :hello + result.pop.should == Hello.new + client.greeting(false).should == Hello.new(:greeting => 'Aloha!') + 7.times { queues.shift << :exit } + client.greeting(true).should == Hello.new + end + + it "should shut down when asked" do + @server.shutdown + @server_thread.join(2).should be_an_instance_of(Thread) + end + + it "should continue processing active messages when shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << :sleep + sleep 0.1 # give the server time to start processing the client's message + @server.shutdown + @server_thread.join(2).should be_an_instance_of(Thread) + result.pop.should == :slept + end + + it "should kill active messages when they don't expire while shutting down" do + result = Queue.new + client = setup_client_thread(result) + client << :block + sleep 0.1 # start processing the client's message + @server.shutdown(1, true) + @server_thread.join(3).should_not be_nil + end +end