From: David Reiss Date: Mon, 30 Aug 2010 22:05:38 +0000 (+0000) Subject: erlang: Refactor the processor X-Git-Tag: 0.5.0~119 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=035979ff80e1e47c03eaa019a674239fef4f343f;p=common%2Fthrift.git erlang: Refactor the processor Now the server works. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990986 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl index b751da6e..9924a2c0 100644 --- a/lib/erl/src/thrift_processor.erl +++ b/lib/erl/src/thrift_processor.erl @@ -24,53 +24,53 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(thrift_processor, {handler, in_protocol, out_protocol, service}). +-record(thrift_processor, {handler, protocol, service}). init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> - {ok, IProt, OProt} = ProtoGen(), - loop(#thrift_processor{in_protocol = IProt, - out_protocol = OProt, + {ok, Proto} = ProtoGen(), + loop(#thrift_processor{protocol = Proto, service = Service, handler = Handler}). -loop(State = #thrift_processor{in_protocol = IProto, - out_protocol = OProto}) -> - case thrift_protocol:read(IProto, message_begin) of +loop(State0 = #thrift_processor{protocol = Proto0}) -> + {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin), + State1 = State0#thrift_processor{protocol = Proto1}, + case MessageBegin of #protocol_message_begin{name = Function, type = ?tMessageType_CALL} -> - ok = handle_function(State, list_to_atom(Function)), - loop(State); + {State2, ok} = handle_function(State1, list_to_atom(Function)), + loop(State2); #protocol_message_begin{name = Function, type = ?tMessageType_ONEWAY} -> - ok = handle_function(State, list_to_atom(Function)), - loop(State); + {State2, ok} = handle_function(State1, list_to_atom(Function)), + loop(State2); {error, timeout} -> - thrift_protocol:close_transport(OProto), + thrift_protocol:close_transport(Proto1), ok; {error, closed} -> %% error_logger:info_msg("Client disconnected~n"), - thrift_protocol:close_transport(OProto), + thrift_protocol:close_transport(Proto1), exit(shutdown) end. -handle_function(State=#thrift_processor{in_protocol = IProto, - out_protocol = OProto, - handler = Handler, - service = Service}, +handle_function(State0=#thrift_processor{protocol = Proto0, + handler = Handler, + service = Service}, Function) -> InParams = Service:function_info(Function, params_type), - {ok, Params} = thrift_protocol:read(IProto, InParams), + {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams), + State1 = State0#thrift_processor{protocol = Proto1}, try Result = Handler:handle_function(Function, Params), %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", %% [Function, Params, Micro/1000.0]), - handle_success(State, Function, Result) + handle_success(State1, Function, Result) catch Type:Data when Type =:= throw orelse Type =:= error -> - handle_function_catch(State, Function, Type, Data) + handle_function_catch(State1, Function, Type, Data) end. handle_function_catch(State = #thrift_processor{service = Service}, @@ -83,39 +83,37 @@ handle_function_catch(State = #thrift_processor{service = Service}, error_logger:warning_msg( "oneway void ~p threw error which must be ignored: ~p", [Function, {ErrType, ErrData, Stack}]), - ok; + {State, ok}; {throw, Exception} when is_tuple(Exception), size(Exception) > 0 -> %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), - handle_exception(State, Function, Exception), - ok; % we still want to accept more requests from this client + handle_exception(State, Function, Exception); + % we still want to accept more requests from this client {error, Error} -> - ok = handle_error(State, Function, Error) + handle_error(State, Function, Error) end. -handle_success(State = #thrift_processor{out_protocol = OProto, - service = Service}, +handle_success(State = #thrift_processor{service = Service}, Function, Result) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", - ok = case Result of - {reply, ReplyData} -> - Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - send_reply(OProto, Function, ?tMessageType_REPLY, Reply); + case Result of + {reply, ReplyData} -> + Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, + send_reply(State, Function, ?tMessageType_REPLY, Reply); - ok when ReplyType == {struct, []} -> - send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + ok when ReplyType == {struct, []} -> + send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); - ok when ReplyType == oneway_void -> - %% no reply for oneway void - ok - end. + ok when ReplyType == oneway_void -> + %% no reply for oneway void + {State, ok} + end. -handle_exception(State = #thrift_processor{out_protocol = OProto, - service = Service}, +handle_exception(State = #thrift_processor{service = Service}, Function, Exception) -> ExceptionType = element(1, Exception), @@ -140,9 +138,9 @@ handle_exception(State = #thrift_processor{out_protocol = OProto, % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> - ok = handle_unknown_exception(State, Function, Exception); + handle_unknown_exception(State, Function, Exception); false -> - ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) + send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) end. %% @@ -153,7 +151,7 @@ handle_unknown_exception(State, Function, Exception) -> handle_error(State, Function, {exception_not_declared_as_thrown, Exception}). -handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> +handle_error(State, Function, Error) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -169,14 +167,14 @@ handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> #'TApplicationException'{ message = Message, type = ?TApplicationException_UNKNOWN}}, - send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply). - -send_reply(OProto, Function, ReplyMessageType, Reply) -> - ok = thrift_protocol:write(OProto, #protocol_message_begin{ - name = atom_to_list(Function), - type = ReplyMessageType, - seqid = 0}), - ok = thrift_protocol:write(OProto, Reply), - ok = thrift_protocol:write(OProto, message_end), - ok = thrift_protocol:flush_transport(OProto), - ok. + send_reply(State, Function, ?tMessageType_EXCEPTION, Reply). + +send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) -> + {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{ + name = atom_to_list(Function), + type = ReplyMessageType, + seqid = 0}), + {Proto2, ok} = thrift_protocol:write(Proto1, Reply), + {Proto3, ok} = thrift_protocol:write(Proto2, message_end), + {Proto4, ok} = thrift_protocol:flush_transport(Proto3), + {State#thrift_processor{protocol = Proto4}, ok}. diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl index 5d0012ba..80a1388e 100644 --- a/lib/erl/src/thrift_server.erl +++ b/lib/erl/src/thrift_server.erl @@ -177,7 +177,7 @@ start_processor(Socket, Service, Handler) -> {ok, SocketTransport} = thrift_socket_transport:new(Socket), {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), - {ok, Protocol, Protocol} + {ok, Protocol} end, spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]). diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl index 6794e630..44894b05 100644 --- a/lib/erl/src/thrift_socket_server.erl +++ b/lib/erl/src/thrift_socket_server.erl @@ -188,7 +188,7 @@ acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed}) false -> thrift_buffered_transport:new(SocketTransport) end, {ok, Protocol} = thrift_binary_protocol:new(Transport), - {ok, IProt=Protocol, OProt=Protocol} + {ok, Protocol} end, thrift_processor:init({Server, ProtoGen, Service, Handler}); {error, closed} ->