From 5e530af5878ce7650e94ee662951b49358100984 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 4 Jun 2009 02:01:24 +0000 Subject: [PATCH] THRIFT-211. erlang: Support unlinked Thrift clients. - Create a thrift_client:start function that accepts client options. - Make start_link a wrapper that adds {monitor, link}. - Add a test to make sure that everything dies or doesn't die as expected. (The test has to be run manually.) git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@781634 13f79535-47bb-0310-9956-ffa450edef68 --- lib/erl/src/thrift_client.erl | 52 +++++++++++----- test/erl/Makefile | 2 +- test/erl/src/test_tether.erl | 114 ++++++++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 15 deletions(-) create mode 100644 test/erl/src/test_tether.erl diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl index 5ba8aee6..d70df344 100644 --- a/lib/erl/src/thrift_client.erl +++ b/lib/erl/src/thrift_client.erl @@ -22,7 +22,9 @@ -behaviour(gen_server). %% API --export([start_link/2, start_link/3, start_link/4, call/3, send_call/3, close/1]). +-export([start_link/2, start_link/3, start_link/4, + start/3, start/4, + call/3, send_call/3, close/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -39,11 +41,16 @@ %%==================================================================== %%-------------------------------------------------------------------- %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server +%% Description: Starts the server as a linked process. %%-------------------------------------------------------------------- start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) -> start_link(Host, Port, Service, []). +start_link(Host, Port, Service, Options) -> + start(Host, Port, Service, [{monitor, link} | Options]). + +start_link(ProtocolFactory, Service) -> + start(ProtocolFactory, Service, [{monitor, link}]). %% %% Splits client options into protocol options and transport options @@ -51,27 +58,36 @@ start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) -> %% split_options([Options...]) -> {ProtocolOptions, TransportOptions} %% split_options(Options) -> - split_options(Options, [], []). + split_options(Options, [], [], []). + +split_options([], ClientIn, ProtoIn, TransIn) -> + {ClientIn, ProtoIn, TransIn}; -split_options([], ProtoIn, TransIn) -> - {ProtoIn, TransIn}; +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) + when OptKey =:= monitor -> + split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn); -split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) when OptKey =:= strict_read; OptKey =:= strict_write -> - split_options(Rest, [Opt | ProtoIn], TransIn); + split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn); -split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) when OptKey =:= framed; OptKey =:= connect_timeout; OptKey =:= sockopts -> - split_options(Rest, ProtoIn, [Opt | TransIn]). + split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]). +%%-------------------------------------------------------------------- +%% Function: start() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server as an unlinked process. +%%-------------------------------------------------------------------- + %% Backwards-compatible starter for the common-case of socket transports -start_link(Host, Port, Service, Options) +start(Host, Port, Service, Options) when is_integer(Port), is_atom(Service), is_list(Options) -> - {ProtoOpts, TransOpts} = split_options(Options), + {ClientOpts, ProtoOpts, TransOpts} = split_options(Options), {ok, TransportFactory} = thrift_socket_transport:new_transport_factory(Host, Port, TransOpts), @@ -79,13 +95,21 @@ start_link(Host, Port, Service, Options) {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( TransportFactory, ProtoOpts), - start_link(ProtocolFactory, Service). + start(ProtocolFactory, Service, ClientOpts). %% ProtocolFactory :: fun() -> thrift_protocol() -start_link(ProtocolFactory, Service) +start(ProtocolFactory, Service, ClientOpts) when is_function(ProtocolFactory), is_atom(Service) -> - case gen_server:start_link(?MODULE, [Service], []) of + Starter = + case lists:keysearch(monitor, 1, ClientOpts) of + {value, {monitor, link}} -> + start_link; + _ -> + start + end, + + case gen_server:Starter(?MODULE, [Service], []) of {ok, Pid} -> case gen_server:call(Pid, {connect, ProtocolFactory}) of ok -> diff --git a/test/erl/Makefile b/test/erl/Makefile index 17e30da6..21260372 100644 --- a/test/erl/Makefile +++ b/test/erl/Makefile @@ -29,7 +29,7 @@ SRCDIR=src ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR}) -MODULES = stress_server test_server test_disklog test_membuffer +MODULES = stress_server test_server test_disklog test_membuffer test_tether INCLUDES = TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES}) diff --git a/test/erl/src/test_tether.erl b/test/erl/src/test_tether.erl new file mode 100644 index 00000000..07d1ee99 --- /dev/null +++ b/test/erl/src/test_tether.erl @@ -0,0 +1,114 @@ +%% Tests the behavior of clients in the face of transport errors. +%% Makes sure start, start_linked, and start_tethered work as expected. + +-module(test_tether). + +-compile(export_all). + +t() -> + io:format("Starting.~n", []), + register(tester, self()), + + Pid1 = erlang:spawn(?MODULE, test_start, []), + receive after 200 -> ok end, % Wait for completion. + case is_up(Pid1) of + true -> + io:format("PASS. Unlinked owner still alive.~n"); + false -> + io:format("FAIL. Unlinked owner is dead.~n") + end, + + Pid2 = erlang:spawn(?MODULE, test_linked, []), + receive after 200 -> ok end, % Wait for completion. + case is_up(Pid2) of + true -> + io:format("FAIL. Linked owner still alive.~n"); + false -> + io:format("PASS. Linked owner is dead.~n") + end, + + check_extras(2), + + erlang:halt(). + +is_up(Pid) -> + MonitorRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonitorRef, process, Pid, _Info} -> + false + after + 50 -> + erlang:demonitor(MonitorRef), + true + end. + +check_extras(0) -> ok; +check_extras(N) -> + receive + {client, Type, Pid} -> + case {Type, is_up(Pid)} of + {unlinked, true} -> + io:format("PASS. Unlinked client still alive.~n"); + {unlinked, false} -> + io:format("FAIL. Unlinked client dead.~n"); + {linked, true} -> + io:format("FAIL. Linked client still alive.~n"); + {linked, false} -> + io:format("PASS. Linked client dead.~n") + end, + check_extras(N-1) + after + 500 -> + io:format("FAIL. Expected ~p more clients.~n", [N]) + end. + +make_protocol_factory(Port) -> + {ok, TransportFactory} = + thrift_socket_transport:new_transport_factory( + "127.0.0.1", Port, []), + {ok, ProtocolFactory} = + thrift_binary_protocol:new_protocol_factory( + TransportFactory, []), + ProtocolFactory. + + +test_start() -> + {ok, Client1} = gen_server:start(thrift_client, [thriftTest_thrift], []), + tester ! {client, unlinked, Client1}, + {ok, Client2} = gen_server:start(thrift_client, [thriftTest_thrift], []), + io:format("PASS. Unlinked clients created.~n"), + try + gen_server:call(Client2, {connect, make_protocol_factory(2)}), + io:format("FAIL. Unlinked client connected.~n", []) + catch + Kind:Info -> + io:format("PASS. Caught unlinked error. ~p:~p~n", [Kind, Info]) + end, + receive after 100 -> + io:format("PASS. Still alive after unlinked death.~n"), + %% Hang around a little longer so our parent can verify. + receive after 200 -> ok end + end, + %% Exit abnormally to not kill our unlinked extra client. + exit(die). + +test_linked() -> + {ok, Client1} = gen_server:start_link(thrift_client, [thriftTest_thrift], []), + tester ! {client, linked, Client1}, + {ok, Client2} = gen_server:start_link(thrift_client, [thriftTest_thrift], []), + io:format("PASS. Linked clients created.~n"), + try + gen_server:call(Client2, {connect, make_protocol_factory(2)}), + io:format("FAIL. Linked client connected.~n", []) + catch + Kind:Info -> + io:format("FAIL. Caught linked error. ~p:~p~n", [Kind, Info]) + end, + receive after 100 -> + io:format("FAIL. Still alive after linked death.~n"), + % Hang around a little longer so our parent can verify. + receive after 200 -> ok end + end, + %% Exit abnormally to kill our linked extra client. + %% But we should never get here. + exit(die). -- 2.17.1