Implement NonblockingServer and add specs


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@668999 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/spec/ThriftSpec.thrift b/lib/rb/spec/ThriftSpec.thrift
index 101caff..1012e51 100644
--- a/lib/rb/spec/ThriftSpec.thrift
+++ b/lib/rb/spec/ThriftSpec.thrift
@@ -16,3 +16,11 @@
 struct BoolStruct {
   1: bool yesno = 1
 }
+
+service NonblockingService { // service exercised by the NonblockingServer specs
+  Hello greeting(1:bool english) // returns a Hello struct
+  bool block() // the spec handler waits here until unblock() is called
+  async void unblock() // async: the generated Ruby client sends this without reading a reply
+  async void shutdown() // async: the generated Ruby client sends this without reading a reply
+  void sleep(1:double seconds) // the spec handler sleeps for `seconds`, then replies
+}
diff --git a/lib/rb/spec/gen-rb/NonblockingService.rb b/lib/rb/spec/gen-rb/NonblockingService.rb
new file mode 100644
index 0000000..46e7c60
--- /dev/null
+++ b/lib/rb/spec/gen-rb/NonblockingService.rb
@@ -0,0 +1,192 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+require 'thrift/protocol'
+require 'thrift'
+require 'ThriftSpec_types'
+
+    module SpecNamespace
+      module NonblockingService
+        class Client # generated client stub; every call goes through send_message / receive_message
+          include Thrift::Client
+
+          def greeting(english) # two-way call: send the request, then return the received result
+            send_greeting(english)
+            return recv_greeting()
+          end
+
+          def send_greeting(english)
+            send_message('greeting', Greeting_args, :english => english)
+          end
+
+          def recv_greeting() # raises ApplicationException (MISSING_RESULT) when success is nil
+            result = receive_message(Greeting_result)
+            return result.success unless result.success.nil?
+            raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'greeting failed: unknown result')
+          end
+
+          def block() # two-way call: send the request, then return the received result
+            send_block()
+            return recv_block()
+          end
+
+          def send_block()
+            send_message('block', Block_args)
+          end
+
+          def recv_block() # the nil check (not a truthiness check) lets a false result be returned
+            result = receive_message(Block_result)
+            return result.success unless result.success.nil?
+            raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'block failed: unknown result')
+          end
+
+          def unblock() # one-way call: only sends, there is no recv_unblock
+            send_unblock()
+          end
+
+          def send_unblock()
+            send_message('unblock', Unblock_args)
+          end
+          def shutdown() # one-way call: only sends, there is no recv_shutdown
+            send_shutdown()
+          end
+
+          def send_shutdown()
+            send_message('shutdown', Shutdown_args)
+          end
+          def sleep(seconds) # two-way call with no return value: send, then wait for the empty reply
+            send_sleep(seconds)
+            recv_sleep()
+          end
+
+          def send_sleep(seconds)
+            send_message('sleep', Sleep_args, :seconds => seconds)
+          end
+
+          def recv_sleep() # reads the Sleep_result reply and returns nil
+            result = receive_message(Sleep_result)
+            return
+          end
+
+        end
+
+        class Processor # generated server-side dispatcher; each process_* method forwards to @handler
+          include Thrift::Processor
+
+          def process_greeting(seqid, iprot, oprot) # read args, call the handler, write its result
+            args = read_args(iprot, Greeting_args)
+            result = Greeting_result.new()
+            result.success = @handler.greeting(args.english)
+            write_result(result, oprot, 'greeting', seqid)
+          end
+
+          def process_block(seqid, iprot, oprot) # read args, call the handler, write its result
+            args = read_args(iprot, Block_args)
+            result = Block_result.new()
+            result.success = @handler.block()
+            write_result(result, oprot, 'block', seqid)
+          end
+
+          def process_unblock(seqid, iprot, oprot) # one-way: calls the handler and writes no result
+            args = read_args(iprot, Unblock_args)
+            @handler.unblock()
+            return
+          end
+
+          def process_shutdown(seqid, iprot, oprot) # one-way: calls the handler and writes no result
+            args = read_args(iprot, Shutdown_args)
+            @handler.shutdown()
+            return
+          end
+
+          def process_sleep(seqid, iprot, oprot) # writes an empty Sleep_result after the handler returns
+            args = read_args(iprot, Sleep_args)
+            result = Sleep_result.new()
+            @handler.sleep(args.seconds)
+            write_result(result, oprot, 'sleep', seqid)
+          end
+
+        end
+
+        # HELPER FUNCTIONS AND STRUCTURES
+
+        class Greeting_args # argument struct for greeting: field 1 is the bool `english`
+          include Thrift::Struct
+          attr_accessor :english
+          FIELDS = {
+            1 => {:type => Thrift::Types::BOOL, :name => 'english'}
+          }
+        end
+
+        class Greeting_result # result struct for greeting: field 0 (`success`) holds a Hello struct
+          include Thrift::Struct
+          attr_accessor :success
+          FIELDS = {
+            0 => {:type => Thrift::Types::STRUCT, :name => 'success', :class => Hello}
+          }
+        end
+
+        class Block_args # argument struct for block: declares no fields
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Block_result # result struct for block: field 0 (`success`) is a bool
+          include Thrift::Struct
+          attr_accessor :success
+          FIELDS = {
+            0 => {:type => Thrift::Types::BOOL, :name => 'success'}
+          }
+        end
+
+        class Unblock_args # argument struct for unblock: declares no fields
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Unblock_result # empty struct; not referenced by the Client or Processor in this file
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Shutdown_args # argument struct for shutdown: declares no fields
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Shutdown_result # empty struct; not referenced by the Client or Processor in this file
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Sleep_args # argument struct for sleep: field 1 is the double `seconds`
+          include Thrift::Struct
+          attr_accessor :seconds
+          FIELDS = {
+            1 => {:type => Thrift::Types::DOUBLE, :name => 'seconds'}
+          }
+        end
+
+        class Sleep_result # result struct for sleep: declares no fields (void return)
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+      end
+
+    end
diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb
new file mode 100644
index 0000000..5850413
--- /dev/null
+++ b/lib/rb/spec/nonblockingserver_spec.rb
@@ -0,0 +1,166 @@
+require File.dirname(__FILE__) + '/spec_helper'
+require 'thrift/server/nonblockingserver'
+$:.unshift File.dirname(__FILE__) + '/gen-rb'
+require 'NonblockingService'
+
+class ThriftNonblockingServerSpec < Spec::ExampleGroup
+  include Thrift
+  include SpecNamespace
+
+  class Handler # NonblockingService implementation handed to the Processor in before(:each)
+    def initialize
+      @queue = Queue.new # callers of #block wait on this queue
+    end
+
+    attr_accessor :server # assigned in before(:each); used by #shutdown
+
+    def greeting(english) # default Hello when english is truthy, otherwise one greeting "Aloha!"
+      if english
+        SpecNamespace::Hello.new
+      else
+        SpecNamespace::Hello.new(:greeting => "Aloha!")
+      end
+    end
+
+    def block # waits until a value is pushed onto the queue and returns that value
+      @queue.pop
+    end
+
+    def unblock # pushes true once for every thread currently waiting in #block
+      @queue.num_waiting.times { @queue.push true }
+    end
+
+    def sleep(time) # calls Kernel.sleep explicitly because this method is itself named sleep
+      Kernel.sleep time
+    end
+
+    def shutdown # asks the server stored in @server to shut down
+      @server.shutdown
+    end
+  end
+
+  before(:each) do # builds a fresh server and runs it on a background thread before every example
+    @port = 43251
+    handler = Handler.new
+    processor = NonblockingService::Processor.new(handler)
+    @transport = ServerSocket.new('localhost', @port)
+    transportFactory = FramedTransportFactory.new
+    @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5) # NOTE(review): the 5 is presumably the worker/connection limit — confirm against NonblockingServer#initialize
+    handler.server = @server # lets Handler#shutdown reach the server
+    @server_thread = Thread.new do
+      begin
+        @server.serve
+      rescue => e
+        p e # print the error and its backtrace, then re-raise
+        puts e.backtrace * "\n"
+        raise e
+      end
+    end
+    Thread.pass # scheduling hint so the server thread can start; not a guarantee that it is listening
+
+    @clients = [] # [client, transport] pairs appended by setup_client and closed in after(:each)
+  end
+
+  after(:each) do # teardown: close client transports, kill the server thread, close the server socket
+    @clients.each { |client, trans| trans.close }
+    @server_thread.kill
+    @transport.close
+  end
+
+  def setup_client # opens a framed binary-protocol connection to @port and returns the client
+    transport = FramedTransport.new(Socket.new('localhost', @port))
+    protocol = BinaryProtocol.new(transport)
+    client = NonblockingService::Client.new(protocol)
+    transport.open
+    @clients << [client, transport] # recorded so after(:each) can close the transport
+    client
+  end
+
+  def setup_client_thread(result)
+    queue = Queue.new
+    Thread.new do
+      client = setup_client
+      while (msg = queue.pop)
+        case msg
+        when :block
+          result << client.block
+        when :unblock
+          client.unblock
+        when :hello
+          result << client.greeting(true) # ignore result
+        when :sleep
+          client.sleep(0.5)
+          result << :slept
+        when :shutdown
+          client.shutdown
+        when :exit
+          result << :done
+          break
+        end
+      end
+      @clients.each { |c,t| t.close and break if c == client } #close the transport
+    end
+    queue
+  end
+
+  it "should handle basic message passing" do # one client, both branches of Handler#greeting
+    client = setup_client
+    client.greeting(true).should == Hello.new
+    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+  end
+
+  it "should handle concurrent clients" do
+    queue = Queue.new
+    4.times { Thread.new { queue.push setup_client.block } } # four clients, each waiting in block
+    setup_client.unblock # a fifth client releases the waiters
+    4.times { queue.pop.should be_true } # every blocked call returns the true pushed by Handler#unblock
+  end
+
+  it "should handle messages from more than 5 long-lived connections" do
+    queues = []
+    result = Queue.new
+    7.times do |i| # seven threaded clients, in addition to the direct client created below
+      queues << setup_client_thread(result)
+      Thread.pass if i == 4 # give the server time to accept connections
+    end
+    client = setup_client
+    # block 4 connections
+    4.times { |i| queues[i] << :block }
+    queues[4] << :hello
+    queues[5] << :hello
+    queues[6] << :hello
+    3.times { result.pop.should == Hello.new } # the three unblocked threaded clients still get replies
+    client.greeting(true).should == Hello.new
+    queues[5] << :unblock # release the four clients waiting in block
+    4.times { result.pop.should be_true }
+    queues[2] << :hello # a previously blocked connection can be used again
+    result.pop.should == Hello.new
+    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+    7.times { queues.shift << :exit } # tell every client thread to finish
+    client.greeting(true).should == Hello.new # the direct client still works afterwards
+  end
+
+  it "should shut down when asked" do
+    @server.shutdown
+    @server_thread.join(2).should be_an_instance_of(Thread) # join returns the thread only if it finished within 2 seconds
+  end
+
+  it "should continue processing active messages when shutting down" do
+    result = Queue.new
+    client = setup_client_thread(result) # note: this is the thread's command queue, not a client object
+    client << :sleep
+    sleep 0.1 # give the server time to start processing the client's message
+    @server.shutdown
+    @server_thread.join(2).should be_an_instance_of(Thread) # join returns the thread only if it finished within 2 seconds
+    result.pop.should == :slept # the in-flight sleep call still completed on the client side
+  end
+
+  it "should kill active messages when they don't expire while shutting down" do
+    result = Queue.new
+    client = setup_client_thread(result) # note: this is the thread's command queue, not a client object
+    client << :block # never unblocked in this example, so the call cannot finish on its own
+    sleep 0.1 # start processing the client's message
+    @server.shutdown(1, true) # NOTE(review): arguments are presumably (timeout in seconds, wait flag) — confirm against NonblockingServer#shutdown
+    @server_thread.join(3).should_not be_nil # join returns nil when the thread is still running after 3 seconds
+  end
+end