blob: 07c9f488b2ac9d41db418e00c2d11cd5ce37c4c1 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001%%
2%% Licensed to the Apache Software Foundation (ASF) under one
3%% or more contributor license agreements. See the NOTICE file
4%% distributed with this work for additional information
5%% regarding copyright ownership. The ASF licenses this file
6%% to you under the Apache License, Version 2.0 (the
7%% "License"); you may not use this file except in compliance
8%% with the License. You may obtain a copy of the License at
9%%
10%% http://www.apache.org/licenses/LICENSE-2.0
11%%
12%% Unless required by applicable law or agreed to in writing,
13%% software distributed under the License is distributed on an
14%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15%% KIND, either express or implied. See the License for the
16%% specific language governing permissions and limitations
17%% under the License.
18%%
19
David Reiss2c534032008-06-11 00:58:00 +000020-module(thrift_client).
21
David Reissf32d0fb2010-08-30 22:05:00 +000022-behaviour(gen_server).
23
David Reiss2c534032008-06-11 00:58:00 +000024%% API
David Reissf32d0fb2010-08-30 22:05:00 +000025-export([start_link/2, start_link/3, start_link/4,
26 start/3, start/4,
David Reissd172a882010-08-30 22:05:08 +000027 call/3, cast/3, send_call/3, close/1]).
David Reissf32d0fb2010-08-30 22:05:00 +000028
29%% gen_server callbacks
30-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
31 terminate/2, code_change/3]).
32
David Reiss2c534032008-06-11 00:58:00 +000033
34-include("thrift_constants.hrl").
35-include("thrift_protocol.hrl").
36
David Reissf32d0fb2010-08-30 22:05:00 +000037-record(state, {service, protocol, seqid}).
38
39%%====================================================================
40%% API
41%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Host, Port, Service) -> Result
%% Description: Convenience form of start_link/4 that passes an empty
%% option list. Result is whatever start_link/4 returns.
%%--------------------------------------------------------------------
start_link(Host, Port, Service)
  when is_integer(Port) andalso is_atom(Service) ->
    NoOptions = [],
    start_link(Host, Port, Service, NoOptions).
48
%% Prepends {monitor, link} to Options and hands everything to start/4,
%% returning its result unchanged.
start_link(Host, Port, Service, Options) ->
    WithLink = [{monitor, link} | Options],
    start(Host, Port, Service, WithLink).
51
%% Starts a client from a caller-supplied protocol factory by delegating to
%% start/3 with {monitor, link} as the only client option.
start_link(ProtocolFactory, Service) ->
    ClientOpts = [{monitor, link}],
    start(ProtocolFactory, Service, ClientOpts).
54
%%
%% Splits client options into client, protocol and transport options.
%%
%% split_options([Options...]) ->
%%     {ClientOptions, ProtocolOptions, TransportOptions}
%%
%% Recognised keys:
%%   client:    monitor, connect
%%   protocol:  strict_read, strict_write
%%   transport: framed, connect_timeout, sockopts
%%
%% Every returned list holds its options in the reverse of the order in
%% which they appeared in Options. An element that is not a two-tuple with
%% one of the keys above raises function_clause.
%%
split_options(Options) ->
    split_options(Options, [], [], []).

split_options([], ClientIn, ProtoIn, TransIn) ->
    {ClientIn, ProtoIn, TransIn};

