rb: Completely rewrite Thrift::NonblockingServer

It now has a much better and cleaner architecture, a proper persistent thread pool,
a dedicated acceptor thread, and no concurrency issues


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669012 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/spec/nonblockingserver_spec.rb b/lib/rb/spec/nonblockingserver_spec.rb
index 773fc03..bf390bd 100644
--- a/lib/rb/spec/nonblockingserver_spec.rb
+++ b/lib/rb/spec/nonblockingserver_spec.rb
@@ -35,139 +35,189 @@
     end
 
     def shutdown
-      @server.shutdown
+      @server.shutdown(0, false)
     end
   end
 
-  before(:each) do
-    @port = 43251
-    handler = Handler.new
-    processor = NonblockingService::Processor.new(handler)
-    @transport = ServerSocket.new('localhost', @port)
-    transportFactory = FramedTransportFactory.new
-    @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5)
-    handler.server = @server
-    @server_thread = Thread.new do
-      begin
-        @server.serve
-      rescue => e
-        p e
-        puts e.backtrace * "\n"
-        raise e
-      end
+  class SpecTransport < Transport
+    def initialize(transport, queue)
+      @transport = transport
+      @queue = queue
+      @flushed = false
     end
-    Thread.pass
 
-    @clients = []
+    def open?
+      @transport.open?
+    end
+
+    def open
+      @transport.open
+    end
+
+    def close
+      @transport.close
+    end
+
+    def read(sz)
+      @transport.read(sz)
+    end
+
+    def write(buf,sz=nil)
+      @transport.write(buf, sz)
+    end
+
+    def flush
+      @queue.push :flushed unless @flushed or @queue.nil?
+      @flushed = true
+      @transport.flush
+    end
   end
 
-  after(:each) do
-    @clients.each { |client, trans| trans.close }
-    @server_thread.kill
-    @transport.close
-  end
-
-  def setup_client
-    transport = FramedTransport.new(Socket.new('localhost', @port))
-    protocol = BinaryProtocol.new(transport)
-    client = NonblockingService::Client.new(protocol)
-    transport.open
-    @clients << [client, transport]
-    client
-  end
-
-  def setup_client_thread(result)
-    queue = Queue.new
-    Thread.new do
-      client = setup_client
-      while (msg = queue.pop)
-        case msg
-        when :block
-          result << client.block
-        when :unblock
-          client.unblock
-        when :hello
-          result << client.greeting(true) # ignore result
-        when :sleep
-          client.sleep(0.5)
-          result << :slept
-        when :shutdown
-          client.shutdown
-        when :exit
-          result << :done
-          break
+  describe Thrift::NonblockingServer do
+    before(:each) do
+      @port = 43251
+      handler = Handler.new
+      processor = NonblockingService::Processor.new(handler)
+      @transport = ServerSocket.new('localhost', @port)
+      transportFactory = FramedTransportFactory.new
+      logger = Logger.new(STDERR)
+      logger.level = Logger::WARN
+      @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
+      handler.server = @server
+      @server_thread = Thread.new(Thread.current) do |master_thread|
+        begin
+          @server.serve
+        rescue => e
+          p e
+          puts e.backtrace * "\n"
+          master_thread.raise e
         end
       end
-      @clients.each { |c,t| t.close and break if c == client } #close the transport
+      Thread.pass
+
+      @clients = []
+      @catch_exceptions = false
     end
-    queue
-  end
 
-  it "should handle basic message passing" do
-    client = setup_client
-    client.greeting(true).should == Hello.new
-    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
-  end
-
-  it "should handle concurrent clients" do
-    queue = Queue.new
-    4.times { Thread.new { queue.push setup_client.block } }
-    setup_client.unblock
-    4.times { queue.pop.should be_true }
-  end
-
-  it "should handle messages from more than 5 long-lived connections" do
-    queues = []
-    result = Queue.new
-    7.times do |i|
-      queues << setup_client_thread(result)
-      Thread.pass if i == 4 # give the server time to accept connections
+    after(:each) do
+      @clients.each { |client, trans| trans.close }
+      # @server.shutdown(1)
+      @server_thread.kill
+      @transport.close
     end
-    client = setup_client
-    # block 4 connections
-    4.times { |i| queues[i] << :block }
-    queues[4] << :hello
-    queues[5] << :hello
-    queues[6] << :hello
-    3.times { result.pop.should == Hello.new }
-    client.greeting(true).should == Hello.new
-    queues[5] << :unblock
-    4.times { result.pop.should be_true }
-    queues[2] << :hello
-    result.pop.should == Hello.new
-    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
-    7.times { queues.shift << :exit }
-    client.greeting(true).should == Hello.new
-  end
 
