From: David Reiss Date: Wed, 11 Jun 2008 00:59:48 +0000 (+0000) Subject: mochiweb style avoidance of gen_tcp:controlling_process X-Git-Tag: 0.2.0~730 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=c11734efaca5e67a394997548293d940124de42f;p=common%2Fthrift.git mochiweb style avoidance of gen_tcp:controlling_process TODO: remove extraneous debug_rlogs git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666431 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl index 633cfba0..440da229 100644 --- a/lib/alterl/src/thrift_binary_protocol.erl +++ b/lib/alterl/src/thrift_binary_protocol.erl @@ -14,7 +14,8 @@ -export([new/1, read/2, write/2, - flush_transport/1 + flush_transport/1, + close_transport/1 ]). -record(binary_protocol, {transport}). @@ -30,6 +31,10 @@ new(Transport) -> flush_transport(#binary_protocol{transport = Transport}) -> thrift_transport:flush(Transport). +close_transport(#binary_protocol{transport = Transport}) -> + thrift_transport:flush(Transport), + thrift_transport:close(Transport). + %%% %%% instance methods %%% diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl index dc11fff2..a2ee017d 100644 --- a/lib/alterl/src/thrift_buffered_transport.erl +++ b/lib/alterl/src/thrift_buffered_transport.erl @@ -18,7 +18,7 @@ terminate/2, code_change/3]). %% thrift_transport callbacks --export([write/2, read/2, flush/1]). +-export([write/2, read/2, flush/1, close/1]). -record(state, { % The wrapped transport @@ -41,6 +41,7 @@ new(WrappedTransport) -> case gen_server:start_link(?MODULE, [WrappedTransport], []) of {ok, Pid} -> + io:format("buffered transport ~p wrapping ~p", [Pid, WrappedTransport]), thrift_transport:new(?MODULE, Pid); Else -> Else @@ -57,12 +58,20 @@ write(Transport, Data) when is_binary(Data) -> gen_server:call(Transport, {write, Data}). %%-------------------------------------------------------------------- -%% Function: flush(Transpor) -> ok +%% Function: flush(Transport) -> ok %% %% Description: Flushes the buffer through to the wrapped transport %%-------------------------------------------------------------------- flush(Transport) -> - gen_server:call(Transport, {flush}). + gen_server:call(Transport, flush). + +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:call(Transport, close). %%-------------------------------------------------------------------- %% Function: Read(Transport, Len) -> {ok, Data} @@ -105,12 +114,24 @@ handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) -> Response = thrift_transport:read(Wrapped, Len), {reply, Response, State}; -handle_call({flush}, _From, State = #state{buffer = Buffer, - wrapped = Wrapped}) -> - Concat = concat_binary(lists:reverse(Buffer)), +handle_call(flush, _From, State = #state{buffer = Buffer, + wrapped = Wrapped}) -> + Concat = concat_binary(lists:reverse(Buffer)), Response = thrift_transport:write(Wrapped, Concat), - % todo(todd) - flush wrapped transport here? - {reply, Response, State#state{buffer = []}}. + thrift_transport:flush(Wrapped), + {reply, Response, State#state{buffer = []}}; + +handle_call(close, _From, State = #state{buffer = Buffer, + wrapped = Wrapped}) -> + case Buffer of + [] -> ok; + Data -> + thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))), + thrift_transport:flush(Wrapped) + end, + thrift_transport:close(Wrapped), + {reply, ok, State}. % TEST ONLY +%% {stop, normal, State}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | @@ -118,7 +139,7 @@ handle_call({flush}, _From, State = #state{buffer = Buffer, %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> +handle_cast(Msg, State=#state{}) -> {noreply, State}. %%-------------------------------------------------------------------- diff --git a/lib/alterl/src/thrift_processor.erl b/lib/alterl/src/thrift_processor.erl index 8f323523..2038b564 100644 --- a/lib/alterl/src/thrift_processor.erl +++ b/lib/alterl/src/thrift_processor.erl @@ -6,57 +6,61 @@ %%% Created : 28 Jan 2008 by %%%------------------------------------------------------------------- -module(thrift_processor). +-author('todd@lipcon.org'). +-author('eletuchy@facebook.com'). --export([start/3,init/3]). +-export([init/1]). -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(state, {handler, in_protocol, out_protocol, service}). +-record(thrift_processor, {handler, in_protocol, out_protocol, service}). -start(ProtocolGenerator, Service, Handler) when is_function(ProtocolGenerator, 0) -> - spawn(thrift_processor, init, [ProtocolGenerator, Service, Handler]). +init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> + {ok, IProt, OProt} = ProtoGen(), + loop(#thrift_processor{in_protocol = IProt, + out_protocol = OProt, + service = Service, + handler = Handler}). -init(ProtocolGenerator, Service, Handler) -> - {ok, IProt, OProt} = ProtocolGenerator(), - loop(#state{in_protocol = IProt, - out_protocol = OProt, - service = Service, - handler = Handler}). - -loop(State = #state{in_protocol = IProto, - out_protocol = OProto}) -> +loop(State = #thrift_processor{in_protocol = IProto, + out_protocol = OProto}) -> + error_logger:info_msg("loop: ~p", [State]), case thrift_protocol:read(IProto, message_begin) of #protocol_message_begin{name = Function, type = ?tMessageType_CALL} -> - ok= handle_function(State, list_to_atom(Function)), + ok=handle_function(State, list_to_atom(Function)), loop(State); {error, closed} -> - % error_logger:info_msg("Client disconnected~n"), + %% error_logger:info_msg("Client disconnected~n"), exit(protocol_closed) end. -handle_function(State = #state{in_protocol = IProto, - out_protocol = OProto, - handler = Handler, - service = Service}, +handle_function(State=#thrift_processor{in_protocol = IProto, + out_protocol = OProto, + handler = Handler, + service = Service}, Function) -> InParams = Service:function_info(Function, params_type), {ok, Params} = thrift_protocol:read(IProto, InParams), try + error_logger:info_msg("calling: ~p(~p)", [Function, Params]), Result = Handler:handle_function(Function, Params), - % {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), - % error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", - % [Function, Params, Micro/1000.0]), + error_logger:info_msg("result: ~p", [Result]), + %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), + %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", + %% [Function, Params, Micro/1000.0]), handle_success(State, Function, Result) catch Type:Data -> + error_logger:info_msg("handle_function oh noes: ~p ~p", [Type, Data]), handle_function_catch(State, Function, Type, Data) - end. + end, + after_reply(OProto). -handle_function_catch(State = #state{service = Service}, +handle_function_catch(State = #thrift_processor{service = Service}, Function, ErrType, ErrData) -> IsAsync = Service:function_info(Function, reply_type) =:= async_void, @@ -76,42 +80,41 @@ handle_function_catch(State = #state{service = Service}, ok = handle_error(State, Function, Error) end. -handle_success(State = #state{out_protocol = OProto, - service = Service}, +handle_success(State = #thrift_processor{out_protocol = OProto, + service = Service}, Function, Result) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", - case Result of - {reply, ReplyData} -> - Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - ok = send_reply(OProto, Function, ?tMessageType_REPLY, Reply); + ok = case Result of + {reply, ReplyData} -> + Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, + send_reply(OProto, Function, ?tMessageType_REPLY, Reply); - ok when ReplyType == {struct, []} -> - ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + ok when ReplyType == {struct, []} -> + send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); - ok when ReplyType == async_void -> - % no reply for async void - ok - end, - ok. + ok when ReplyType == async_void -> + %% no reply for async void + ok + end. -handle_exception(State = #state{out_protocol = OProto, - service = Service}, +handle_exception(State = #thrift_processor{out_protocol = OProto, + service = Service}, Function, Exception) -> ExceptionType = element(1, Exception), - % Fetch a structure like {struct, [{-2, {struct, {Module, Type}}}, - % {-3, {struct, {Module, Type}}}]} + %% Fetch a structure like {struct, [{-2, {struct, {Module, Type}}}, + %% {-3, {struct, {Module, Type}}}]} ReplySpec = Service:function_info(Function, exceptions), {struct, XInfo} = ReplySpec, true = is_list(XInfo), - % Assuming we had a type1 exception, we'd get: [undefined, Exception, undefined] - % e.g.: [{-1, type0}, {-2, type1}, {-3, type2}] + %% Assuming we had a type1 exception, we'd get: [undefined, Exception, undefined] + %% e.g.: [{-1, type0}, {-2, type1}, {-3, type2}] ExceptionList = [case Type of ExceptionType -> Exception; _ -> undefined @@ -120,7 +123,7 @@ handle_exception(State = #state{out_protocol = OProto, ExceptionTuple = list_to_tuple([Function | ExceptionList]), - % Make sure we got at least one defined + % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> ok = handle_unknown_exception(State, Function, Exception); @@ -129,14 +132,14 @@ handle_exception(State = #state{out_protocol = OProto, end. %% -% Called when an exception has been explicitly thrown by the service, but it was -% not one of the exceptions that was defined for the function. +%% Called when an exception has been explicitly thrown by the service, but it was +%% not one of the exceptions that was defined for the function. %% handle_unknown_exception(State, Function, Exception) -> handle_error(State, Function, {exception_not_declared_as_thrown, Exception}). -handle_error(#state{out_protocol = OProto}, Function, Error) -> +handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -154,7 +157,6 @@ handle_error(#state{out_protocol = OProto}, Function, Error) -> type = ?TApplicationException_UNKNOWN}}, send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply). - send_reply(OProto, Function, ReplyMessageType, Reply) -> ok = thrift_protocol:write(OProto, #protocol_message_begin{ name = atom_to_list(Function), @@ -165,13 +167,5 @@ send_reply(OProto, Function, ReplyMessageType, Reply) -> ok = thrift_protocol:flush_transport(OProto), ok. - -%% -% This is the same as timer:tc except that timer:tc appears to catch -% exceptions when it shouldn't! -%% -better_timer(Module, Function, Args) -> - T1 = erlang:now(), - Result = apply(Module, Function, Args), - T2 = erlang:now(), - {timer:now_diff(T2, T1), Result}. +after_reply(OProto) -> + ok = thrift_protocol:close_transport(OProto). diff --git a/lib/alterl/src/thrift_protocol.erl b/lib/alterl/src/thrift_protocol.erl index f1b2bcc9..484dcd46 100644 --- a/lib/alterl/src/thrift_protocol.erl +++ b/lib/alterl/src/thrift_protocol.erl @@ -5,6 +5,7 @@ read/2, skip/2, flush_transport/1, + close_transport/1, typeid_to_atom/1 ]). @@ -19,7 +20,8 @@ behaviour_info(callbacks) -> [ {read, 2}, {write, 2}, - {flush_transport, 1} + {flush_transport, 1}, + {close_transport, 1} ]; behaviour_info(_Else) -> undefined. @@ -31,6 +33,10 @@ flush_transport(#protocol{module = Module, data = Data}) -> Module:flush_transport(Data). +close_transport(#protocol{module = Module, + data = Data}) -> + Module:close_transport(Data). + typeid_to_atom(?tType_STOP) -> field_stop; typeid_to_atom(?tType_VOID) -> void; typeid_to_atom(?tType_BOOL) -> bool; diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl deleted file mode 100644 index 1cc29206..00000000 --- a/lib/alterl/src/thrift_server.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%%------------------------------------------------------------------- -%%% File : thrift_server.erl -%%% Author : -%%% Description : -%%% -%%% Created : 28 Jan 2008 by -%%%------------------------------------------------------------------- --module(thrift_server). - --behaviour(gen_server). - -%% API --export([start_link/3, stop/1, take_socket/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, {listen_socket, acceptor_ref, service, handler}). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []). - -%%-------------------------------------------------------------------- -%% Function: stop(Pid) -> ok, {error, Reason} -%% Description: Stops the server. -%%-------------------------------------------------------------------- -stop(Pid) when is_pid(Pid) -> - gen_server:call(Pid, stop). - - -take_socket(Server, Socket) -> - gen_server:call(Server, {take_socket, Socket}). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init({Port, Service, Handler}) -> - {ok, Socket} = gen_tcp:listen(Port, - [binary, - {packet, 0}, - {active, false}, - {nodelay, true}, - {reuseaddr, true}]), - {ok, Ref} = prim_inet:async_accept(Socket, -1), - {ok, #state{listen_socket = Socket, - acceptor_ref = Ref, - service = Service, - handler = Handler}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call(stop, _From, State) -> - {stop, stopped, ok, State}; - -handle_call({take_socket, Socket}, {FromPid, _Tag}, State) -> - Result = gen_tcp:controlling_process(Socket, FromPid), - {reply, Result, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, - State = #state{listen_socket = ListenSocket, - acceptor_ref = Ref, - service = Service, - handler = Handler}) -> - case set_sockopt(ListenSocket, ClientSocket) of - ok -> - %% New client connected - start processor - start_processor(ClientSocket, Service, Handler), - {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1), - {noreply, State#state{acceptor_ref = NewRef}}; - {error, Reason} -> - error_logger:error_msg("Couldn't set socket opts: ~p~n", - [Reason]), - {stop, Reason, State} - end; - -handle_info({inet_async, ListenSocket, Ref, Error}, State) -> - error_logger:error_msg("Error in acceptor: ~p~n", [Error]), - {stop, Error, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- -set_sockopt(ListenSocket, ClientSocket) -> - true = inet_db:register_socket(ClientSocket, inet_tcp), - case prim_inet:getopts(ListenSocket, - [active, nodelay, keepalive, delay_send, priority, tos]) of - {ok, Opts} -> - case prim_inet:setopts(ClientSocket, Opts) of - ok -> ok; - Error -> gen_tcp:close(ClientSocket), - Error - end; - Error -> - gen_tcp:close(ClientSocket), - Error - end. - -start_processor(Socket, Service, Handler) -> - Server = self(), - - ProtoGen = fun() -> - % Become the controlling process - ok = take_socket(Server, Socket), - {ok, SocketTransport} = thrift_socket_transport:new(Socket), - {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), - {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), - {ok, Protocol, Protocol} - end, - - thrift_processor:start(ProtoGen, Service, Handler). diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl index a1b2e05a..bd5ac716 100644 --- a/lib/alterl/src/thrift_socket_transport.erl +++ b/lib/alterl/src/thrift_socket_transport.erl @@ -3,8 +3,7 @@ -behaviour(thrift_transport). -export([new/1, - - write/2, read/2, flush/1]). + write/2, read/2, flush/1, close/1]). -record(data, {socket}). @@ -17,6 +16,12 @@ write(#data{socket = Socket}, Data) when is_binary(Data) -> read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 -> gen_tcp:recv(Socket, Len). -% We can't really flush - everything is flushed when we write +%% We can't really flush - everything is flushed when we write flush(_) -> - ok. + ok. + +close(#data{socket = Socket}) -> + error_logger:info_msg("Close called, socket ~p", [Socket]) +%% gen_tcp:close(Socket), +%% exit(normal) + . diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl index f901f74d..6c0c6633 100644 --- a/lib/alterl/src/thrift_transport.erl +++ b/lib/alterl/src/thrift_transport.erl @@ -5,13 +5,15 @@ -export([new/2, write/2, read/2, - flush/1 + flush/1, + close/1 ]). behaviour_info(callbacks) -> [{read, 2}, {write, 2}, - {flush, 1} + {flush, 1}, + {close, 1} ]. -record(transport, { module, data }). @@ -30,3 +32,6 @@ read(Transport, Len) when is_integer(Len) -> flush(#transport{module = Module, data = Data}) -> Module:flush(Data). + +close(#transport{module = Module, data = Data}) -> + Module:close(Data).