From a2cceb4396e89f02b9724e3f6dada3c534eacea1 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Wed, 2 Mar 2011 18:25:24 +0000 Subject: [PATCH] THRIFT-1076. erl: Erlang Thrift socket server has a bug that causes java thrift client of framed binary client to throw 'out of sequence' exception This patch makes the erlang server return the seq id that the client sent instead of 0. Patch: Pascal Qu git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076318 13f79535-47bb-0310-9956-ffa450edef68 --- lib/erl/src/thrift_processor.erl | 49 ++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl index 43155050..88af05a8 100644 --- a/lib/erl/src/thrift_processor.erl +++ b/lib/erl/src/thrift_processor.erl @@ -37,12 +37,14 @@ loop(State0 = #thrift_processor{protocol = Proto0}) -> State1 = State0#thrift_processor{protocol = Proto1}, case MessageBegin of #protocol_message_begin{name = Function, - type = ?tMessageType_CALL} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), + type = ?tMessageType_CALL, + seqid = Seqid} -> + {State2, ok} = handle_function(State1, list_to_atom(Function), Seqid), loop(State2); #protocol_message_begin{name = Function, - type = ?tMessageType_ONEWAY} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), + type = ?tMessageType_ONEWAY, + seqid = Seqid} -> + {State2, ok} = handle_function(State1, list_to_atom(Function), Seqid), loop(State2); {error, timeout} -> thrift_protocol:close_transport(Proto1), @@ -56,7 +58,8 @@ loop(State0 = #thrift_processor{protocol = Proto0}) -> handle_function(State0=#thrift_processor{protocol = Proto0, handler = Handler, service = Service}, - Function) -> + Function, + Seqid) -> InParams = Service:function_info(Function, params_type), {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams), @@ -67,14 +70,14 @@ handle_function(State0=#thrift_processor{protocol = Proto0, %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", %% [Function, Params, Micro/1000.0]), - handle_success(State1, Function, Result) + handle_success(State1, Function, Result, Seqid) catch Type:Data when Type =:= throw orelse Type =:= error -> - handle_function_catch(State1, Function, Type, Data) + handle_function_catch(State1, Function, Type, Data, Seqid) end. handle_function_catch(State = #thrift_processor{service = Service}, - Function, ErrType, ErrData) -> + Function, ErrType, ErrData, Seqid) -> IsOneway = Service:function_info(Function, reply_type) =:= oneway_void, case {ErrType, ErrData} of @@ -87,26 +90,27 @@ handle_function_catch(State = #thrift_processor{service = Service}, {throw, Exception} when is_tuple(Exception), size(Exception) > 0 -> %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), - handle_exception(State, Function, Exception); + handle_exception(State, Function, Exception, Seqid); % we still want to accept more requests from this client {error, Error} -> - handle_error(State, Function, Error) + handle_error(State, Function, Error, Seqid) end. handle_success(State = #thrift_processor{service = Service}, Function, - Result) -> + Result, + Seqid) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", case Result of {reply, ReplyData} -> Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - send_reply(State, Function, ?tMessageType_REPLY, Reply); + send_reply(State, Function, ?tMessageType_REPLY, Reply, Seqid); ok when ReplyType == {struct, []} -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}, Seqid); ok when ReplyType == oneway_void -> %% no reply for oneway void @@ -115,7 +119,8 @@ handle_success(State = #thrift_processor{service = Service}, handle_exception(State = #thrift_processor{service = Service}, Function, - Exception) -> + Exception, + Seqid) -> ExceptionType = element(1, Exception), %% Fetch a structure like {struct, [{-2, {struct, {Module, Type}}}, %% {-3, {struct, {Module, Type}}}]} @@ -138,20 +143,20 @@ handle_exception(State = #thrift_processor{service = Service}, % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> - handle_unknown_exception(State, Function, Exception); + handle_unknown_exception(State, Function, Exception, Seqid); false -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) + send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}, Seqid) end. %% %% Called when an exception has been explicitly thrown by the service, but it was %% not one of the exceptions that was defined for the function. %% -handle_unknown_exception(State, Function, Exception) -> +handle_unknown_exception(State, Function, Exception, Seqid) -> handle_error(State, Function, {exception_not_declared_as_thrown, - Exception}). + Exception}, Seqid). -handle_error(State, Function, Error) -> +handle_error(State, Function, Error, Seqid) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -167,13 +172,13 @@ handle_error(State, Function, Error) -> #'TApplicationException'{ message = Message, type = ?TApplicationException_UNKNOWN}}, - send_reply(State, Function, ?tMessageType_EXCEPTION, Reply). + send_reply(State, Function, ?tMessageType_EXCEPTION, Reply, Seqid). -send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) -> +send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply, Seqid) -> {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{ name = atom_to_list(Function), type = ReplyMessageType, - seqid = 0}), + seqid = Seqid}), {Proto2, ok} = thrift_protocol:write(Proto1, Reply), {Proto3, ok} = thrift_protocol:write(Proto2, message_end), {Proto4, ok} = thrift_protocol:flush_transport(Proto3), -- 2.17.1