-  it "should shut down when asked" do
-    @server.shutdown
-    @server_thread.join(2).should be_an_instance_of(Thread)
-  end
+    def setup_client(queue = nil)
+      transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
+      protocol = BinaryProtocol.new(transport)
+      client = NonblockingService::Client.new(protocol)
+      transport.open
+      @clients << [client, transport]
+      client
+    end
 
-  it "should continue processing active messages when shutting down" do
-    result = Queue.new
-    client = setup_client_thread(result)
-    client << :sleep
-    sleep 0.1 # give the server time to start processing the client's message
-    @server.shutdown
-    @server_thread.join(2).should be_an_instance_of(Thread)
-    result.pop.should == :slept
-  end
+    def setup_client_thread(result)
+      queue = Queue.new
+      Thread.new do
+        begin
+          client = setup_client
+          while (msg = queue.pop)
+            case msg
+            when :block
+              result << client.block
+            when :unblock
+              client.unblock
+            when :hello
+              result << client.greeting(true) # ignore result
+            when :sleep
+              client.sleep(0.5)
+              result << :slept
+            when :shutdown
+              client.shutdown
+            when :exit
+              result << :done
+              break
+            end
+          end
+          @clients.each { |c,t| t.close and break if c == client } #close the transport
+        rescue => e
+          raise e unless @catch_exceptions
+        end
+      end
+      queue
+    end
 
-  it "should kill active messages when they don't expire while shutting down" do
-    result = Queue.new
-    client = setup_client_thread(result)
-    client << :block
-    sleep 0.1 # start processing the client's message
-    @server.shutdown(1, true)
-    @server_thread.join(3).should_not be_nil
-  end
+    it "should handle basic message passing" do
+      client = setup_client
+      client.greeting(true).should == Hello.new
+      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+    end
 
-  it "should allow shutting down in response to a message" do
-    client = setup_client
-    client.greeting(true).should == Hello.new
-    client.shutdown
-    @server_thread.join(2).should_not be_nil
+    it "should handle concurrent clients" do
+      queue = Queue.new
+      trans_queue = Queue.new
+      4.times { Thread.new { queue.push setup_client(trans_queue).block } }
+      4.times { trans_queue.pop }
+      setup_client.unblock
+      4.times { queue.pop.should be_true }
+    end
+
+    it "should handle messages from more than 5 long-lived connections" do
+      queues = []
+      result = Queue.new
+      7.times do |i|
+        queues << setup_client_thread(result)
+        Thread.pass if i == 4 # give the server time to accept connections
+      end
+      client = setup_client
+      # block 4 connections
+      4.times { |i| queues[i] << :block }
+      queues[4] << :hello
+      queues[5] << :hello
+      queues[6] << :hello
+      3.times { result.pop.should == Hello.new }
+      client.greeting(true).should == Hello.new
+      queues[5] << :unblock
+      4.times { result.pop.should be_true }
+      queues[2] << :hello
+      result.pop.should == Hello.new
+      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+      7.times { queues.shift << :exit }
+      client.greeting(true).should == Hello.new
+    end
+
+    it "should shut down when asked" do
+      # connect first to ensure it's running
+      client = setup_client
+      client.greeting(false) # force a message pass
+      @server.shutdown
+      @server_thread.join(2).should be_an_instance_of(Thread)
+    end
+
+    it "should continue processing active messages when shutting down" do
+      result = Queue.new
+      client = setup_client_thread(result)
+      client << :sleep
+      sleep 0.1 # give the server time to start processing the client's message
+      @server.shutdown
+      @server_thread.join(2).should be_an_instance_of(Thread)
+      result.pop.should == :slept
+    end
+
+    it "should kill active messages when they don't expire while shutting down" do
+      result = Queue.new
+      client = setup_client_thread(result)
+      client << :block
+      sleep 0.1 # start processing the client's message
+      @server.shutdown(1)
+      @catch_exceptions = true
+      @server_thread.join(3).should_not be_nil
+    end
+
+    it "should allow shutting down in response to a message" do
+      client = setup_client
+      client.greeting(true).should == Hello.new
+      client.shutdown
+      @server_thread.join(2).should_not be_nil
+    end
   end
 end