From ad74b320591c087690e4c43f5ff3bdb34aa98a26 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 11 Jun 2008 01:03:29 +0000 Subject: [PATCH] Modify thrift_client to take in a "connector" function as a parameter, enabling substitution of different protocol/transports Summary: Left in a backwards-compatible start_link with (Host, Port, Service) args Test plan: tutorial/alterl still works Notes: We may want to go a little further and get rid of the binary_protocol specific stuff from socket_transport as well git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666460 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_client.erl | 71 ++++++---------------- lib/alterl/src/thrift_socket_transport.erl | 42 ++++++++++++- 2 files changed, 61 insertions(+), 52 deletions(-) diff --git a/lib/alterl/src/thrift_client.erl b/lib/alterl/src/thrift_client.erl index dee91fb2..a2cc56b3 100644 --- a/lib/alterl/src/thrift_client.erl +++ b/lib/alterl/src/thrift_client.erl @@ -10,7 +10,7 @@ -behaviour(gen_server). %% API --export([start_link/3, start_link/4, call/3, close/1]). +-export([start_link/2, start_link/3, start_link/4, call/3, close/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -20,13 +20,7 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(state, {service, protocol, seqid, - strict_read = true, - strict_write = true, - framed = false, - connect_timeout = infinity, - sockopts = [] - }). +-record(state, {service, protocol, seqid}). %%==================================================================== %% API @@ -35,14 +29,21 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- -start_link(Host, Port, Service) -> +start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) -> start_link(Host, Port, Service, []). +%% Backwards-compatible starter for the usual case of socket transports start_link(Host, Port, Service, Options) when is_integer(Port), is_atom(Service), is_list(Options) -> - case gen_server:start_link(?MODULE, [Options], []) of + {ok, Connector} = thrift_socket_transport:new_connector(Host, Port, Options), + start_link(Connector, Service). + +%% Connector :: fun() -> thrift_protocol() +start_link(Connector, Service) + when is_function(Connector), is_atom(Service) -> + case gen_server:start_link(?MODULE, [Service], []) of {ok, Pid} -> - case gen_server:call(Pid, {connect, Host, Port, Service}) of + case gen_server:call(Pid, {connect, Connector}) of ok -> {ok, Pid}; Error -> @@ -74,22 +75,8 @@ close(Client) when is_pid(Client) -> %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- -init([Options]) -> - State = parse_options(Options, #state{}), - {ok, State}. - -parse_options([], State) -> - State; -parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) -> - parse_options(Rest, State#state{strict_read=Bool}); -parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> - parse_options(Rest, State#state{strict_write=Bool}); -parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) -> - parse_options(Rest, State#state{framed=Bool}); -parse_options([{sockopts, OptList} | Rest], State) when is_list(OptList) -> - parse_options(Rest, State#state{sockopts=OptList}); -parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_integer(TO) -> - parse_options(Rest, State#state{connect_timeout=TO}). +init([Service]) -> + {ok, #state{service = Service}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -100,30 +87,12 @@ parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_in %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call({connect, Host, Port, Service}, _From, - State = #state{connect_timeout=Timeout, - sockopts=SockOpts}) -> - Options = [binary, - {packet, 0}, - {active, false}, - {nodelay, true} - | SockOpts - ], - case catch gen_tcp:connect(Host, Port, Options, Timeout) of - {ok, Sock} -> - {ok, Transport} = thrift_socket_transport:new(Sock), - {ok, BufTransport} = - case State#state.framed of - true -> thrift_framed_transport:new(Transport); - false -> thrift_buffered_transport:new(Transport) - end, - {ok, Protocol} = thrift_binary_protocol:new(BufTransport, - [{strict_read, State#state.strict_read}, - {strict_write, State#state.strict_write}]), - - {reply, ok, State#state{service = Service, - protocol = Protocol, - seqid = 0}}; +handle_call({connect, Connector}, _From, + State = #state{service = Service}) -> + case Connector() of + {ok, Protocol} -> + {reply, ok, State#state{protocol = Protocol, + seqid = 0}}; Error -> {stop, normal, Error, State} end; diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl index 9cc0af9d..bdae28b9 100644 --- a/lib/alterl/src/thrift_socket_transport.erl +++ b/lib/alterl/src/thrift_socket_transport.erl @@ -4,7 +4,9 @@ -export([new/1, new/2, - write/2, read/2, flush/1, close/1]). + write/2, read/2, flush/1, close/1, + + new_connector/3]). -record(data, {socket, recv_timeout=infinity}). @@ -43,3 +45,41 @@ flush(_) -> close(#data{socket = Socket}) -> gen_tcp:close(Socket). + + +%%%% CONNECTOR GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% +%% Generates a "connector" function - a fun which returns a Protocol instance. +%% This can be passed to thrift_client:start_link in order to connect to a +%% server over a socket. +%% +new_connector(Host, Port, Options) -> + ConnectTimeout = proplists:get_value(connect_timeout, Options, infinity), + InSockOpts = proplists:get_value(sockopts, Options, []), + Framed = proplists:get_value(framed, Options, false), + StrictRead = proplists:get_value(strict_read, Options, true), + StrictWrite = proplists:get_value(strict_write, Options, true), + + F = fun() -> + SockOpts = [binary, + {packet, 0}, + {active, false}, + {nodelay, true} | + InSockOpts], + case catch gen_tcp:connect(Host, Port, SockOpts, ConnectTimeout) of + {ok, Sock} -> + {ok, Transport} = thrift_socket_transport:new(Sock), + {ok, BufTransport} = + case Framed of + true -> thrift_framed_transport:new(Transport); + false -> thrift_buffered_transport:new(Transport) + end, + thrift_binary_protocol:new(BufTransport, + [{strict_read, StrictRead}, + {strict_write, StrictWrite}]); + Error -> + Error + end + end, + {ok, F}. -- 2.17.1