From: Kevin Clark Date: Wed, 18 Jun 2008 01:19:09 +0000 (+0000) Subject: rb: split up benchmark into separate server/client files and distinct interpreters X-Git-Tag: 0.2.0~526 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=d3cee029c9f7075e85d8e100d01bc66b6829b727;p=common%2Fthrift.git rb: split up benchmark into separate server/client files and distinct interpreters git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669025 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/rb/Rakefile b/lib/rb/Rakefile index 04aaff73..ba0fe8ca 100644 --- a/lib/rb/Rakefile +++ b/lib/rb/Rakefile @@ -37,5 +37,5 @@ end desc 'Run benchmarking of NonblockingServer' task :benchmark do - ruby 'benchmark/fairness.rb' + ruby 'benchmark/benchmark.rb' end diff --git a/lib/rb/benchmark/fairness.rb b/lib/rb/benchmark/benchmark.rb similarity index 64% rename from lib/rb/benchmark/fairness.rb rename to lib/rb/benchmark/benchmark.rb index 11a9f049..5e6dd40e 100644 --- a/lib/rb/benchmark/fairness.rb +++ b/lib/rb/benchmark/benchmark.rb @@ -3,68 +3,47 @@ $:.unshift File.dirname(__FILE__) + '/../lib' require 'thrift' require 'thrift/server/nonblockingserver' require 'thrift/transport/unixsocket' -$:.unshift File.dirname(__FILE__) + "/gen-rb" -require 'BenchmarkService' -require 'thread' require 'stringio' + HOST = 'localhost' PORT = 42587 -Thread.abort_on_exception = true - ############### ## Server ############### -module Server - include Thrift +class Server + attr_accessor :serverclass + attr_accessor :interpreter + attr_accessor :host + attr_accessor :port - class BenchmarkHandler - # 1-based index into the fibonacci sequence - def fibonacci(n) - seq = [1, 1] - 3.upto(n) do - seq << seq[-1] + seq[-2] - end - seq[n-1] # n is 1-based - end + def initialize(opts) + @serverclass = opts.fetch(:class, Thrift::NonblockingServer) + @interpreter = opts.fetch(:interpreter, "ruby") + @host = opts.fetch(:host, ::HOST) + @port = opts.fetch(:port, ::PORT) end - def self.start_server(serverClass, trans = nil) - return if serverClass == Object - handler = BenchmarkHandler.new - processor = ThriftBenchmark::BenchmarkService::Processor.new(handler) - transport = trans || ServerSocket.new(HOST, PORT) - transportFactory = FramedTransportFactory.new - args = [processor, transport, transportFactory, nil, 20] - if serverClass == NonblockingServer - logger = Logger.new(STDERR) - logger.level = Logger::WARN - args << logger - end - server = serverClass.new(*args) - @server_thread = Thread.new do - server.serve - end - @server = server + def start + return if @class == Object + @pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/server.rb #{@host} #{@port} #{@serverclass.name}", "r+") end - def self.shutdown - return if @server.nil? - if @server.respond_to? :shutdown - @server.shutdown - else - @server_thread.kill + def shutdown + return unless @pipe + Marshal.dump(:shutdown, @pipe) + begin + @pipe.read(10) # block until the server shuts down + rescue EOFError end - end - - def self.class - @server and @server.class + @pipe.close + @pipe = nil end end class BenchmarkManager - def initialize(opts) + def initialize(opts, server) @socket = opts.fetch(:socket) do @host = opts.fetch(:host, 'localhost') @port = opts.fetch(:port) @@ -73,6 +52,8 @@ class BenchmarkManager @num_processes = opts.fetch(:num_processes, 40) @clients_per_process = opts.fetch(:clients_per_process, 10) @calls_per_client = opts.fetch(:calls_per_client, 50) + @interpreter = opts.fetch(:interpreter, "ruby") + @server = server end def run @@ -91,38 +72,8 @@ class BenchmarkManager end def spawn - rd, wr = IO.pipe - pid = fork do - STDIN.close - rd.close - @clients_per_process.times do - if @socket - socket = Thrift::UNIXSocket.new(@socket) - else - socket = Thrift::Socket.new(@host, @port) - end - transport = Thrift::FramedTransport.new(socket) - protocol = Thrift::BinaryProtocol.new(transport) - client = ThriftBenchmark::BenchmarkService::Client.new(protocol) - begin - transport.open - rescue - Marshal.dump [:connection_failure, Time.now], wr - else - Marshal.dump [:start, Time.now], wr - @calls_per_client.times do - Marshal.dump [:call_start, Time.now], wr - client.fibonacci(15) - Marshal.dump [:call_end, Time.now], wr - end - transport.close - Marshal.dump [:end, Time.now], wr - end - end - end - wr.close - @pool << rd - pid + pipe = IO.popen("#{@interpreter} #{File.dirname(__FILE__)}/client.rb #{@host} #{@port} #{@clients_per_process} #{@calls_per_client}") + @pool << pipe end def socket_class @@ -212,7 +163,9 @@ class BenchmarkManager fmt = "%.4f seconds" puts tabulate "%d", - [["Server class", "%s"], Server.class], + [["Server class", "%s"], @server.serverclass], + [["Server interpreter", "%s"], @server.interpreter], + [["Client interpreter", "%s"], @interpreter], [["Socket class", "%s"], socket_class], ["Number of processes", @num_processes], ["Clients per process", @clients_per_process], @@ -245,21 +198,16 @@ def resolve_const(const) end puts "Starting server..." -serverklass = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer -servertrans = nil -if ENV['THRIFT_SOCKET'] - servertrans = Thrift::UNIXServerSocket.new(ENV['THRIFT_SOCKET']) -end -Server.start_server(serverklass, servertrans) +args = {} +args[:interpreter] = ENV['THRIFT_SERVER_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby" +args[:class] = resolve_const(ENV['THRIFT_SERVER']) || Thrift::NonblockingServer +server = Server.new(args) +server.start sleep 0.2 # give the server time to start -args = { :num_processes => 40, :clients_per_process => 5 } -if ENV['THRIFT_SOCKET'] - args[:socket] = ENV['THRIFT_SOCKET'] -else - args.merge!(:host => HOST, :port => PORT) -end -BenchmarkManager.new(args).run +args = { :num_processes => 40, :clients_per_process => 5, :host => HOST, :port => PORT } +args[:interpreter] = ENV['THRIFT_CLIENT_INTERPRETER'] || ENV['THRIFT_INTERPRETER'] || "ruby" +BenchmarkManager.new(args, server).run -Server.shutdown +server.shutdown diff --git a/lib/rb/benchmark/client.rb b/lib/rb/benchmark/client.rb new file mode 100644 index 00000000..e8d33515 --- /dev/null +++ b/lib/rb/benchmark/client.rb @@ -0,0 +1,41 @@ +$:.unshift File.dirname(__FILE__) + '/../lib' +require 'thrift' +require 'thrift/server/nonblockingserver' +$:.unshift File.dirname(__FILE__) + "/gen-rb" +require 'BenchmarkService' + +class Client + def initialize(host, port, clients_per_process, calls_per_client) + @host = host + @port = port + @clients_per_process = clients_per_process + @calls_per_client = calls_per_client + end + + def run + @clients_per_process.times do + socket = Thrift::Socket.new(@host, @port) + transport = Thrift::FramedTransport.new(socket) + protocol = Thrift::BinaryProtocol.new(transport) + client = ThriftBenchmark::BenchmarkService::Client.new(protocol) + begin + transport.open + rescue + Marshal.dump [:connection_failure, Time.now], STDOUT + else + Marshal.dump [:start, Time.now], STDOUT + @calls_per_client.times do + Marshal.dump [:call_start, Time.now], STDOUT + client.fibonacci(15) + Marshal.dump [:call_end, Time.now], STDOUT + end + transport.close + Marshal.dump [:end, Time.now], STDOUT + end + end + end +end + +host, port, clients_per_process, calls_per_client = ARGV + +Client.new(host, port.to_i, clients_per_process.to_i, calls_per_client.to_i).run diff --git a/lib/rb/benchmark/server.rb b/lib/rb/benchmark/server.rb new file mode 100644 index 00000000..400c90d1 --- /dev/null +++ b/lib/rb/benchmark/server.rb @@ -0,0 +1,59 @@ +$:.unshift File.dirname(__FILE__) + '/../lib' +require 'thrift' +require 'thrift/server/nonblockingserver' +$:.unshift File.dirname(__FILE__) + "/gen-rb" +require 'BenchmarkService' + +module Server + include Thrift + + class BenchmarkHandler + # 1-based index into the fibonacci sequence + def fibonacci(n) + seq = [1, 1] + 3.upto(n) do + seq << seq[-1] + seq[-2] + end + seq[n-1] # n is 1-based + end + end + + def self.start_server(host, port, serverClass) + handler = BenchmarkHandler.new + processor = ThriftBenchmark::BenchmarkService::Processor.new(handler) + transport = ServerSocket.new(host, port) + transportFactory = FramedTransportFactory.new + args = [processor, transport, transportFactory, nil, 20] + if serverClass == NonblockingServer + logger = Logger.new(STDERR) + logger.level = Logger::WARN + args << logger + end + server = serverClass.new(*args) + @server_thread = Thread.new do + server.serve + end + @server = server + end + + def self.shutdown + return if @server.nil? + if @server.respond_to? :shutdown + @server.shutdown + else + @server_thread.kill + end + end +end + +def resolve_const(const) + const and const.split('::').inject(Object) { |k,c| k.const_get(c) } +end + +host, port, serverklass = ARGV + +Server.start_server(host, port.to_i, resolve_const(serverklass)) + +Marshal.load(STDIN) + +Server.shutdown