%% `connect' is looked up in the client options by start/3, so it must be
%% accepted here; otherwise it could never be passed through start/4.
split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
  when OptKey =:= monitor;
       OptKey =:= connect ->
    split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn);

split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
  when OptKey =:= strict_read;
       OptKey =:= strict_write ->
    split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn);

split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn)
  when OptKey =:= framed;
       OptKey =:= connect_timeout;
       OptKey =:= sockopts ->
    split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]).
David Reissfc427af2008-06-11 01:11:57 +000080
81
%%--------------------------------------------------------------------
%% Function: start(Host, Port, Service, Options) -> Result
%% Description: Backwards-compatible starter for the common case of socket
%% transports. Options are divided with split_options/1; the transport
%% options configure a thrift_socket_transport factory for Host:Port, the
%% protocol options configure a thrift_binary_protocol factory on top of
%% it, and the client options are forwarded to start/3, whose result is
%% returned. Whether the process is linked depends on the monitor option
%% as interpreted by start/3.
%%--------------------------------------------------------------------
start(Host, Port, Service, Options)
  when is_integer(Port) andalso is_atom(Service) andalso is_list(Options) ->
    {ForClient, ForProtocol, ForTransport} = split_options(Options),
    {ok, MakeTransport} =
        thrift_socket_transport:new_transport_factory(Host, Port, ForTransport),
    {ok, MakeProtocol} =
        thrift_binary_protocol:new_protocol_factory(MakeTransport, ForProtocol),
    start(MakeProtocol, Service, ForClient).
99
100
%% ProtocolFactory :: fun() -> thrift_protocol()
%%
%% Starts the client gen_server and, unless told otherwise, connects it.
%%
%% Client options consulted:
%%   {monitor, link}   - the server is started with gen_server:start_link/3
%%   {monitor, tether} - the server is started with gen_server:start/3 and
%%                       is given {tether, self()} as an init option
%%   {connect, Choice} - when Choice is true (also the default when the
%%                       option is absent) a {connect, ProtocolFactory}
%%                       call is made to the new process
%%
%% Returns {ok, Pid} when everything succeeded. A failed start result, or a
%% connect reply other than ok, is returned as-is.
start(ProtocolFactory, Service, ClientOpts)
  when is_function(ProtocolFactory), is_atom(Service) ->
    {Starter, Opts} =
        case lists:keyfind(monitor, 1, ClientOpts) of
            {monitor, link} -> {start_link, []};
            {monitor, tether} -> {start, [{tether, self()}]};
            _ -> {start, []}
        end,

    %% By default, connect at creation-time.
    Connect =
        case lists:keyfind(connect, 1, ClientOpts) of
            {connect, Choice} -> Choice;
            _ -> true
        end,

    Started = gen_server:Starter(?MODULE, [Service, Opts], []),

    case {Connect, Started} of
        {true, {ok, Pid}} ->
            case gen_server:call(Pid, {connect, ProtocolFactory}) of
                ok -> {ok, Pid};
                Error -> Error
            end;
        _ ->
            %% Either connecting was not requested or the start failed.
            Started
    end.
David Reiss2c534032008-06-11 00:58:00 +0000142
%% Performs a synchronous {call, Function, Args} request against the client
%% process. {ok, _} and {error, _} replies are returned unchanged; an
%% {exception, Exception} reply is turned into throw(Exception).
-spec call(term(), atom(), list()) -> {ok, term()} | {error, term()}.
call(Client, Function, Args)
  when is_pid(Client), is_atom(Function), is_list(Args) ->
    Reply = gen_server:call(Client, {call, Function, Args}),
    case Reply of
        {exception, Exception} -> throw(Exception);
        {ok, _} -> Reply;
        {error, _} -> Reply
    end.
151
%% Asynchronous variant of call/3: the {call, Function, Args} request is
%% delivered with gen_server:cast/2, so no reply is returned to the caller.
-spec cast(term(), atom(), list()) -> ok.
cast(Client, Function, Args)
  when is_pid(Client) andalso is_atom(Function) andalso is_list(Args) ->
    Request = {call, Function, Args},
    gen_server:cast(Client, Request).
David Reissa2f45972008-06-11 01:13:33 +0000156
%% Sends a function call but does not read the result. This is useful
%% if you're trying to log non-oneway function calls to write-only
%% transports like thrift_disk_log_transport.
%% The return value is the reply the client process gives to the
%% {send_call, Function, Args} request.
-spec send_call(term(), atom(), list()) -> ok.
send_call(Client, Function, Args)
  when is_pid(Client) andalso is_atom(Function) andalso is_list(Args) ->
    Request = {send_call, Function, Args},
    gen_server:call(Client, Request).
David Reiss65cf7202008-06-11 01:12:20 +0000164
%% Asks the client process to shut down by casting `close' to it. The cast
%% is asynchronous, so this returns without waiting for the process to stop.
-spec close(term()) -> ok.
close(Client) when is_pid(Client) ->
    CloseRequest = close,
    gen_server:cast(Client, CloseRequest).
David Reiss464e3002008-06-11 01:00:45 +0000168
David Reissf32d0fb2010-08-30 22:05:00 +0000169%%====================================================================
170%% gen_server callbacks
171%%====================================================================
172
%%--------------------------------------------------------------------
%% Function: init([Service, Opts]) -> {ok, State}
%% Description: Initiates the server. When Opts contains {tether, Pid},
%% a monitor is placed on Pid. The initial state records only the service
%% module; the remaining state fields keep their record defaults.
%%--------------------------------------------------------------------
init([Service, Opts]) ->
    case lists:keyfind(tether, 1, Opts) of
        {tether, Pid} ->
            %% The monitor reference is not kept.
            erlang:monitor(process, Pid),
            ok;
        _NotTethered ->
            ok
    end,
    InitialState = #state{service = Service},
    {ok, InitialState}.
