From 8cf694d62d314de3501872656a662a825ff6c98d Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 11 Jun 2008 00:57:54 +0000 Subject: [PATCH] Change alterl thrift_server to use non-blocking TCP calls and properly set the processor as the controlling process for the client sockets. Summary: - Removes the non-OTP "acceptor" process - The processor becomes the socket's controlling process instead of the transport, which is kind of messy, but it means we don't have to make a process for the socket_transport. - See http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles for non-blocking server info Test plan: - Ran ThriftTest and StressTest git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666417 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_server.erl | 80 +++++++++++++++++++++++--------- test/erl/Makefile | 2 +- test/erl/src/stress_server.erl | 21 +-------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl index 54c70856..8a1704f6 100644 --- a/lib/alterl/src/thrift_server.erl +++ b/lib/alterl/src/thrift_server.erl @@ -10,7 +10,7 @@ -behaviour(gen_server). %% API --export([start_link/3, stop/1]). +-export([start_link/3, stop/1, take_socket/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -18,7 +18,7 @@ -define(SERVER, ?MODULE). --record(state, {listen_socket, acceptor, service}). +-record(state, {listen_socket, acceptor_ref, service, handler}). %%==================================================================== %% API @@ -39,6 +39,10 @@ stop(Pid) when is_pid(Pid) -> gen_server:call(Pid, stop). +take_socket(Server, Socket) -> + gen_server:call(Server, {take_socket, Socket}). + + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -57,10 +61,11 @@ init({Port, Service, Handler}) -> {active, false}, {nodelay, true}, {reuseaddr, true}]), - Acceptor = spawn_link(fun () -> acceptor(Socket, Service, Handler) end), + {ok, Ref} = prim_inet:async_accept(Socket, -1), {ok, #state{listen_socket = Socket, - acceptor = Acceptor, - service = Service}}. + acceptor_ref = Ref, + service = Service, + handler = Handler}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -72,8 +77,11 @@ init({Port, Service, Handler}) -> %% Description: Handling call messages %%-------------------------------------------------------------------- handle_call(stop, _From, State) -> - State#state.acceptor ! stop, - {stop, stopped, ok, State}. + {stop, stopped, ok, State}; + +handle_call({take_socket, Socket}, {FromPid, _Tag}, State) -> + Result = gen_tcp:controlling_process(Socket, FromPid), + {reply, Result, State}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | @@ -90,6 +98,27 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- +handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, + State = #state{listen_socket = ListenSocket, + acceptor_ref = Ref, + service = Service, + handler = Handler}) -> + case set_sockopt(ListenSocket, ClientSocket) of + ok -> + %% New client connected - start processor + start_processor(ClientSocket, Service, Handler), + {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1), + {noreply, State#state{acceptor_ref = NewRef}}; + {error, Reason} -> + error_logger:error_msg("Couldn't set socket opts: ~p~n", + [Reason]), + {stop, Reason, State} + end; + +handle_info({inet_async, ListenSocket, Ref, Error}, State) -> + error_logger:error_msg("Error in acceptor: ~p~n", [Error]), + {stop, Error, State}; + handle_info(_Info, State) -> {noreply, State}. @@ -108,31 +137,38 @@ terminate(_Reason, _State) -> %% Description: Convert process state when code is changed %%-------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> - State#state.acceptor ! refresh, {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- +set_sockopt(ListenSocket, ClientSocket) -> + true = inet_db:register_socket(ClientSocket, inet_tcp), + case prim_inet:getopts(ListenSocket, + [active, nodelay, keepalive, delay_send, priority, tos]) of + {ok, Opts} -> + case prim_inet:setopts(ClientSocket, Opts) of + ok -> ok; + Error -> gen_tcp:close(ClientSocket), + Error + end; + Error -> + gen_tcp:close(ClientSocket), + Error + end. -acceptor(ListenSocket, Service, Handler) - when is_port(ListenSocket), is_atom(Handler) -> - {ok, Socket} = gen_tcp:accept(ListenSocket), -% error_logger:info_msg("Accepted client"), + + +start_processor(Socket, Service, Handler) -> + Server = self(), ProtoGen = fun() -> + % Become the controlling process + ok = take_socket(Server, Socket), {ok, SocketTransport} = thrift_socket_transport:new(Socket), {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), {ok, Protocol, Protocol} end, - - thrift_processor:start(ProtoGen, Service, Handler), - receive - refresh -> - error_logger:info_msg("Acceptor refreshing~n"), - ?MODULE:acceptor(ListenSocket, Service, Handler); - stop -> - ok - after 0 -> acceptor(ListenSocket, Service, Handler) - end. + + thrift_processor:start(ProtoGen, Service, Handler). diff --git a/test/erl/Makefile b/test/erl/Makefile index 017cdc1a..42572d22 100644 --- a/test/erl/Makefile +++ b/test/erl/Makefile @@ -7,7 +7,7 @@ INCLUDEDIR=include TARGETDIR=ebin SRCDIR=src -ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include +ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/alterl/include INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR}) MODULES = stress_server test_server diff --git a/test/erl/src/stress_server.erl b/test/erl/src/stress_server.erl index 915b027e..d82f9405 100644 --- a/test/erl/src/stress_server.erl +++ b/test/erl/src/stress_server.erl @@ -1,8 +1,7 @@ -module(stress_server). --include("thrift.hrl"). --export([start_link/1, old_start_link/1, +-export([start_link/1, handle_function/2, @@ -19,24 +18,6 @@ start_link(Port) -> thrift_server:start_link(Port, service_thrift, ?MODULE). -% Start the server with the old style bindings -old_start_link(Port) -> - Handler = ?MODULE, - Processor = service_thrift, - - TF = tBufferedTransportFactory:new(), - PF = tBinaryProtocolFactory:new(), - - ServerTransport = tErlAcceptor, - ServerFlavor = tErlServer, - - Server = oop:start_new(ServerFlavor, [Port, Handler, Processor, ServerTransport, TF, PF]), - - case ?R0(Server, effectful_serve) of - ok -> Server; - Error -> Error - end. - handle_function(Function, Args) -> case apply(?MODULE, Function, tuple_to_list(Args)) of -- 2.17.1