| # | 
 | # Licensed to the Apache Software Foundation (ASF) under one | 
 | # or more contributor license agreements. See the NOTICE file | 
 | # distributed with this work for additional information | 
 | # regarding copyright ownership. The ASF licenses this file | 
 | # to you under the Apache License, Version 2.0 (the | 
 | # "License"); you may not use this file except in compliance | 
 | # with the License. You may obtain a copy of the License at | 
 | # | 
 | #   http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, | 
 | # software distributed under the License is distributed on an | 
 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | # KIND, either express or implied. See the License for the | 
 | # specific language governing permissions and limitations | 
 | # under the License. | 
 | # | 
 |  | 
 | require 'spec_helper' | 
 |  | 
 | describe 'NonblockingServer' do | 
 |  | 
 |   class Handler | 
 |     def initialize | 
 |       @queue = Queue.new | 
 |     end | 
 |  | 
 |     attr_accessor :server | 
 |  | 
 |     def greeting(english) | 
 |       if english | 
 |         SpecNamespace::Hello.new | 
 |       else | 
 |         SpecNamespace::Hello.new(:greeting => "Aloha!") | 
 |       end | 
 |     end | 
 |  | 
 |     def block | 
 |       @queue.pop | 
 |     end | 
 |  | 
 |     def unblock(n) | 
 |       n.times { @queue.push true } | 
 |     end | 
 |  | 
 |     def sleep(time) | 
 |       Kernel.sleep time | 
 |     end | 
 |  | 
 |     def shutdown | 
 |       @server.shutdown(0, false) | 
 |     end | 
 |   end | 
 |  | 
 |   class SpecTransport < Thrift::BaseTransport | 
 |     def initialize(transport, queue) | 
 |       @transport = transport | 
 |       @queue = queue | 
 |       @flushed = false | 
 |     end | 
 |  | 
 |     def open? | 
 |       @transport.open? | 
 |     end | 
 |  | 
 |     def open | 
 |       @transport.open | 
 |     end | 
 |  | 
 |     def close | 
 |       @transport.close | 
 |     end | 
 |  | 
 |     def read(sz) | 
 |       @transport.read(sz) | 
 |     end | 
 |  | 
 |     def write(buf,sz=nil) | 
 |       @transport.write(buf, sz) | 
 |     end | 
 |  | 
 |     def flush | 
 |       @queue.push :flushed unless @flushed or @queue.nil? | 
 |       @flushed = true | 
 |       @transport.flush | 
 |     end | 
 |   end | 
 |  | 
 |   class SpecServerSocket < Thrift::ServerSocket | 
 |     def initialize(host, port, queue) | 
 |       super(host, port) | 
 |       @queue = queue | 
 |     end | 
 |  | 
 |     def listen | 
 |       super | 
 |       @queue.push :listen | 
 |     end | 
 |   end | 
 |  | 
 |   describe Thrift::NonblockingServer do | 
 |     before(:each) do | 
 |       @port = 43251 | 
 |       handler = Handler.new | 
 |       processor = SpecNamespace::NonblockingService::Processor.new(handler) | 
 |       queue = Queue.new | 
 |       @transport = SpecServerSocket.new('localhost', @port, queue) | 
 |       transport_factory = Thrift::FramedTransportFactory.new | 
 |       logger = Logger.new(STDERR) | 
 |       logger.level = Logger::WARN | 
 |       @server = Thrift::NonblockingServer.new(processor, @transport, transport_factory, nil, 5, logger) | 
 |       handler.server = @server | 
 |       @server_thread = Thread.new(Thread.current) do |master_thread| | 
 |         begin | 
 |           @server.serve | 
 |         rescue => e | 
 |           p e | 
 |           puts e.backtrace * "\n" | 
 |           master_thread.raise e | 
 |         end | 
 |       end | 
 |       queue.pop | 
 |  | 
 |       @clients = [] | 
 |       @catch_exceptions = false | 
 |     end | 
 |  | 
 |     after(:each) do | 
 |       @clients.each { |client, trans| trans.close } | 
 |       # @server.shutdown(1) | 
 |       @server_thread.kill | 
 |       @transport.close | 
 |     end | 
 |  | 
 |     def setup_client(queue = nil) | 
 |       transport = SpecTransport.new(Thrift::FramedTransport.new(Thrift::Socket.new('localhost', @port)), queue) | 
 |       protocol = Thrift::BinaryProtocol.new(transport) | 
 |       client = SpecNamespace::NonblockingService::Client.new(protocol) | 
 |       transport.open | 
 |       @clients << [client, transport] | 
 |       client | 
 |     end | 
 |  | 
 |     def setup_client_thread(result) | 
 |       queue = Queue.new | 
 |       Thread.new do | 
 |         begin | 
 |           client = setup_client | 
 |           while (cmd = queue.pop) | 
 |             msg, *args = cmd | 
 |             case msg | 
 |             when :block | 
 |               result << client.block | 
 |             when :unblock | 
 |               client.unblock(args.first) | 
 |             when :hello | 
 |               result << client.greeting(true) # ignore result | 
 |             when :sleep | 
 |               client.sleep(args[0] || 0.5) | 
 |               result << :slept | 
 |             when :shutdown | 
 |               client.shutdown | 
 |             when :exit | 
 |               result << :done | 
 |               break | 
 |             end | 
 |           end | 
 |           @clients.each { |c,t| t.close and break if c == client } #close the transport | 
 |         rescue => e | 
 |           raise e unless @catch_exceptions | 
 |         end | 
 |       end | 
 |       queue | 
 |     end | 
 |  | 
 |     it "should handle basic message passing" do | 
 |       client = setup_client | 
 |       client.greeting(true).should == SpecNamespace::Hello.new | 
 |       client.greeting(false).should == SpecNamespace::Hello.new(:greeting => 'Aloha!') | 
 |       @server.shutdown | 
 |     end | 
 |  | 
 |     it "should handle concurrent clients" do | 
 |       queue = Queue.new | 
 |       trans_queue = Queue.new | 
 |       4.times do | 
 |         Thread.new(Thread.current) do |main_thread| | 
 |           begin | 
 |             queue.push setup_client(trans_queue).block | 
 |           rescue => e | 
 |             main_thread.raise e | 
 |           end | 
 |         end | 
 |       end | 
 |       4.times { trans_queue.pop } | 
 |       setup_client.unblock(4) | 
 |       4.times { queue.pop.should be_true } | 
 |       @server.shutdown | 
 |     end | 
 |  | 
 |     it "should handle messages from more than 5 long-lived connections" do | 
 |       queues = [] | 
 |       result = Queue.new | 
 |       7.times do |i| | 
 |         queues << setup_client_thread(result) | 
 |         Thread.pass if i == 4 # give the server time to accept connections | 
 |       end | 
 |       client = setup_client | 
 |       # block 4 connections | 
 |       4.times { |i| queues[i] << :block } | 
 |       queues[4] << :hello | 
 |       queues[5] << :hello | 
 |       queues[6] << :hello | 
 |       3.times { result.pop.should == SpecNamespace::Hello.new } | 
 |       client.greeting(true).should == SpecNamespace::Hello.new | 
 |       queues[5] << [:unblock, 4] | 
 |       4.times { result.pop.should be_true } | 
 |       queues[2] << :hello | 
 |       result.pop.should == SpecNamespace::Hello.new | 
 |       client.greeting(false).should == SpecNamespace::Hello.new(:greeting => 'Aloha!') | 
 |       7.times { queues.shift << :exit } | 
 |       client.greeting(true).should == SpecNamespace::Hello.new | 
 |       @server.shutdown | 
 |     end | 
 |  | 
 |     it "should shut down when asked" do | 
 |       # connect first to ensure it's running | 
 |       client = setup_client | 
 |       client.greeting(false) # force a message pass | 
 |       @server.shutdown | 
 |       @server_thread.join(2).should be_an_instance_of(Thread) | 
 |     end | 
 |  | 
 |     it "should continue processing active messages when shutting down" do | 
 |       result = Queue.new | 
 |       client = setup_client_thread(result) | 
 |       client << :sleep | 
 |       sleep 0.1 # give the server time to start processing the client's message | 
 |       @server.shutdown | 
 |       @server_thread.join(2).should be_an_instance_of(Thread) | 
 |       result.pop.should == :slept | 
 |     end | 
 |  | 
 |     it "should kill active messages when they don't expire while shutting down" do | 
 |       result = Queue.new | 
 |       client = setup_client_thread(result) | 
 |       client << [:sleep, 10] | 
 |       sleep 0.1 # start processing the client's message | 
 |       @server.shutdown(1) | 
 |       @catch_exceptions = true | 
 |       @server_thread.join(3).should_not be_nil | 
 |       result.should be_empty | 
 |     end | 
 |  | 
 |     it "should allow shutting down in response to a message" do | 
 |       client = setup_client | 
 |       client.greeting(true).should == SpecNamespace::Hello.new | 
 |       client.shutdown | 
 |       @server_thread.join(2).should_not be_nil | 
 |     end | 
 |   end | 
 | end |