From: David Reiss Date: Wed, 11 Jun 2008 01:12:52 +0000 (+0000) Subject: Add back thrift_server.erl which we still use X-Git-Tag: 0.2.0~693 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=0c8cb4aa9fc571846812e57239e251a025c06ec7;p=common%2Fthrift.git Add back thrift_server.erl which we still use Sorry facebook guys :P We'll switch to thrift_socket_server eventually git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666469 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl new file mode 100644 index 00000000..1cc29206 --- /dev/null +++ b/lib/alterl/src/thrift_server.erl @@ -0,0 +1,171 @@ +%%%------------------------------------------------------------------- +%%% File : thrift_server.erl +%%% Author : +%%% Description : +%%% +%%% Created : 28 Jan 2008 by +%%%------------------------------------------------------------------- +-module(thrift_server). + +-behaviour(gen_server). + +%% API +-export([start_link/3, stop/1, take_socket/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {listen_socket, acceptor_ref, service, handler}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []). + +%%-------------------------------------------------------------------- +%% Function: stop(Pid) -> ok, {error, Reason} +%% Description: Stops the server. +%%-------------------------------------------------------------------- +stop(Pid) when is_pid(Pid) -> + gen_server:call(Pid, stop). + + +take_socket(Server, Socket) -> + gen_server:call(Server, {take_socket, Socket}). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init({Port, Service, Handler}) -> + {ok, Socket} = gen_tcp:listen(Port, + [binary, + {packet, 0}, + {active, false}, + {nodelay, true}, + {reuseaddr, true}]), + {ok, Ref} = prim_inet:async_accept(Socket, -1), + {ok, #state{listen_socket = Socket, + acceptor_ref = Ref, + service = Service, + handler = Handler}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(stop, _From, State) -> + {stop, stopped, ok, State}; + +handle_call({take_socket, Socket}, {FromPid, _Tag}, State) -> + Result = gen_tcp:controlling_process(Socket, FromPid), + {reply, Result, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, + State = #state{listen_socket = ListenSocket, + acceptor_ref = Ref, + service = Service, + handler = Handler}) -> + case set_sockopt(ListenSocket, ClientSocket) of + ok -> + %% New client connected - start processor + start_processor(ClientSocket, Service, Handler), + {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1), + {noreply, State#state{acceptor_ref = NewRef}}; + {error, Reason} -> + error_logger:error_msg("Couldn't set socket opts: ~p~n", + [Reason]), + {stop, Reason, State} + end; + +handle_info({inet_async, ListenSocket, Ref, Error}, State) -> + error_logger:error_msg("Error in acceptor: ~p~n", [Error]), + {stop, Error, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +set_sockopt(ListenSocket, ClientSocket) -> + true = inet_db:register_socket(ClientSocket, inet_tcp), + case prim_inet:getopts(ListenSocket, + [active, nodelay, keepalive, delay_send, priority, tos]) of + {ok, Opts} -> + case prim_inet:setopts(ClientSocket, Opts) of + ok -> ok; + Error -> gen_tcp:close(ClientSocket), + Error + end; + Error -> + gen_tcp:close(ClientSocket), + Error + end. + +start_processor(Socket, Service, Handler) -> + Server = self(), + + ProtoGen = fun() -> + % Become the controlling process + ok = take_socket(Server, Socket), + {ok, SocketTransport} = thrift_socket_transport:new(Socket), + {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), + {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), + {ok, Protocol, Protocol} + end, + + thrift_processor:start(ProtoGen, Service, Handler).