From: Bryan Duxbury Date: Wed, 2 Mar 2011 18:25:24 +0000 (+0000) Subject: THRIFT-1076. erl: Erlang Thrift socket server has a bug that causes java thrift clien... X-Git-Tag: 0.7.0~164 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=a2cceb4396e89f02b9724e3f6dada3c534eacea1;p=common%2Fthrift.git THRIFT-1076. erl: Erlang Thrift socket server has a bug that causes java thrift client of framed binary client to throw 'out of sequence' exception This patch makes the erlang server return the seq id that the client sent instead of 0. Patch: Pascal Qu git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076318 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl index 43155050..88af05a8 100644 --- a/lib/erl/src/thrift_processor.erl +++ b/lib/erl/src/thrift_processor.erl @@ -37,12 +37,14 @@ loop(State0 = #thrift_processor{protocol = Proto0}) -> State1 = State0#thrift_processor{protocol = Proto1}, case MessageBegin of #protocol_message_begin{name = Function, - type = ?tMessageType_CALL} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), + type = ?tMessageType_CALL, + seqid = Seqid} -> + {State2, ok} = handle_function(State1, list_to_atom(Function), Seqid), loop(State2); #protocol_message_begin{name = Function, - type = ?tMessageType_ONEWAY} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), + type = ?tMessageType_ONEWAY, + seqid = Seqid} -> + {State2, ok} = handle_function(State1, list_to_atom(Function), Seqid), loop(State2); {error, timeout} -> thrift_protocol:close_transport(Proto1), @@ -56,7 +58,8 @@ loop(State0 = #thrift_processor{protocol = Proto0}) -> handle_function(State0=#thrift_processor{protocol = Proto0, handler = Handler, service = Service}, - Function) -> + Function, + Seqid) -> InParams = Service:function_info(Function, params_type), {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams), @@ -67,14 +70,14 @@ handle_function(State0=#thrift_processor{protocol = Proto0, %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", %% [Function, Params, Micro/1000.0]), - handle_success(State1, Function, Result) + handle_success(State1, Function, Result, Seqid) catch Type:Data when Type =:= throw orelse Type =:= error -> - handle_function_catch(State1, Function, Type, Data) + handle_function_catch(State1, Function, Type, Data, Seqid) end. handle_function_catch(State = #thrift_processor{service = Service}, - Function, ErrType, ErrData) -> + Function, ErrType, ErrData, Seqid) -> IsOneway = Service:function_info(Function, reply_type) =:= oneway_void, case {ErrType, ErrData} of @@ -87,26 +90,27 @@ handle_function_catch(State = #thrift_processor{service = Service}, {throw, Exception} when is_tuple(Exception), size(Exception) > 0 -> %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), - handle_exception(State, Function, Exception); + handle_exception(State, Function, Exception, Seqid); % we still want to accept more requests from this client {error, Error} -> - handle_error(State, Function, Error) + handle_error(State, Function, Error, Seqid) end. handle_success(State = #thrift_processor{service = Service}, Function, - Result) -> + Result, + Seqid) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", case Result of {reply, ReplyData} -> Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - send_reply(State, Function, ?tMessageType_REPLY, Reply); + send_reply(State, Function, ?tMessageType_REPLY, Reply, Seqid); ok when ReplyType == {struct, []} -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}, Seqid); ok when ReplyType == oneway_void -> %% no reply for oneway void @@ -115,7 +119,8 @@ handle_success(State = #thrift_processor{service = Service}, handle_exception(State = #thrift_processor{service = Service}, Function, - Exception) -> + Exception, + Seqid) -> ExceptionType = element(1, Exception), %% Fetch a structure like {struct, [{-2, {struct, {Module, Type}}}, %% {-3, {struct, {Module, Type}}}]} @@ -138,20 +143,20 @@ handle_exception(State = #thrift_processor{service = Service}, % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> - handle_unknown_exception(State, Function, Exception); + handle_unknown_exception(State, Function, Exception, Seqid); false -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) + send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}, Seqid) end. %% %% Called when an exception has been explicitly thrown by the service, but it was %% not one of the exceptions that was defined for the function. %% -handle_unknown_exception(State, Function, Exception) -> +handle_unknown_exception(State, Function, Exception, Seqid) -> handle_error(State, Function, {exception_not_declared_as_thrown, - Exception}). + Exception}, Seqid). -handle_error(State, Function, Error) -> +handle_error(State, Function, Error, Seqid) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -167,13 +172,13 @@ handle_error(State, Function, Error) -> #'TApplicationException'{ message = Message, type = ?TApplicationException_UNKNOWN}}, - send_reply(State, Function, ?tMessageType_EXCEPTION, Reply). + send_reply(State, Function, ?tMessageType_EXCEPTION, Reply, Seqid). -send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) -> +send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply, Seqid) -> {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{ name = atom_to_list(Function), type = ReplyMessageType, - seqid = 0}), + seqid = Seqid}), {Proto2, ok} = thrift_protocol:write(Proto1, Reply), {Proto3, ok} = thrift_protocol:write(Proto2, message_end), {Proto4, ok} = thrift_protocol:flush_transport(Proto3),