From: Kevin Clark Date: Wed, 18 Jun 2008 01:18:35 +0000 (+0000) Subject: rb: Implement Thrift::UNIXSocket and Thrift::UNIXServerSocket X-Git-Tag: 0.2.0~532 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=2ddd8ed40fabf1fc8b7c6fdf370b791e045951c7;p=common%2Fthrift.git rb: Implement Thrift::UNIXSocket and Thrift::UNIXServerSocket In benchmarking it turns out these don't give any noticeable performance boost, but as I've already written them, somebody may want them for something. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669019 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/rb/benchmark/fairness.rb b/lib/rb/benchmark/fairness.rb index 02f1cec6..67543f33 100644 --- a/lib/rb/benchmark/fairness.rb +++ b/lib/rb/benchmark/fairness.rb @@ -2,6 +2,7 @@ require 'rubygems' $:.unshift File.dirname(__FILE__) + '/../lib' require 'thrift' require 'thrift/server/nonblockingserver' +require 'thrift/transport/unixsocket' $:.unshift File.dirname(__FILE__) + "/gen-rb" require 'BenchmarkService' require 'thread' @@ -29,10 +30,10 @@ module Server end end - def self.start_server(serverClass) + def self.start_server(serverClass, trans = nil) handler = BenchmarkHandler.new processor = ThriftBenchmark::BenchmarkService::Processor.new(handler) - transport = ServerSocket.new(HOST, PORT) + transport = trans || ServerSocket.new(HOST, PORT) transportFactory = FramedTransportFactory.new args = [processor, transport, transportFactory, nil, 20] if serverClass == NonblockingServer @@ -60,24 +61,13 @@ module Server end end -module Client - include Thrift - - def self.start_client(&block) - transport = FramedTransport.new(Socket.new(HOST, PORT)) - protocol = BinaryProtocol.new(transport) - client = ThriftBenchmark::BenchmarkService::Client.new(protocol) - # transport.open - Thread.new do - block.call(client, transport) - end - end -end - class BenchmarkManager def initialize(opts) - @host = opts.fetch(:host, 'localhost') - @port = opts.fetch(:port) + @socket = opts.fetch(:socket) do + @host = opts.fetch(:host, 'localhost') + @port = opts.fetch(:port) + nil + end @num_processes = opts.fetch(:num_processes, 40) @clients_per_process = opts.fetch(:clients_per_process, 10) @calls_per_client = opts.fetch(:calls_per_client, 50) @@ -104,7 +94,12 @@ class BenchmarkManager STDIN.close rd.close @clients_per_process.times do - transport = Thrift::FramedTransport.new(Thrift::Socket.new(@host, @port)) + if @socket + socket = Thrift::UNIXSocket.new(@socket) + else + socket = Thrift::Socket.new(@host, @port) + end + transport = Thrift::FramedTransport.new(socket) protocol = Thrift::BinaryProtocol.new(transport) client = ThriftBenchmark::BenchmarkService::Client.new(protocol) begin @@ -128,6 +123,14 @@ class BenchmarkManager pid end + def socket_class + if @socket + Thrift::UNIXSocket + else + Thrift::Socket + end + end + def collect_output puts "Collecting output..." # read from @pool until all sockets are closed @@ -208,6 +211,7 @@ class BenchmarkManager puts tabulate "%d", [["Server class", "%s"], Server.class], + [["Socket class", "%s"], socket_class], ["Number of processes", @num_processes], ["Clients per process", @clients_per_process], ["Calls per client", @calls_per_client], @@ -219,7 +223,7 @@ class BenchmarkManager ["Average time per client (%d calls)" % @calls_per_client, @report[:avg_clients]], ["Total time for all calls", @report[:total_calls]], ["Real time for benchmarking", @report[:total_benchmark_time]], - ["Longest call time", @report[:longest_call]], + ["Shortest call time", @report[:longest_call]], ["Longest client time (%d calls)" % @calls_per_client, @report[:longest_client]] end @@ -240,10 +244,20 @@ end puts "Starting server..." serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer -Server.start_server(serverklass) +servertrans = nil +if ENV['THRIFT_SOCKET'] + servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET']) +end +Server.start_server(serverklass, servertrans) sleep 0.2 # give the server time to start -BenchmarkManager.new(:host => HOST, :port => PORT, :num_processes => 40, :clients_per_process => 5).run +args = { :num_processes => 40, :clients_per_process => 5 } +if ENV['THRIFT_SOCKET'] + args[:socket] = ENV['THRIFT_SOCKET'] +else + args.merge!(:host => HOST, :port => PORT) +end +BenchmarkManager.new(args).run Server.shutdown diff --git a/lib/rb/lib/thrift/transport/socket.rb b/lib/rb/lib/thrift/transport/socket.rb index 7cf2f451..3d540e48 100644 --- a/lib/rb/lib/thrift/transport/socket.rb +++ b/lib/rb/lib/thrift/transport/socket.rb @@ -14,6 +14,7 @@ module Thrift def initialize(host='localhost', port=9090) @host = host @port = port + @desc = "#{host}:#{port}" @handle = nil end @@ -23,7 +24,7 @@ module Thrift begin @handle = TCPSocket.new(@host, @port) rescue StandardError - raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@host}:#{@port}") + raise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}") end end @@ -57,7 +58,7 @@ module Thrift raise TransportException.new(TransportException::NOT_OPEN, e.message) end if (data.nil? or data.length == 0) - raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@host}:#{@port}") + raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}") end data end diff --git a/lib/rb/lib/thrift/transport/unixsocket.rb b/lib/rb/lib/thrift/transport/unixsocket.rb new file mode 100644 index 00000000..af686cad --- /dev/null +++ b/lib/rb/lib/thrift/transport/unixsocket.rb @@ -0,0 +1,51 @@ +require 'thrift/transport' +require 'socket' + +module Thrift + class UNIXSocket < Socket + def initialize(path) + @path = path + @desc = @path # for read()'s error + @handle = nil + end + + def open + begin + @handle = ::UNIXSocket.new(@path) + rescue StandardError + raise TransportException.new(TransportException::NOT_OPEN, "Could not open UNIX socket at #{@path}") + end + end + end + + class UNIXServerSocket < ServerTransport + def initialize(path) + @path = path + @handle = nil + end + + attr_accessor :handle + + def listen + @handle = ::UNIXServer.new(@path) + end + + def accept + unless @handle.nil? + sock = @handle.accept + trans = UNIXSocket.new(nil) + trans.handle = sock + trans + end + end + + def close + if @handle + @handle.close unless @handle.closed? + @handle = nil + # UNIXServer doesn't delete the socket file, so we have to do it ourselves + File.delete(@path) + end + end + end +end diff --git a/lib/rb/spec/socket_spec.rb b/lib/rb/spec/socket_spec.rb index 4b6dae53..ed2f56ab 100644 --- a/lib/rb/spec/socket_spec.rb +++ b/lib/rb/spec/socket_spec.rb @@ -1,80 +1,32 @@ require File.dirname(__FILE__) + '/spec_helper' +require File.dirname(__FILE__) + "/socket_spec_shared" class ThriftSocketSpec < Spec::ExampleGroup include Thrift - before(:each) do - @socket = Socket.new - @handle = mock("Handle", :closed? => false) - @handle.stub!(:close) - end - describe Socket do - it "should open a TCPSocket" do - TCPSocket.should_receive(:new).with('localhost', 9090).and_return(@handle) - @socket.open.should == @handle + before(:each) do + @socket = Socket.new + @handle = mock("Handle", :closed? => false) + @handle.stub!(:close) + TCPSocket.stub!(:new).and_return(@handle) end - it "should accept host/port options" do - TCPSocket.should_receive(:new).with('my.domain', 1234) - Socket.new('my.domain', 1234).open - end + it_should_behave_like "a socket" it "should raise a TransportException when it cannot open a socket" do - TCPSocket.should_receive(:new).with('localhost', 9090).and_raise(StandardError) - lambda { @socket.open }.should raise_error(TransportException, "Could not connect to localhost:9090") { |e| e.type.should == TransportException::NOT_OPEN } - end - - it "should be open whenever it has a handle" do - @socket.should_not be_open - TCPSocket.should_receive(:new).and_return(@handle) - @socket.open - @socket.should be_open - @socket.handle = nil - @socket.should_not be_open - @socket.handle = @handle - @handle.should_receive(:close) - @socket.close - @socket.should_not be_open - end - - it "should write data to the handle" do - TCPSocket.should_receive(:new).and_return(@handle) - @socket.open - @handle.should_receive(:write).with("foobar") - @socket.write("foobar") - @handle.should_receive(:write).with("fail").and_raise(StandardError) - lambda { @socket.write("fail") }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN } - end - - it "should raise an error when it cannot read from the handle" do - TCPSocket.should_receive(:new).and_return(@handle) - @socket.open - @handle.should_receive(:read).with(17).and_raise(StandardError) - lambda { @socket.read(17) }.should raise_error(TransportException) { |e| e.type.should == TransportException::NOT_OPEN } - end - - it "should raise an error when it reads no data from the handle" do - TCPSocket.should_receive(:new).and_return(@handle) - @socket.open - @handle.should_receive(:read).with(17).and_return("") - lambda { @socket.read(17) }.should raise_error(TransportException, "Socket: Could not read 17 bytes from localhost:9090") + TCPSocket.should_receive(:new).and_raise(StandardError) + lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN } end - it "should return the data read when reading from the handle works" do - TCPSocket.should_receive(:new).and_return(@handle) + it "should open a TCPSocket with default args" do + TCPSocket.should_receive(:new).with('localhost', 9090) @socket.open - @handle.should_receive(:read).with(17).and_return("test data") - @socket.read(17).should == "test data" end - it "should declare itself as closed when it has an error" do - TCPSocket.should_receive(:new).and_return(@handle) - @socket.open - @handle.should_receive(:write).with("fail").and_raise(StandardError) - @socket.should be_open - lambda { @socket.write("fail") }.should raise_error - @socket.should_not be_open + it "should accept host/port options" do + TCPSocket.should_receive(:new).with('my.domain', 1234) + Socket.new('my.domain', 1234).open end end @@ -84,8 +36,8 @@ class ThriftSocketSpec < Spec::ExampleGroup end it "should create a handle when calling listen" do + TCPServer.should_receive(:new).with(nil, 1234) @socket.listen - @socket.handle.should be_an_instance_of(TCPServer) end it "should accept an optional host argument" do diff --git a/lib/rb/spec/socket_spec_shared.rb b/lib/rb/spec/socket_spec_shared.rb new file mode 100644 index 00000000..448a5169 --- /dev/null +++ b/lib/rb/spec/socket_spec_shared.rb @@ -0,0 +1,52 @@ +require File.dirname(__FILE__) + '/spec_helper' + +shared_examples_for "a socket" do + it "should open a socket" do + @socket.open.should == @handle + end + + it "should be open whenever it has a handle" do + @socket.should_not be_open + @socket.open + @socket.should be_open + @socket.handle = nil + @socket.should_not be_open + @socket.handle = @handle + @socket.close + @socket.should_not be_open + end + + it "should write data to the handle" do + @socket.open + @handle.should_receive(:write).with("foobar") + @socket.write("foobar") + @handle.should_receive(:write).with("fail").and_raise(StandardError) + lambda { @socket.write("fail") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN } + end + + it "should raise an error when it cannot read from the handle" do + @socket.open + @handle.should_receive(:read).with(17).and_raise(StandardError) + lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN } + end + + it "should raise an error when it reads no data from the handle" do + @socket.open + @handle.should_receive(:read).with(17).and_return("") + lambda { @socket.read(17) }.should raise_error(Thrift::TransportException, "Socket: Could not read 17 bytes from #{@socket.instance_variable_get("@desc")}") + end + + it "should return the data read when reading from the handle works" do + @socket.open + @handle.should_receive(:read).with(17).and_return("test data") + @socket.read(17).should == "test data" + end + + it "should declare itself as closed when it has an error" do + @socket.open + @handle.should_receive(:write).with("fail").and_raise(StandardError) + @socket.should be_open + lambda { @socket.write("fail") }.should raise_error + @socket.should_not be_open + end +end diff --git a/lib/rb/spec/unixsocket_spec.rb b/lib/rb/spec/unixsocket_spec.rb new file mode 100644 index 00000000..777ef042 --- /dev/null +++ b/lib/rb/spec/unixsocket_spec.rb @@ -0,0 +1,60 @@ +require File.dirname(__FILE__) + '/spec_helper' +require 'thrift/transport/unixsocket' +require File.dirname(__FILE__) + "/socket_spec_shared" + +class ThriftUNIXSocketSpec < Spec::ExampleGroup + include Thrift + + describe UNIXSocket do + before(:each) do + @path = '/tmp/thrift_spec_socket' + @socket = UNIXSocket.new(@path) + @handle = mock("Handle", :closed? => false) + @handle.stub!(:close) + ::UNIXSocket.stub!(:new).and_return(@handle) + end + + it_should_behave_like "a socket" + + it "should raise a TransportException when it cannot open a socket" do + ::UNIXSocket.should_receive(:new).and_raise(StandardError) + lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN } + end + end + + describe UNIXServerSocket do + before(:each) do + @path = '/tmp/thrift_spec_socket' + @socket = UNIXServerSocket.new(@path) + end + + it "should create a handle when calling listen" do + UNIXServer.should_receive(:new).with(@path) + @socket.listen + end + + it "should create a Thrift::UNIXSocket to wrap accepted sockets" do + handle = mock("UNIXServer") + UNIXServer.should_receive(:new).with(@path).and_return(handle) + @socket.listen + sock = mock("sock") + handle.should_receive(:accept).and_return(sock) + trans = mock("UNIXSocket") + UNIXSocket.should_receive(:new).and_return(trans) + trans.should_receive(:handle=).with(sock) + @socket.accept.should == trans + end + + it "should close the handle when closed" do + handle = mock("UNIXServer", :closed? => false) + UNIXServer.should_receive(:new).with(@path).and_return(handle) + @socket.listen + handle.should_receive(:close) + @socket.close + end + + it "should return nil when accepting if there is no handle" do + @socket.accept.should be_nil + end + end +end