| %% | 
 | %% Licensed to the Apache Software Foundation (ASF) under one | 
 | %% or more contributor license agreements. See the NOTICE file | 
 | %% distributed with this work for additional information | 
 | %% regarding copyright ownership. The ASF licenses this file | 
 | %% to you under the Apache License, Version 2.0 (the | 
 | %% "License"); you may not use this file except in compliance | 
 | %% with the License. You may obtain a copy of the License at | 
 | %% | 
 | %%   http://www.apache.org/licenses/LICENSE-2.0 | 
 | %% | 
 | %% Unless required by applicable law or agreed to in writing, | 
 | %% software distributed under the License is distributed on an | 
 | %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | %% KIND, either express or implied. See the License for the | 
 | %% specific language governing permissions and limitations | 
 | %% under the License. | 
 | %% | 
 |  | 
 | -module(thrift_reconnecting_client). | 
 |  | 
 | -behaviour(gen_server). | 
 |  | 
 | %% API | 
 | -export([ call/3, | 
 |           get_stats/1, | 
 |           get_and_reset_stats/1 ]). | 
 |  | 
 | -export([ start_link/6 ]). | 
 |  | 
 | %% gen_server callbacks | 
 | -export([ init/1, | 
 |           handle_call/3, | 
 |           handle_cast/2, | 
 |           handle_info/2, | 
 |           terminate/2, | 
 |           code_change/3 ]). | 
 |  | 
 %% Server state.
 %%   client       - connected thrift client, or nil while disconnected
 %%   host, port   - endpoint handed to thrift_client_util:new/4
 %%   thrift_svc   - thrift service; an atom (stats/1 calls atom_to_list/1 on it)
 %%   thrift_opts  - options handed to thrift_client_util:new/4
 %%   reconn_min   - first reconnect delay, in ms
 %%   reconn_max   - upper bound for the reconnect delay, in ms
 %%   reconn_time  - delay used for the latest reconnect attempt; 0 when no
 %%                  backoff is in progress. It must default to 0: init/1 never
 %%                  sets it, and reconn_time/1 computes 2 * reconn_time, which
 %%                  raises badarith on 'undefined' if the very first connect
 %%                  attempt fails.
 %%   op_cnt_dict  - dict mapping a stat key to the number of calls
 %%   op_time_dict - dict mapping a stat key to the accumulated call time
 -record( state, { client = nil,
                   host,
                   port,
                   thrift_svc,
                   thrift_opts,
                   reconn_min,
                   reconn_max,
                   reconn_time = 0,
                   op_cnt_dict,
                   op_time_dict } ).
 |  | 
 | %%==================================================================== | 
 | %% API | 
 | %%==================================================================== | 
 %%--------------------------------------------------------------------
 %% Function: start_link(Host, Port, ThriftSvc, ThriftOpts,
 %%                      ReconnMin, ReconnMax)
 %%             -> whatever gen_server:start_link/3 returns
 %% Description: Starts a linked, unregistered server. The six arguments
 %% are passed to init/1 as a single list, in the same order.
 %%--------------------------------------------------------------------
 start_link( Host, Port, ThriftSvc, ThriftOpts, ReconnMin, ReconnMax ) ->
   InitArgs = [ Host, Port, ThriftSvc, ThriftOpts, ReconnMin, ReconnMax ],
   gen_server:start_link( ?MODULE, InitArgs, [] ).
 |  | 
 %% Synchronously asks the server to run thrift operation Op with Args and
 %% returns the server's reply. Uses gen_server:call/2, i.e. its default
 %% timeout.
 call( Server, Op, Args ) ->
   Request = { call, Op, Args },
   gen_server:call( Server, Request ).
 |  | 
 %% Returns the statistics collected so far by the server, leaving them
 %% untouched. The reply is the list built by stats/1.
 get_stats( Server ) ->
   Request = get_stats,
   gen_server:call( Server, Request ).
 |  | 
 %% Returns the statistics collected so far by the server and clears them
 %% in the same request.
 get_and_reset_stats( Server ) ->
   Request = get_and_reset_stats,
   gen_server:call( Server, Request ).
 |  | 
 | %%==================================================================== | 
 | %% gen_server callbacks | 
 | %%==================================================================== | 
 |  | 
 %%--------------------------------------------------------------------
 %% Function: init([Host, Port, TSvc, TOpts, ReconnMin, ReconnMax])
 %%             -> {ok, State}
 %% Description: Sets the trap_exit process flag, builds the initial
 %% state with empty statistics dictionaries and makes the first
 %% connection attempt through try_connect/1 before returning.
 %%--------------------------------------------------------------------
 init( [ Host, Port, TSvc, TOpts, ReconnMin, ReconnMax ] ) ->
   process_flag( trap_exit, true ),

   NoStats = dict:new(),
   Initial = #state{ op_cnt_dict  = NoStats,
                     op_time_dict = NoStats,
                     reconn_max   = ReconnMax,
                     reconn_min   = ReconnMin,
                     thrift_opts  = TOpts,
                     thrift_svc   = TSvc,
                     port         = Port,
                     host         = Host },

   Connected = try_connect( Initial ),
   { ok, Connected }.
 |  | 
 %%--------------------------------------------------------------------
 %% Function: handle_call(Request, From, State) -> {reply, Reply, State}
 %% Description: Handling call messages
 %%
 %%   {call, Op, Args}     - run thrift operation Op (an atom) with Args.
 %%                          Reply is {error, noconn} while disconnected,
 %%                          {ok, Reply} on success, {error | exception, Msg}
 %%                          on a reported failure, or whatever else the
 %%                          caught call produced. Every outcome is counted
 %%                          under "<Op>_failfast", "<Op>_success" or
 %%                          "<Op>_error".
 %%   get_stats            - reply with stats/1 of the current state.
 %%   get_and_reset_stats  - same reply, and the counters are cleared.
 %%--------------------------------------------------------------------
 handle_call( { call, Op, _ },
              _From,
              State = #state{ client = nil } ) ->
   %% No connection: answer immediately instead of blocking the caller.
   %% The attempt is recorded with a nominal time of 1.
   { reply, { error, noconn }, incr_stats( Op, "failfast", 1, State ) };

 handle_call( { call, Op, Args },
              _From,
              State=#state{ client = Client } ) ->

   %% Elapsed time in microseconds, taken from the monotonic clock
   %% (now/0 is deprecated).
   Start = erlang:monotonic_time( microsecond ),
   Result = ( catch thrift_client:call( Client, Op, Args) ),
   Time = erlang:monotonic_time( microsecond ) - Start,

   case Result of
     { C, { ok, Reply } } ->
       %% Keep the client value returned by the call for the next request.
       S = incr_stats( Op, "success", Time, State#state{ client = C } ),
       { reply, {ok, Reply }, S };
     { _, { E, Msg } } when E == error; E == exception ->
       %% Any failure drops the current client and reconnects.
       S = incr_stats( Op, "error", Time, try_connect( State ) ),
       { reply, { E, Msg }, S };
     Other ->
       %% Anything else the catch produced (for example an 'EXIT' tuple) is
       %% handed back to the caller unchanged, after reconnecting.
       S = incr_stats( Op, "error", Time, try_connect( State ) ),
       { reply, Other, S }
   end;

 handle_call( get_stats,
              _From,
              State = #state{} ) ->
   { reply, stats( State ), State };

 handle_call( get_and_reset_stats,
              _From,
              State = #state{} ) ->
   { reply, stats( State ), reset_stats( State ) }.
 |  | 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State}
 %% Description: No casts are part of this server's protocol; any cast
 %% is discarded and the state is returned unchanged.
 %%--------------------------------------------------------------------
 handle_cast( _Ignored, Unchanged ) ->
   { noreply, Unchanged }.
 |  | 
 %%--------------------------------------------------------------------
 %% Function: handle_info(Info, State) -> {noreply, State}
 %% Description: Handling all non call/cast messages
 %%
 %%   try_connect - the retry message that try_connect/1 schedules with
 %%                 erlang:send_after/3 after a failed connection attempt.
 %%                 While still disconnected, another attempt is made.
 %%   anything else (including a try_connect that arrives while a client
 %%   is already connected) is ignored.
 %%--------------------------------------------------------------------
 handle_info( try_connect, State = #state{ client = nil } ) ->
   { noreply, try_connect( State ) };

 handle_info( _Info, State ) ->
   { noreply, State }.
 |  | 
 %%--------------------------------------------------------------------
 %% Function: terminate(Reason, State) -> ok
 %% Description: Called by gen_server when the server is about to
 %% terminate. Closes the thrift client if one is connected. The return
 %% value is ignored by gen_server.
 %%--------------------------------------------------------------------
 terminate( _Reason, #state{ client = nil } ) ->
   %% Nothing to close while disconnected. try_connect/1 likewise never
   %% hands nil to thrift_client:close/1.
   ok;

 terminate( _Reason, #state{ client = Client } ) ->
   %% Best-effort close, as in try_connect/1: a failure while closing must
   %% not replace the reason the server is terminating with.
   _ = ( catch thrift_client:close( Client ) ),
   ok.
 |  | 
 %%--------------------------------------------------------------------
 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
 %% Description: The state needs no conversion on a code upgrade, so it
 %% is returned as it was received.
 %%--------------------------------------------------------------------
 code_change( _FromVsn, Unchanged, _Extra ) ->
   { ok, Unchanged }.
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %%% Internal functions | 
 | %%-------------------------------------------------------------------- | 
 %% Drops the current client, if any, and tries to open a new one with
 %% thrift_client_util:new/4.
 %%
 %% On {ok, Client} the new client is stored and the backoff is reset to 0.
 %% On {error, Msg} or {exception, Msg} the failure is logged, a try_connect
 %% message to self() is scheduled after the delay given by reconn_time/1,
 %% and the state is left without a client. Any other outcome of the caught
 %% call raises case_clause.
 try_connect( State = #state{ client      = Stale,
                              thrift_opts = TOpts,
                              thrift_svc  = TSvc,
                              port        = Port,
                              host        = Host } ) ->

   %% Best-effort close of the previous client; a failure here is discarded.
   _ = ( Stale =:= nil ) orelse ( catch thrift_client:close( Stale ) ),

   Attempt = ( catch thrift_client_util:new( Host, Port, TSvc, TOpts ) ),
   case Attempt of
     { ok, Fresh } ->
       State#state{ client = Fresh, reconn_time = 0 };
     { Kind, Why } when Kind =:= error; Kind =:= exception ->
       Delay = reconn_time( State ),
       error_logger:error_msg( "[~w] ~w connect failed (~w), trying again in ~w ms~n",
                               [ self(), TSvc, Why, Delay ] ),
       erlang:send_after( Delay, self(), try_connect ),
       State#state{ reconn_time = Delay, client = nil }
   end.
 |  | 
 |  | 
 %% Delay to use for the next reconnect attempt:
 %%   - reconn_min when no backoff is in progress (previous delay is 0),
 %%   - reconn_max once the previous delay has reached it,
 %%   - otherwise twice the previous delay, capped at reconn_max.
 reconn_time( #state{ reconn_time = 0, reconn_min = Floor } ) ->
   Floor;
 reconn_time( #state{ reconn_time = Ceiling, reconn_max = Ceiling } ) ->
   Ceiling;
 reconn_time( #state{ reconn_time = Previous, reconn_max = Ceiling } ) ->
   erlang:min( 2 * Previous, Ceiling ).
 |  | 
 |  | 
 %% Records one occurrence of Op (an atom) finishing with Result (a string
 %% such as "success"): under the key "<Op>_<Result>" the call counter grows
 %% by 1 and the time counter grows by Time. Returns the updated state.
 incr_stats( Op, Result, Time,
             State = #state{ op_cnt_dict  = Counts,
                             op_time_dict = Times } ) ->
   Key = lists:flatten( [ atom_to_list( Op ), $_ | Result ] ),
   NewCounts = dict:update_counter( Key, 1, Counts ),
   NewTimes = dict:update_counter( Key, Time, Times ),
   State#state{ op_cnt_dict = NewCounts, op_time_dict = NewTimes }.
 |  | 
 |  | 
 %% Builds the statistics report: one { Name, Count, Micros } tuple per key
 %% in the call-count dictionary, where Name is "<service>_<key>", Count is
 %% the number of calls and Micros is the accumulated time stored under the
 %% same key in the time dictionary.
 stats( #state{ op_time_dict = Times,
                op_cnt_dict  = Counts,
                thrift_svc   = TSvc } ) ->
   Prefix = atom_to_list(TSvc),

   dict:fold( fun( Key, Count, Acc ) ->
                %% Keys are the flat strings produced by incr_stats/4.
                Entry = { Prefix ++ [ $_ | Key ], Count, dict:fetch( Key, Times ) },
                [ Entry | Acc ]
              end,
              [],
              Counts ).
 |  | 
 %% Returns the state with both statistics dictionaries emptied; every
 %% other field is kept.
 reset_stats( State = #state{} ) ->
   Empty = dict:new(),
   State#state{ op_time_dict = Empty, op_cnt_dict = Empty }.