188
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%%                                                {stop, Reason, Reply, State}
%% Description: Handling call messages
%%   {connect, ProtocolFactory}  - invokes the factory. On {ok, Protocol}
%%       the protocol is stored, seqid is set to 0 and ok is replied; any
%%       other factory result is replied and the server stops normally.
%%   {call, Function, Args}      - sends the call and reads its result.
%%   {send_call, Function, Args} - sends the call without reading a result.
%% For the last two, exceptions are translated by
%% catch_function_exceptions/2 and the state is left untouched.
%%--------------------------------------------------------------------
handle_call({connect, ProtocolFactory}, _From, State = #state{}) ->
    case ProtocolFactory() of
        {ok, Protocol} ->
            Connected = State#state{protocol = Protocol, seqid = 0},
            {reply, ok, Connected};
        Error ->
            {stop, normal, Error, State}
    end;

handle_call({call, Function, Args}, _From, State = #state{service = Service}) ->
    SendAndReceive =
        fun() ->
                ok = send_function_call(State, Function, Args),
                receive_function_result(State, Function)
        end,
    {reply, catch_function_exceptions(SendAndReceive, Service), State};

handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) ->
    SendOnly = fun() -> send_function_call(State, Function, Args) end,
    {reply, catch_function_exceptions(SendOnly, Service), State}.
225
226
%% Helper function that runs Fun (the send and/or receive step of a call)
%% and turns the exceptions it raises into the response for call or
%% send_call above.
%%
%%   * throw:{return, Return} - Return becomes the result.
%%   * error:function_clause whose innermost stack frame is a call to
%%     Service:function_info(Function, _) - the service does not know
%%     Function, so {error, {no_function, Function}} is returned.
%%   * any other function_clause error is thrown on as
%%     {error, {function_clause, Stacktrace}}.
%%
%% Everything else raised by Fun propagates unchanged.
%%
%% The stacktrace is bound in the catch clause (needs OTP 21 or later)
%% because erlang:get_stacktrace/0 is no longer available in current OTP
%% releases.
catch_function_exceptions(Fun, Service) ->
    try
        Fun()
    catch
        throw:{return, Return} ->
            Return;
        error:function_clause:ST ->
            case ST of
                %% Stack frames gained a fourth (location) element in
                %% R15B; the older three-element shape is still accepted.
                [{Service, function_info, [Function, _], _Location} | _] ->
                    {error, {no_function, Function}};
                [{Service, function_info, [Function, _]} | _] ->
                    {error, {no_function, Function}};
                _ ->
                    throw({error, {function_clause, ST}})
            end
    end.
243
244
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%                                      {stop, Reason, State}
%% Description: Handling cast messages
%%   {call, Function, Args} - sends the call and reads its result; the
%%       result is discarded and any exception is logged and ignored.
%%   close                  - stops the server with reason normal.
%%   anything else          - ignored.
%%--------------------------------------------------------------------
handle_cast({call, Function, Args}, State = #state{}) ->
    try
        ok = send_function_call(State, Function, Args),
        receive_function_result(State, Function)
    of
        _Result ->
            ok
    catch
        Class:Reason ->
            error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason])
    end,
    {noreply, State};

handle_cast(close, State = #state{}) ->
    {stop, normal, State};

handle_cast(_Msg, State) ->
    {noreply, State}.
270
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages. A process 'DOWN'
%% message stops the server with reason parent_died; every other message
%% is ignored.
%%--------------------------------------------------------------------
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State)
  when is_reference(MonitorRef) andalso is_pid(Pid) ->
    %% The reference and pid are only type-checked; they are not compared
    %% against any monitor this process set up.
    {stop, parent_died, State};

handle_info(_Info, State) ->
    {noreply, State}.
