Rollback a few recent Erlang changes to fix blame data
My combined patch for THRIFT-599 was committed, but it is preferable
commit the individual patches to preserve the more detailed log and
blame data. I'll recommit r987018 as a sequence of patches and r988722
as its own rev.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990957 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl
index 5c74adc..d5bb146 100644
--- a/lib/erl/src/thrift_client.erl
+++ b/lib/erl/src/thrift_client.erl
@@ -19,127 +19,366 @@
-module(thrift_client).
+-behaviour(gen_server).
+
%% API
--export([new/2, call/3, send_call/3, close/1]).
+-export([start_link/2, start_link/3, start_link/4,
+ start/3, start/4,
+ call/3, send_call/3, close/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(tclient, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as a linked process.
+%%--------------------------------------------------------------------
+start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) ->
+ start_link(Host, Port, Service, []).
+
+start_link(Host, Port, Service, Options) ->
+ start(Host, Port, Service, [{monitor, link} | Options]).
+
+start_link(ProtocolFactory, Service) ->
+ start(ProtocolFactory, Service, [{monitor, link}]).
+
+%%
+%% Splits client options into protocol options and transport options
+%%
+%% split_options([Options...]) -> {ProtocolOptions, TransportOptions}
+%%
+split_options(Options) ->
+ split_options(Options, [], [], []).
+
+split_options([], ClientIn, ProtoIn, TransIn) ->
+ {ClientIn, ProtoIn, TransIn};
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= monitor ->
+ split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= strict_read;
+ OptKey =:= strict_write ->
+ split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);
+
+split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
+ when OptKey =:= framed;
+ OptKey =:= connect_timeout;
+ OptKey =:= sockopts ->
+ split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
-new(Protocol, Service)
- when is_atom(Service) ->
- {ok, #tclient{protocol = Protocol,
- service = Service,
- seqid = 0}}.
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server as an unlinked process.
+%%--------------------------------------------------------------------
--spec call(#tclient{}, atom(), list()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-call(Client = #tclient{}, Function, Args)
- when is_atom(Function), is_list(Args) ->
- case send_function_call(Client, Function, Args) of
- {Client1, ok} ->
- receive_function_result(Client1, Function);
- Else ->
- Else
+%% Backwards-compatible starter for the common-case of socket transports
+start(Host, Port, Service, Options)
+ when is_integer(Port), is_atom(Service), is_list(Options) ->
+ {ClientOpts, ProtoOpts, TransOpts} = split_options(Options),
+
+ {ok, TransportFactory} =
+ thrift_socket_transport:new_transport_factory(Host, Port, TransOpts),
+
+ {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory(
+ TransportFactory, ProtoOpts),
+
+ start(ProtocolFactory, Service, ClientOpts).
+
+
+%% ProtocolFactory :: fun() -> thrift_protocol()
+start(ProtocolFactory, Service, ClientOpts)
+ when is_function(ProtocolFactory), is_atom(Service) ->
+ {Starter, Opts} =
+ case lists:keysearch(monitor, 1, ClientOpts) of
+ {value, {monitor, link}} ->
+ {start_link, []};
+ {value, {monitor, tether}} ->
+ {start, [{tether, self()}]};
+ _ ->
+ {start, []}
+ end,
+
+ Connect =
+ case lists:keysearch(connect, 1, ClientOpts) of
+ {value, {connect, Choice}} ->
+ Choice;
+ _ ->
+ %% By default, connect at creation-time.
+ true
+ end,
+
+
+ Started = gen_server:Starter(?MODULE, [Service, Opts], []),
+
+ if
+ Connect ->
+ case Started of
+ {ok, Pid} ->
+ case gen_server:call(Pid, {connect, ProtocolFactory}) of
+ ok ->
+ {ok, Pid};
+ Error ->
+ Error
+ end;
+ Else ->
+ Else
+ end;
+ true ->
+ Started
end.
+call(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ case gen_server:call(Client, {call, Function, Args}) of
+ R = {ok, _} -> R;
+ R = {error, _} -> R;
+ {exception, Exception} -> throw(Exception)
+ end.
+
+cast(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ gen_server:cast(Client, {call, Function, Args}).
%% Sends a function call but does not read the result. This is useful
%% if you're trying to log non-oneway function calls to write-only
%% transports like thrift_disk_log_transport.
--spec send_call(#tclient{}, atom(), list()) -> {#tclient{}, ok}.
-send_call(Client = #tclient{}, Function, Args)
- when is_atom(Function), is_list(Args) ->
- send_function_call(Client, Function, Args).
+send_call(Client, Function, Args)
+ when is_pid(Client), is_atom(Function), is_list(Args) ->
+ gen_server:call(Client, {send_call, Function, Args}).
--spec close(#tclient{}) -> ok.
-close(#tclient{protocol=Protocol}) ->
- thrift_protocol:close_transport(Protocol).
+close(Client) when is_pid(Client) ->
+ gen_server:cast(Client, close).
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Service, Opts]) ->
+ case lists:keysearch(tether, 1, Opts) of
+ {value, {tether, Pid}} ->
+ erlang:monitor(process, Pid);
+ _Else ->
+ ok
+ end,
+ {ok, #state{service = Service}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({connect, ProtocolFactory}, _From,
+ State = #state{service = Service}) ->
+ case ProtocolFactory() of
+ {ok, Protocol} ->
+ {reply, ok, State#state{protocol = Protocol,
+ seqid = 0}};
+ Error ->
+ {stop, normal, Error, State}
+ end;
+
+handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
+ Result = catch_function_exceptions(
+ fun() ->
+ ok = send_function_call(State, Function, Args),
+ receive_function_result(State, Function)
+ end,
+ Service),
+ {reply, Result, State};
+
+
+handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
+ Result = catch_function_exceptions(
+ fun() ->
+ send_function_call(State, Function, Args)
+ end,
+ Service),
+ {reply, Result, State}.
+
+
+%% Helper function that catches exceptions thrown by sending or receiving
+%% a function and returns the correct response for call or send_only above.
+catch_function_exceptions(Fun, Service) ->
+ try
+ Fun()
+ catch
+ throw:{return, Return} ->
+ Return;
+ error:function_clause ->
+ ST = erlang:get_stacktrace(),
+ case hd(ST) of
+ {Service, function_info, [Function, _]} ->
+ {error, {no_function, Function}};
+ _ -> throw({error, {function_clause, ST}})
+ end
+ end.
+
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({call, Function, Args}, State = #state{service = Service,
+ protocol = Protocol,
+ seqid = SeqId}) ->
+ _Result =
+ try
+ ok = send_function_call(State, Function, Args),
+ receive_function_result(State, Function)
+ catch
+ Class:Reason ->
+ error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
+ end,
+
+ {noreply, State};
+
+handle_cast(close, State=#state{protocol = Protocol}) ->
+%% error_logger:info_msg("thrift_client ~p received close", [self()]),
+ {stop,normal,State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
+ when is_reference(MonitorRef), is_pid(Pid) ->
+ %% We don't actually verify the correctness of the DOWN message.
+ {stop, parent_died, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(Reason, State = #state{protocol=undefined}) ->
+ ok;
+terminate(Reason, State = #state{protocol=Protocol}) ->
+ thrift_protocol:close_transport(Protocol),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
--spec send_function_call(#tclient{}, atom(), list()) -> {#tclient{}, ok | {error, any()}}.
-send_function_call(Client = #tclient{protocol = Proto0,
- service = Service,
- seqid = SeqId},
+send_function_call(#state{protocol = Proto,
+ service = Service,
+ seqid = SeqId},
Function,
Args) ->
Params = Service:function_info(Function, params_type),
- case Params of
- no_function ->
- {Client, {error, {no_function, Function}}};
- {struct, PList} when length(PList) =/= length(Args) ->
- {Client, {error, {bad_args, Function, Args}}};
- {struct, _PList} ->
- Begin = #protocol_message_begin{name = atom_to_list(Function),
- type = ?tMessageType_CALL,
- seqid = SeqId},
- {Proto1, ok} = thrift_protocol:write(Proto0, Begin),
- {Proto2, ok} = thrift_protocol:write(Proto1, {Params, list_to_tuple([Function | Args])}),
- {Proto3, ok} = thrift_protocol:write(Proto2, message_end),
- {Proto4, ok} = thrift_protocol:flush_transport(Proto3),
- {Client#tclient{protocol = Proto4}, ok}
- end.
+ {struct, PList} = Params,
+ if
+ length(PList) =/= length(Args) ->
+ throw({return, {error, {bad_args, Function, Args}}});
+ true -> ok
+ end,
--spec receive_function_result(#tclient{}, atom()) -> {#tclient{}, {ok, any()} | {error, any()}}.
-receive_function_result(Client = #tclient{service = Service}, Function) ->
+ Begin = #protocol_message_begin{name = atom_to_list(Function),
+ type = ?tMessageType_CALL,
+ seqid = SeqId},
+ ok = thrift_protocol:write(Proto, Begin),
+ ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
+ ok = thrift_protocol:write(Proto, message_end),
+ thrift_protocol:flush_transport(Proto),
+ ok.
+
+receive_function_result(State = #state{protocol = Proto,
+ service = Service},
+ Function) ->
ResultType = Service:function_info(Function, reply_type),
- read_result(Client, Function, ResultType).
+ read_result(State, Function, ResultType).
-read_result(Client, _Function, oneway_void) ->
- {Client, {ok, ok}};
+read_result(_State,
+ _Function,
+ oneway_void) ->
+ {ok, ok};
-read_result(Client = #tclient{protocol = Proto0,
- seqid = SeqId},
+read_result(State = #state{protocol = Proto,
+ seqid = SeqId},
Function,
ReplyType) ->
- {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin),
- NewClient = Client#tclient{protocol = Proto1},
- case MessageBegin of
+ case thrift_protocol:read(Proto, message_begin) of
#protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId ->
- {NewClient, {error, {bad_seq_id, SeqId}}};
+ {error, {bad_seq_id, SeqId}};
#protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
- handle_application_exception(NewClient);
+ handle_application_exception(State);
#protocol_message_begin{type = ?tMessageType_REPLY} ->
- handle_reply(NewClient, Function, ReplyType)
+ handle_reply(State, Function, ReplyType)
end.
-
-handle_reply(Client = #tclient{protocol = Proto0,
- service = Service},
+handle_reply(State = #state{protocol = Proto,
+ service = Service},
Function,
ReplyType) ->
{struct, ExceptionFields} = Service:function_info(Function, exceptions),
ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields},
- {Proto1, {ok, Reply}} = thrift_protocol:read(Proto0, ReplyStructDef),
- {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
- NewClient = Client#tclient{protocol = Proto2},
+ {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
ReplyList = tuple_to_list(Reply),
true = length(ReplyList) == length(ExceptionFields) + 1,
ExceptionVals = tl(ReplyList),
Thrown = [X || X <- ExceptionVals,
X =/= undefined],
- case Thrown of
- [] when ReplyType == {struct, []} ->
- {NewClient, {ok, ok}};
- [] ->
- {NewClient, {ok, hd(ReplyList)}};
- [Exception] ->
- throw({NewClient, {exception, Exception}})
- end.
+ Result =
+ case Thrown of
+ [] when ReplyType == {struct, []} ->
+ {ok, ok};
+ [] ->
+ {ok, hd(ReplyList)};
+ [Exception] ->
+ {exception, Exception}
+ end,
+ ok = thrift_protocol:read(Proto, message_end),
+ Result.
-handle_application_exception(Client = #tclient{protocol = Proto0}) ->
- {Proto1, {ok, Exception}} =
- thrift_protocol:read(Proto0, ?TApplicationException_Structure),
- {Proto2, ok} = thrift_protocol:read(Proto1, message_end),
+handle_application_exception(State = #state{protocol = Proto}) ->
+ {ok, Exception} = thrift_protocol:read(Proto,
+ ?TApplicationException_Structure),
+ ok = thrift_protocol:read(Proto, message_end),
XRecord = list_to_tuple(
['TApplicationException' | tuple_to_list(Exception)]),
error_logger:error_msg("X: ~p~n", [XRecord]),
true = is_record(XRecord, 'TApplicationException'),
- NewClient = Client#tclient{protocol = Proto2},
- throw({NewClient, {exception, XRecord}}).
+ {exception, XRecord}.