284
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by gen_server when the process is about to
%% terminate. If a protocol is stored in the state, its transport is
%% closed; if the protocol field is still undefined there is nothing to
%% clean up.
%%--------------------------------------------------------------------
terminate(_Reason, #state{protocol = Protocol}) ->
    case Protocol of
        undefined ->
            ok;
        _Open ->
            thrift_protocol:close_transport(Protocol),
            ok
    end.
297
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed. No conversion
%% is performed: the state is handed back as it was received.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
David Reiss2c534032008-06-11 00:58:00 +0000304
305%%--------------------------------------------------------------------
306%%% Internal functions
307%%--------------------------------------------------------------------
%% Writes one CALL message for Function to the protocol and flushes the
%% transport.
%%
%% The parameter struct definition comes from
%% Service:function_info(Function, params_type). When the number of Args
%% differs from the number of declared parameters,
%% {return, {error, {bad_args, Function, Args}}} is thrown and nothing is
%% written. Returns ok once the message begin, the argument struct and the
%% message end have been written; the result of the flush is not inspected.
send_function_call(#state{protocol = Proto, service = Service, seqid = SeqId},
                   Function, Args) ->
    Params = {struct, PList} = Service:function_info(Function, params_type),
    if
        length(PList) =/= length(Args) ->
            throw({return, {error, {bad_args, Function, Args}}});
        true ->
            ok
    end,

    Header = #protocol_message_begin{name = atom_to_list(Function),
                                     type = ?tMessageType_CALL,
                                     seqid = SeqId},
    ok = thrift_protocol:write(Proto, Header),
    %% The arguments travel as a tuple tagged with the function name.
    ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}),
    ok = thrift_protocol:write(Proto, message_end),
    thrift_protocol:flush_transport(Proto),
    ok.
329
%% Looks up the reply type of Function via
%% Service:function_info(Function, reply_type) and lets read_result/3
%% obtain the result for that type.
receive_function_result(State = #state{service = Service}, Function) ->
    read_result(State, Function, Service:function_info(Function, reply_type)).
Bryan Duxburyd3879f82010-08-19 05:06:02 +0000335
%% Obtains the result of a call whose reply type is already known.
%%
%% For oneway_void nothing is read and {ok, ok} is returned. Otherwise the
%% message header is read from the protocol: a header whose seqid differs
%% from the one in the state gives {error, {bad_seq_id, SeqId}}, an
%% EXCEPTION message is passed to handle_application_exception/1 and a
%% REPLY message to handle_reply/3.
read_result(_State, _Function, oneway_void) ->
    {ok, ok};

read_result(State = #state{protocol = Proto, seqid = SeqId},
            Function, ReplyType) ->
    Header = thrift_protocol:read(Proto, message_begin),
    case Header of
        #protocol_message_begin{seqid = OtherSeqId} when OtherSeqId =/= SeqId ->
            {error, {bad_seq_id, SeqId}};
        #protocol_message_begin{type = ?tMessageType_EXCEPTION} ->
            handle_application_exception(State);
        #protocol_message_begin{type = ?tMessageType_REPLY} ->
            handle_reply(State, Function, ReplyType)
    end.
355
%% Reads the body of a REPLY message and converts it into a call result.
%%
%% The reply struct is described as field 0 of ReplyType followed by the
%% exception fields from Service:function_info(Function, exceptions). The
%% first element of the decoded tuple is the return value; the remaining
%% elements are the declared exceptions, where undefined means "not set".
%%
%% Returns {ok, ok} when no exception is set and ReplyType is {struct, []},
%% {ok, Value} when no exception is set otherwise, and
%% {exception, Exception} when exactly one exception is set. The message
%% end is read before returning.
handle_reply(#state{protocol = Proto, service = Service}, Function, ReplyType) ->
    {struct, ExceptionFields} = Service:function_info(Function, exceptions),
    ReplyStructDef = {struct, [{0, ReplyType} | ExceptionFields]},
    {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef),
    ReplyList = tuple_to_list(Reply),
    %% One slot for the return value plus one per declared exception.
    true = length(ReplyList) =:= length(ExceptionFields) + 1,
    [ReturnValue | ExceptionVals] = ReplyList,
    Thrown = lists:filter(fun(Val) -> Val =/= undefined end, ExceptionVals),
    Result =
        case Thrown of
            [] when ReplyType == {struct, []} ->
                {ok, ok};
            [] ->
                {ok, ReturnValue};
            [Exception] ->
                {exception, Exception}
        end,
    ok = thrift_protocol:read(Proto, message_end),
    Result.
David Reiss6b3e40f2008-06-11 00:59:03 +0000379
%% Reads the body of an EXCEPTION message. The decoded fields are read
%% using ?TApplicationException_Structure, followed by the message end;
%% the fields are then tagged with 'TApplicationException' to form a
%% record, which is logged and returned as {exception, Record}.
handle_application_exception(#state{protocol = Proto}) ->
    {ok, Exception} =
        thrift_protocol:read(Proto, ?TApplicationException_Structure),
    ok = thrift_protocol:read(Proto, message_end),
    Fields = tuple_to_list(Exception),
    XRecord = list_to_tuple(['TApplicationException' | Fields]),
    error_logger:error_msg("X: ~p~n", [XRecord]),
    %% Sanity check that the tagged tuple has the record's shape.
    true = is_record(XRecord, 'TApplicationException'),
    {exception, XRecord}.