From: David Reiss Date: Mon, 30 Aug 2010 22:05:00 +0000 (+0000) Subject: Rollback a few recent Erlang changes to fix blame data X-Git-Tag: 0.5.0~148 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=f32d0fb90085009cd53efd402f21d6fe6dcde492;p=common%2Fthrift.git Rollback a few recent Erlang changes to fix blame data My combined patch for THRIFT-599 was committed, but it is preferable commit the individual patches to preserve the more detailed log and blame data. I'll recommit r987018 as a sequence of patches and r988722 as its own rev. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990957 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/compiler/cpp/src/generate/t_erl_generator.cc b/compiler/cpp/src/generate/t_erl_generator.cc index a5b6b9cd..30003ab0 100644 --- a/compiler/cpp/src/generate/t_erl_generator.cc +++ b/compiler/cpp/src/generate/t_erl_generator.cc @@ -649,8 +649,8 @@ void t_erl_generator::generate_service_interface(t_service* tservice) { << "_thrift:function_info(Function, InfoType)." << endl; indent_down(); } else { - // Use a special return code for nonexistent functions - indent(f_service_) << "function_info(_Func, _Info) -> no_function." << endl; + // Dummy function_info so we don't worry about the ;s + indent(f_service_) << "function_info(xxx, dummy) -> dummy." << endl; } indent(f_service_) << endl; diff --git a/lib/erl/README b/lib/erl/README index 667c549c..ddb6946f 100644 --- a/lib/erl/README +++ b/lib/erl/README @@ -25,19 +25,32 @@ Example Example session using thrift_client: -1> {ok, C0} = thrift_client_util:new("localhost", 9090, thriftTest_thrift, []), ok. -ok -2> {C1, R1} = thrift_client:call(C0, testVoid, []), R1. +118> f(), {ok, C} = thrift_client:start_link("localhost", 9090, thriftTest_thrif +t). +{ok,<0.271.0>} +119> thrift_client:call(C, testVoid, []). {ok,ok} -3> {C2, R2} = thrift_client:call(C1, testVoid, [asdf]), R2. +120> thrift_client:call(C, testVoid, [asdf]). {error,{bad_args,testVoid,[asdf]}} -4> {C3, R3} = thrift_client:call(C2, testI32, [123]), R3. +121> thrift_client:call(C, testI32, [123]). {ok,123} -5> {C4, R4} = thrift_client:call(C3, testOneway, [1]), R4. +122> thrift_client:call(C, testOneway, [1]). {ok,ok} -6> {C5, R5} = thrift_client:call(C4, testXception, ["foo"]), R5. +123> catch thrift_client:call(C, testXception, ["foo"]). {error,{no_function,testXception}} -7> {C6, R6} = thrift_client:call(C5, testException, ["foo"]), R6. +124> catch thrift_client:call(C, testException, ["foo"]). {ok,ok} -8> {C7, R7} = (catch thrift_client:call(C6, testException, ["Xception"])), R7. -{exception,{xception,1001,<<"Xception">>}} +125> catch thrift_client:call(C, testException, ["Xception"]). +{xception,1001,"This is an Xception"} +126> thrift_client:call(C, testException, ["Xception"]). + +=ERROR REPORT==== 24-Feb-2008::23:00:23 === +Error in process <0.269.0> with exit value: {{nocatch,{xception,1001,"This is an + Xception"}},[{thrift_client,call,3},{erl_eval,do_apply,5},{shell,exprs,6},{shel +l,eval_loop,3}]} + +** exited: {{nocatch,{xception,1001,"This is an Xception"}}, + [{thrift_client,call,3}, + {erl_eval,do_apply,5}, + {shell,exprs,6}, + {shell,eval_loop,3}]} ** diff --git a/lib/erl/build/otp.mk b/lib/erl/build/otp.mk index 0e0381ec..1d16e2c8 100644 --- a/lib/erl/build/otp.mk +++ b/lib/erl/build/otp.mk @@ -25,6 +25,7 @@ OS_TYPE=${shell uname} # MHOST is the host where this Makefile runs. MHOST=${shell hostname -s} +ERL_COMPILE_FLAGS+=-W0 # The location of the erlang runtime system. ifndef ERL_RUN_TOP diff --git a/lib/erl/include/thrift_protocol.hrl b/lib/erl/include/thrift_protocol.hrl index f85f4552..f4e1901f 100644 --- a/lib/erl/include/thrift_protocol.hrl +++ b/lib/erl/include/thrift_protocol.hrl @@ -18,7 +18,7 @@ %% -ifndef(THRIFT_PROTOCOL_INCLUDED). --define(THRIFT_PROTOCOL_INCLUDED, true). +-define(THRIFT_PROTOCOL_INCLUDED, yea). -record(protocol_message_begin, {name, type, seqid}). -record(protocol_struct_begin, {name}). @@ -27,40 +27,5 @@ -record(protocol_list_begin, {etype, size}). -record(protocol_set_begin, {etype, size}). --type tprot_header_val() :: #protocol_message_begin{} - | #protocol_struct_begin{} - | #protocol_field_begin{} - | #protocol_map_begin{} - | #protocol_list_begin{} - | #protocol_set_begin{} - . --type tprot_empty_tag() :: message_end - | struct_begin - | struct_end - | field_end - | map_end - | list_end - | set_end - . --type tprot_header_tag() :: message_begin - | field_begin - | map_begin - | list_begin - | set_begin - . --type tprot_data_tag() :: ui32 - | bool - | byte - | i16 - | i32 - | i64 - | double - | string - . --type tprot_cont_tag() :: {list, _Type} - | {map, _KType, _VType} - | {set, _Type} - . - -endif. diff --git a/lib/erl/src/Makefile b/lib/erl/src/Makefile index 78af14f6..980af812 100644 --- a/lib/erl/src/Makefile +++ b/lib/erl/src/Makefile @@ -27,7 +27,6 @@ INSTALL_DST = $(ERLANG_OTP)/lib/$(APP_NAME)-$(VSN) MODULES = $(shell find . -name \*.erl | sed 's:^\./::' | sed 's/\.erl//') MODULES_STRING_LIST = $(shell find . -name \*.erl | sed 's:^\./:":' | sed 's/\.erl/",/') -BEHAV_MODULES = $(shell find . -name \*.erl | xargs grep -l behaviour_info | sed 's:^\./::' | sed 's/\.erl//') HRL_FILES= INTERNAL_HRL_FILES= $(APP_NAME).hrl @@ -44,8 +43,7 @@ APP_TARGET= $(EBIN)/$(APP_FILE) APPUP_TARGET= $(EBIN)/$(APPUP_FILE) BEAMS= $(MODULES:%=$(EBIN)/%.$(EMULATOR)) -BEHAV_BEAMS= $(BEHAV_MODULES:%=$(EBIN)/%.$(EMULATOR)) -TARGET_FILES= $(BEHAV_BEAMS) $(BEAMS) $(APP_TARGET) $(APPUP_TARGET) +TARGET_FILES= $(BEAMS) $(APP_TARGET) $(APPUP_TARGET) WEB_TARGET=/var/yaws/www/$(APP_NAME) @@ -55,8 +53,7 @@ WEB_TARGET=/var/yaws/www/$(APP_NAME) ERL_FLAGS += ERL_INCLUDE = -I../include -I../../fslib/include -I../../system_status/include -ERL_BEHAV_PATH = -pz ../ebin -ERL_COMPILE_FLAGS += $(ERL_INCLUDE) $(ERL_BEHAV_PATH) +ERL_COMPILE_FLAGS += $(ERL_INCLUDE) # ---------------------------------------------------- # Targets diff --git a/lib/erl/include/thrift_transport_behaviour.hrl b/lib/erl/src/test_handler.erl similarity index 64% rename from lib/erl/include/thrift_transport_behaviour.hrl rename to lib/erl/src/test_handler.erl index dbc05aac..28a3acd3 100644 --- a/lib/erl/include/thrift_transport_behaviour.hrl +++ b/lib/erl/src/test_handler.erl @@ -17,15 +17,10 @@ %% under the License. %% -%% Signature specifications for transport implementations. +-module(test_handler). --ifndef(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED). --define(THRIFT_TRANSPORT_BEHAVIOUR_INCLUDED, true). +-export([handle_function/2]). --spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}. --spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}. --spec flush(state()) -> {state(), ok | {error, _Reason}}. --spec close(state()) -> {state(), ok | {error, _Reason}}. - - --endif. +handle_function(add, Params = {A, B}) -> + io:format("Got params: ~p~n", [Params]), + {reply, A + B}. diff --git a/lib/erl/include/thrift_protocol_behaviour.hrl b/lib/erl/src/test_service.erl similarity index 52% rename from lib/erl/include/thrift_protocol_behaviour.hrl rename to lib/erl/src/test_service.erl index abe300b1..7aa4827f 100644 --- a/lib/erl/include/thrift_protocol_behaviour.hrl +++ b/lib/erl/src/test_service.erl @@ -17,21 +17,13 @@ %% under the License. %% -%% Signature specifications for protocol implementations. +-module(test_service). +% +% Test service definition --ifndef(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED). --define(THRIFT_PROTOCOL_BEHAVIOUR_INCLUDED, true). +-export([function_info/2]). --spec flush_transport(state()) -> {state(), ok | {error, _Reason}}. --spec close_transport(state()) -> {state(), ok | {error, _Reason}}. - --spec write(state(), any()) -> {state(), ok | {error, _Reason}}. - -%% NOTE: Keep this in sync with thrift_protocol:read and read_specific. --spec read - (state(), tprot_empty_tag()) -> {state(), ok | {error, _Reason}}; - (state(), tprot_header_tag()) -> {state(), tprot_header_val() | {error, _Reason}}; - (state(), tprot_data_tag()) -> {state(), {ok, any()} | {error, _Reason}}. - - --endif. +function_info(add, params_type) -> + {struct, [{1, i32}, + {2, i32}]}; +function_info(add, reply_type) -> i32. diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl index d31f2bac..9d13151c 100644 --- a/lib/erl/src/thrift_base64_transport.erl +++ b/lib/erl/src/thrift_base64_transport.erl @@ -29,35 +29,30 @@ %% State -record(b64_transport, {wrapped}). --type state() :: #b64_transport{}. --include("thrift_transport_behaviour.hrl"). new(Wrapped) -> State = #b64_transport{wrapped = Wrapped}, thrift_transport:new(?MODULE, State). -write(This = #b64_transport{wrapped = Wrapped}, Data) -> - {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))), - {This#b64_transport{wrapped = NewWrapped}, Result}. +write(#b64_transport{wrapped = Wrapped}, Data) -> + thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))). %% base64 doesn't support reading quite yet since it would involve %% nasty buffering and such -read(This = #b64_transport{}, _Data) -> - {This, {error, no_reads_allowed}}. +read(#b64_transport{wrapped = Wrapped}, Data) -> + {error, no_reads_allowed}. -flush(This = #b64_transport{wrapped = Wrapped0}) -> - {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>), - {Wrapped2, ok} = thrift_transport:flush(Wrapped1), - {This#b64_transport{wrapped = Wrapped2}, ok}. +flush(#b64_transport{wrapped = Wrapped}) -> + thrift_transport:write(Wrapped, <<"\n">>), + thrift_transport:flush(Wrapped). -close(This0) -> - {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0), - {NewWrapped, ok} = thrift_transport:close(Wrapped), - {This1#b64_transport{wrapped = NewWrapped}, ok}. +close(Me = #b64_transport{wrapped = Wrapped}) -> + flush(Me), + thrift_transport:close(Wrapped). %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl index 800fd8ea..ad533842 100644 --- a/lib/erl/src/thrift_binary_protocol.erl +++ b/lib/erl/src/thrift_binary_protocol.erl @@ -19,7 +19,7 @@ -module(thrift_binary_protocol). --behaviour(thrift_protocol). +-behavior(thrift_protocol). -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). @@ -37,8 +37,6 @@ strict_read=true, strict_write=true }). --type state() :: #binary_protocol{}. --include("thrift_protocol_behaviour.hrl"). -define(VERSION_MASK, 16#FFFF0000). -define(VERSION_1, 16#80010000). @@ -60,81 +58,79 @@ parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> parse_options(Rest, State#binary_protocol{strict_write=Bool}). -flush_transport(This = #binary_protocol{transport = Transport}) -> - {NewTransport, Result} = thrift_transport:flush(Transport), - {This#binary_protocol{transport = NewTransport}, Result}. +flush_transport(#binary_protocol{transport = Transport}) -> + thrift_transport:flush(Transport). -close_transport(This = #binary_protocol{transport = Transport}) -> - {NewTransport, Result} = thrift_transport:close(Transport), - {This#binary_protocol{transport = NewTransport}, Result}. +close_transport(#binary_protocol{transport = Transport}) -> + thrift_transport:close(Transport). %%% %%% instance methods %%% -write(This0, #protocol_message_begin{ +write(This, #protocol_message_begin{ name = Name, type = Type, seqid = Seqid}) -> - case This0#binary_protocol.strict_write of + case This#binary_protocol.strict_write of true -> - {This1, ok} = write(This0, {i32, ?VERSION_1 bor Type}), - {This2, ok} = write(This1, {string, Name}), - {This3, ok} = write(This2, {i32, Seqid}), - {This3, ok}; + write(This, {i32, ?VERSION_1 bor Type}), + write(This, {string, Name}), + write(This, {i32, Seqid}); false -> - {This1, ok} = write(This0, {string, Name}), - {This2, ok} = write(This1, {byte, Type}), - {This3, ok} = write(This2, {i32, Seqid}), - {This3, ok} - end; + write(This, {string, Name}), + write(This, {byte, Type}), + write(This, {i32, Seqid}) + end, + ok; -write(This, message_end) -> {This, ok}; +write(This, message_end) -> ok; -write(This0, #protocol_field_begin{ +write(This, #protocol_field_begin{ name = _Name, type = Type, id = Id}) -> - {This1, ok} = write(This0, {byte, Type}), - {This2, ok} = write(This1, {i16, Id}), - {This2, ok}; + write(This, {byte, Type}), + write(This, {i16, Id}), + ok; write(This, field_stop) -> - write(This, {byte, ?tType_STOP}); + write(This, {byte, ?tType_STOP}), + ok; -write(This, field_end) -> {This, ok}; +write(This, field_end) -> ok; -write(This0, #protocol_map_begin{ +write(This, #protocol_map_begin{ ktype = Ktype, vtype = Vtype, size = Size}) -> - {This1, ok} = write(This0, {byte, Ktype}), - {This2, ok} = write(This1, {byte, Vtype}), - {This3, ok} = write(This2, {i32, Size}), - {This3, ok}; + write(This, {byte, Ktype}), + write(This, {byte, Vtype}), + write(This, {i32, Size}), + ok; -write(This, map_end) -> {This, ok}; +write(This, map_end) -> ok; -write(This0, #protocol_list_begin{ +write(This, #protocol_list_begin{ etype = Etype, size = Size}) -> - {This1, ok} = write(This0, {byte, Etype}), - {This2, ok} = write(This1, {i32, Size}), - {This2, ok}; + write(This, {byte, Etype}), + write(This, {i32, Size}), + ok; -write(This, list_end) -> {This, ok}; +write(This, list_end) -> ok; -write(This0, #protocol_set_begin{ +write(This, #protocol_set_begin{ etype = Etype, size = Size}) -> - {This1, ok} = write(This0, {byte, Etype}), - {This2, ok} = write(This1, {i32, Size}), - {This2, ok}; + write(This, {byte, Etype}), + write(This, {i32, Size}), + ok; -write(This, set_end) -> {This, ok}; +write(This, set_end) -> ok; -write(This, #protocol_struct_begin{}) -> {This, ok}; -write(This, struct_end) -> {This, ok}; +write(This, #protocol_struct_begin{}) -> ok; +write(This, struct_end) -> ok; write(This, {bool, true}) -> write(This, {byte, 1}); write(This, {bool, false}) -> write(This, {byte, 0}); @@ -154,166 +150,152 @@ write(This, {i64, I64}) -> write(This, {double, Double}) -> write(This, <>); -write(This0, {string, Str}) when is_list(Str) -> - {This1, ok} = write(This0, {i32, length(Str)}), - {This2, ok} = write(This1, list_to_binary(Str)), - {This2, ok}; +write(This, {string, Str}) when is_list(Str) -> + write(This, {i32, length(Str)}), + write(This, list_to_binary(Str)); -write(This0, {string, Bin}) when is_binary(Bin) -> - {This1, ok} = write(This0, {i32, size(Bin)}), - {This2, ok} = write(This1, Bin), - {This2, ok}; +write(This, {string, Bin}) when is_binary(Bin) -> + write(This, {i32, size(Bin)}), + write(This, Bin); %% Data :: iolist() -write(This = #binary_protocol{transport = Trans}, Data) -> - {NewTransport, Result} = thrift_transport:write(Trans, Data), - {This#binary_protocol{transport = NewTransport}, Result}. +write(This, Data) -> + thrift_transport:write(This#binary_protocol.transport, Data). %% -read(This0, message_begin) -> - {This1, Initial} = read(This0, ui32), - case Initial of +read(This, message_begin) -> + case read(This, ui32) of {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 -> %% we're at version 1 - {This2, {ok, Name}} = read(This1, string), - {This3, {ok, SeqId}} = read(This2, i32), - Type = Sz band ?TYPE_MASK, - {This3, #protocol_message_begin{name = binary_to_list(Name), - type = Type, - seqid = SeqId}}; + {ok, Name} = read(This, string), + Type = Sz band ?TYPE_MASK, + {ok, SeqId} = read(This, i32), + #protocol_message_begin{name = binary_to_list(Name), + type = Type, + seqid = SeqId}; {ok, Sz} when Sz < 0 -> %% there's a version number but it's unexpected - {This1, {error, {bad_binary_protocol_version, Sz}}}; + {error, {bad_binary_protocol_version, Sz}}; - {ok, _Sz} when This1#binary_protocol.strict_read =:= true -> + {ok, Sz} when This#binary_protocol.strict_read =:= true -> %% strict_read is true and there's no version header; that's an error - {This1, {error, no_binary_protocol_version}}; + {error, no_binary_protocol_version}; - {ok, Sz} when This1#binary_protocol.strict_read =:= false -> + {ok, Sz} when This#binary_protocol.strict_read =:= false -> %% strict_read is false, so just read the old way - {This2, {ok, Name}} = read_data(This1, Sz), - {This3, {ok, Type}} = read(This2, byte), - {This4, {ok, SeqId}} = read(This3, i32), - {This4, #protocol_message_begin{name = binary_to_list(Name), - type = Type, - seqid = SeqId}}; - - Else -> - {This1, Else} + {ok, Name} = read(This, Sz), + {ok, Type} = read(This, byte), + {ok, SeqId} = read(This, i32), + #protocol_message_begin{name = binary_to_list(Name), + type = Type, + seqid = SeqId}; + + Err = {error, closed} -> Err; + Err = {error, timeout}-> Err; + Err = {error, ebadf} -> Err end; -read(This, message_end) -> {This, ok}; +read(This, message_end) -> ok; -read(This, struct_begin) -> {This, ok}; -read(This, struct_end) -> {This, ok}; +read(This, struct_begin) -> ok; +read(This, struct_end) -> ok; -read(This0, field_begin) -> - {This1, Result} = read(This0, byte), - case Result of +read(This, field_begin) -> + case read(This, byte) of {ok, Type = ?tType_STOP} -> - {This1, #protocol_field_begin{type = Type}}; + #protocol_field_begin{type = Type}; {ok, Type} -> - {This2, {ok, Id}} = read(This1, i16), - {This2, #protocol_field_begin{type = Type, - id = Id}} + {ok, Id} = read(This, i16), + #protocol_field_begin{type = Type, + id = Id} end; -read(This, field_end) -> {This, ok}; - -read(This0, map_begin) -> - {This1, {ok, Ktype}} = read(This0, byte), - {This2, {ok, Vtype}} = read(This1, byte), - {This3, {ok, Size}} = read(This2, i32), - {This3, #protocol_map_begin{ktype = Ktype, - vtype = Vtype, - size = Size}}; -read(This, map_end) -> {This, ok}; - -read(This0, list_begin) -> - {This1, {ok, Etype}} = read(This0, byte), - {This2, {ok, Size}} = read(This1, i32), - {This2, #protocol_list_begin{etype = Etype, - size = Size}}; -read(This, list_end) -> {This, ok}; - -read(This0, set_begin) -> - {This1, {ok, Etype}} = read(This0, byte), - {This2, {ok, Size}} = read(This1, i32), - {This2, #protocol_set_begin{etype = Etype, - size = Size}}; -read(This, set_end) -> {This, ok}; - -read(This0, field_stop) -> - {This1, {ok, ?tType_STOP}} = read(This0, byte), - {This1, ok}; +read(This, field_end) -> ok; + +read(This, map_begin) -> + {ok, Ktype} = read(This, byte), + {ok, Vtype} = read(This, byte), + {ok, Size} = read(This, i32), + #protocol_map_begin{ktype = Ktype, + vtype = Vtype, + size = Size}; +read(This, map_end) -> ok; + +read(This, list_begin) -> + {ok, Etype} = read(This, byte), + {ok, Size} = read(This, i32), + #protocol_list_begin{etype = Etype, + size = Size}; +read(This, list_end) -> ok; + +read(This, set_begin) -> + {ok, Etype} = read(This, byte), + {ok, Size} = read(This, i32), + #protocol_set_begin{etype = Etype, + size = Size}; +read(This, set_end) -> ok; + +read(This, field_stop) -> + {ok, ?tType_STOP} = read(This, byte), + ok; %% -read(This0, bool) -> - {This1, Result} = read(This0, byte), - case Result of - {ok, Byte} -> {This1, {ok, Byte /= 0}}; - Else -> {This1, Else} +read(This, bool) -> + case read(This, byte) of + {ok, Byte} -> {ok, Byte /= 0}; + Else -> Else end; -read(This0, byte) -> - {This1, Bytes} = read_data(This0, 1), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, byte) -> + case read(This, 1) of + {ok, <>} -> {ok, Val}; + Else -> Else end; -read(This0, i16) -> - {This1, Bytes} = read_data(This0, 2), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, i16) -> + case read(This, 2) of + {ok, <>} -> {ok, Val}; + Else -> Else end; -read(This0, i32) -> - {This1, Bytes} = read_data(This0, 4), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, i32) -> + case read(This, 4) of + {ok, <>} -> {ok, Val}; + Else -> Else end; %% unsigned ints aren't used by thrift itself, but it's used for the parsing %% of the packet version header. Without this special function BEAM works fine %% but hipe thinks it received a bad version header. -read(This0, ui32) -> - {This1, Bytes} = read_data(This0, 4), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, ui32) -> + case read(This, 4) of + {ok, <>} -> {ok, Val}; + Else -> Else end; -read(This0, i64) -> - {This1, Bytes} = read_data(This0, 8), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, i64) -> + case read(This, 8) of + {ok, <>} -> {ok, Val}; + Else -> Else end; -read(This0, double) -> - {This1, Bytes} = read_data(This0, 8), - case Bytes of - {ok, <>} -> {This1, {ok, Val}}; - Else -> {This1, Else} +read(This, double) -> + case read(This, 8) of + {ok, <>} -> {ok, Val}; + Else -> Else end; % returns a binary directly, call binary_to_list if necessary -read(This0, string) -> - {This1, {ok, Sz}} = read(This0, i32), - read_data(This1, Sz). - --spec read_data(#binary_protocol{}, non_neg_integer()) -> - {#binary_protocol{}, {ok, binary()} | {error, _Reason}}. -read_data(This, 0) -> {This, {ok, <<>>}}; -read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 -> - {NewTransport, Result} = thrift_transport:read(Trans, Len), - {This#binary_protocol{transport = NewTransport}, Result}. +read(This, string) -> + {ok, Sz} = read(This, i32), + {ok, Bin} = read(This, Sz); + +read(This, 0) -> {ok, <<>>}; +read(This, Len) when is_integer(Len), Len >= 0 -> + thrift_transport:read(This#binary_protocol.transport, Len). %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl index d4d614eb..ebc16bd6 100644 --- a/lib/erl/src/thrift_buffered_transport.erl +++ b/lib/erl/src/thrift_buffered_transport.erl @@ -19,51 +19,154 @@ -module(thrift_buffered_transport). +-behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/1, new_transport_factory/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). -record(buffered_transport, {wrapped, % a thrift_transport write_buffer % iolist() }). --type state() :: #buffered_transport{}. --include("thrift_transport_behaviour.hrl"). - +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- new(WrappedTransport) -> - State = #buffered_transport{wrapped = WrappedTransport, - write_buffer = []}, - thrift_transport:new(?MODULE, State). - - -%% Writes data into the buffer -write(State = #buffered_transport{write_buffer = WBuf}, Data) -> - {State#buffered_transport{write_buffer = [WBuf, Data]}, ok}. - -%% Flushes the buffer through to the wrapped transport -flush(State = #buffered_transport{write_buffer = WBuf, - wrapped = Wrapped0}) -> - {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf), - {Wrapped2, _} = thrift_transport:flush(Wrapped1), - NewState = State#buffered_transport{write_buffer = [], - wrapped = Wrapped2}, - {NewState, Response}. - -%% Closes the transport and the wrapped transport -close(State = #buffered_transport{wrapped = Wrapped0}) -> - {Wrapped1, Result} = thrift_transport:close(Wrapped0), - NewState = State#buffered_transport{wrapped = Wrapped1}, - {NewState, Result}. - -%% Reads data through from the wrapped transport -read(State = #buffered_transport{wrapped = Wrapped0}, Len) when is_integer(Len) -> - {Wrapped1, Response} = thrift_transport:read(Wrapped0, Len), - NewState = State#buffered_transport{wrapped = Wrapped1}, - {NewState, Response}. + case gen_server:start_link(?MODULE, [WrappedTransport], []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. + + + +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = iolist() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) -> + gen_server:call(Transport, {write, Data}). + +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, flush). + +%%-------------------------------------------------------------------- +%% Function: close(Transport) -> ok +%% +%% Description: Closes the transport and the wrapped transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:cast(Transport, close). + +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}, _Timeout=10000). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Wrapped]) -> + {ok, #buffered_transport{wrapped = Wrapped, + write_buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) -> + {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}}; + +handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) -> + Response = thrift_transport:read(Wrapped, Len), + {reply, Response, State}; + +handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf, + wrapped = Wrapped}) -> + Response = thrift_transport:write(Wrapped, WBuf), + thrift_transport:flush(Wrapped), + {reply, Response, State#buffered_transport{write_buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(close, State = #buffered_transport{write_buffer = WBuf, + wrapped = Wrapped}) -> + thrift_transport:write(Wrapped, WBuf), + %% Wrapped is closed by terminate/2 + %% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]), + {stop, normal, State}; +handle_cast(Msg, State=#buffered_transport{}) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) -> + thrift_transport:close(Wrapped), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions diff --git a/lib/erl/src/thrift_client.erl b/lib/erl/src/thrift_client.erl index 5c74adc7..d5bb146a 100644 --- a/lib/erl/src/thrift_client.erl +++ b/lib/erl/src/thrift_client.erl @@ -19,127 +19,366 @@ -module(thrift_client). +-behaviour(gen_server). + %% API --export([new/2, call/3, send_call/3, close/1]). +-export([start_link/2, start_link/3, start_link/4, + start/3, start/4, + call/3, send_call/3, close/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(tclient, {service, protocol, seqid}). +-record(state, {service, protocol, seqid}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server as a linked process. +%%-------------------------------------------------------------------- +start_link(Host, Port, Service) when is_integer(Port), is_atom(Service) -> + start_link(Host, Port, Service, []). + +start_link(Host, Port, Service, Options) -> + start(Host, Port, Service, [{monitor, link} | Options]). + +start_link(ProtocolFactory, Service) -> + start(ProtocolFactory, Service, [{monitor, link}]). + +%% +%% Splits client options into protocol options and transport options +%% +%% split_options([Options...]) -> {ProtocolOptions, TransportOptions} +%% +split_options(Options) -> + split_options(Options, [], [], []). + +split_options([], ClientIn, ProtoIn, TransIn) -> + {ClientIn, ProtoIn, TransIn}; + +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) + when OptKey =:= monitor -> + split_options(Rest, [Opt | ClientIn], ProtoIn, TransIn); + +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) + when OptKey =:= strict_read; + OptKey =:= strict_write -> + split_options(Rest, ClientIn, [Opt | ProtoIn], TransIn); +split_options([Opt = {OptKey, _} | Rest], ClientIn, ProtoIn, TransIn) + when OptKey =:= framed; + OptKey =:= connect_timeout; + OptKey =:= sockopts -> + split_options(Rest, ClientIn, ProtoIn, [Opt | TransIn]). -new(Protocol, Service) - when is_atom(Service) -> - {ok, #tclient{protocol = Protocol, - service = Service, - seqid = 0}}. --spec call(#tclient{}, atom(), list()) -> {#tclient{}, {ok, any()} | {error, any()}}. -call(Client = #tclient{}, Function, Args) - when is_atom(Function), is_list(Args) -> - case send_function_call(Client, Function, Args) of - {Client1, ok} -> - receive_function_result(Client1, Function); - Else -> - Else +%%-------------------------------------------------------------------- +%% Function: start() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server as an unlinked process. +%%-------------------------------------------------------------------- + +%% Backwards-compatible starter for the common-case of socket transports +start(Host, Port, Service, Options) + when is_integer(Port), is_atom(Service), is_list(Options) -> + {ClientOpts, ProtoOpts, TransOpts} = split_options(Options), + + {ok, TransportFactory} = + thrift_socket_transport:new_transport_factory(Host, Port, TransOpts), + + {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( + TransportFactory, ProtoOpts), + + start(ProtocolFactory, Service, ClientOpts). + + +%% ProtocolFactory :: fun() -> thrift_protocol() +start(ProtocolFactory, Service, ClientOpts) + when is_function(ProtocolFactory), is_atom(Service) -> + {Starter, Opts} = + case lists:keysearch(monitor, 1, ClientOpts) of + {value, {monitor, link}} -> + {start_link, []}; + {value, {monitor, tether}} -> + {start, [{tether, self()}]}; + _ -> + {start, []} + end, + + Connect = + case lists:keysearch(connect, 1, ClientOpts) of + {value, {connect, Choice}} -> + Choice; + _ -> + %% By default, connect at creation-time. + true + end, + + + Started = gen_server:Starter(?MODULE, [Service, Opts], []), + + if + Connect -> + case Started of + {ok, Pid} -> + case gen_server:call(Pid, {connect, ProtocolFactory}) of + ok -> + {ok, Pid}; + Error -> + Error + end; + Else -> + Else + end; + true -> + Started end. +call(Client, Function, Args) + when is_pid(Client), is_atom(Function), is_list(Args) -> + case gen_server:call(Client, {call, Function, Args}) of + R = {ok, _} -> R; + R = {error, _} -> R; + {exception, Exception} -> throw(Exception) + end. + +cast(Client, Function, Args) + when is_pid(Client), is_atom(Function), is_list(Args) -> + gen_server:cast(Client, {call, Function, Args}). %% Sends a function call but does not read the result. This is useful %% if you're trying to log non-oneway function calls to write-only %% transports like thrift_disk_log_transport. --spec send_call(#tclient{}, atom(), list()) -> {#tclient{}, ok}. -send_call(Client = #tclient{}, Function, Args) - when is_atom(Function), is_list(Args) -> - send_function_call(Client, Function, Args). +send_call(Client, Function, Args) + when is_pid(Client), is_atom(Function), is_list(Args) -> + gen_server:call(Client, {send_call, Function, Args}). + +close(Client) when is_pid(Client) -> + gen_server:cast(Client, close). --spec close(#tclient{}) -> ok. -close(#tclient{protocol=Protocol}) -> - thrift_protocol:close_transport(Protocol). +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Service, Opts]) -> + case lists:keysearch(tether, 1, Opts) of + {value, {tether, Pid}} -> + erlang:monitor(process, Pid); + _Else -> + ok + end, + {ok, #state{service = Service}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({connect, ProtocolFactory}, _From, + State = #state{service = Service}) -> + case ProtocolFactory() of + {ok, Protocol} -> + {reply, ok, State#state{protocol = Protocol, + seqid = 0}}; + Error -> + {stop, normal, Error, State} + end; + +handle_call({call, Function, Args}, _From, State = #state{service = Service}) -> + Result = catch_function_exceptions( + fun() -> + ok = send_function_call(State, Function, Args), + receive_function_result(State, Function) + end, + Service), + {reply, Result, State}; + + +handle_call({send_call, Function, Args}, _From, State = #state{service = Service}) -> + Result = catch_function_exceptions( + fun() -> + send_function_call(State, Function, Args) + end, + Service), + {reply, Result, State}. + + +%% Helper function that catches exceptions thrown by sending or receiving +%% a function and returns the correct response for call or send_only above. +catch_function_exceptions(Fun, Service) -> + try + Fun() + catch + throw:{return, Return} -> + Return; + error:function_clause -> + ST = erlang:get_stacktrace(), + case hd(ST) of + {Service, function_info, [Function, _]} -> + {error, {no_function, Function}}; + _ -> throw({error, {function_clause, ST}}) + end + end. +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast({call, Function, Args}, State = #state{service = Service, + protocol = Protocol, + seqid = SeqId}) -> + _Result = + try + ok = send_function_call(State, Function, Args), + receive_function_result(State, Function) + catch + Class:Reason -> + error_logger:error_msg("error ignored in handle_cast({cast,...},...): ~p:~p~n", [Class, Reason]) + end, + + {noreply, State}; + +handle_cast(close, State=#state{protocol = Protocol}) -> +%% error_logger:info_msg("thrift_client ~p received close", [self()]), + {stop,normal,State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({'DOWN', MonitorRef, process, Pid, _Info}, State) + when is_reference(MonitorRef), is_pid(Pid) -> + %% We don't actually verify the correctness of the DOWN message. + {stop, parent_died, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(Reason, State = #state{protocol=undefined}) -> + ok; +terminate(Reason, State = #state{protocol=Protocol}) -> + thrift_protocol:close_transport(Protocol), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- --spec send_function_call(#tclient{}, atom(), list()) -> {#tclient{}, ok | {error, any()}}. -send_function_call(Client = #tclient{protocol = Proto0, - service = Service, - seqid = SeqId}, +send_function_call(#state{protocol = Proto, + service = Service, + seqid = SeqId}, Function, Args) -> Params = Service:function_info(Function, params_type), - case Params of - no_function -> - {Client, {error, {no_function, Function}}}; - {struct, PList} when length(PList) =/= length(Args) -> - {Client, {error, {bad_args, Function, Args}}}; - {struct, _PList} -> - Begin = #protocol_message_begin{name = atom_to_list(Function), - type = ?tMessageType_CALL, - seqid = SeqId}, - {Proto1, ok} = thrift_protocol:write(Proto0, Begin), - {Proto2, ok} = thrift_protocol:write(Proto1, {Params, list_to_tuple([Function | Args])}), - {Proto3, ok} = thrift_protocol:write(Proto2, message_end), - {Proto4, ok} = thrift_protocol:flush_transport(Proto3), - {Client#tclient{protocol = Proto4}, ok} - end. + {struct, PList} = Params, + if + length(PList) =/= length(Args) -> + throw({return, {error, {bad_args, Function, Args}}}); + true -> ok + end, --spec receive_function_result(#tclient{}, atom()) -> {#tclient{}, {ok, any()} | {error, any()}}. -receive_function_result(Client = #tclient{service = Service}, Function) -> + Begin = #protocol_message_begin{name = atom_to_list(Function), + type = ?tMessageType_CALL, + seqid = SeqId}, + ok = thrift_protocol:write(Proto, Begin), + ok = thrift_protocol:write(Proto, {Params, list_to_tuple([Function | Args])}), + ok = thrift_protocol:write(Proto, message_end), + thrift_protocol:flush_transport(Proto), + ok. + +receive_function_result(State = #state{protocol = Proto, + service = Service}, + Function) -> ResultType = Service:function_info(Function, reply_type), - read_result(Client, Function, ResultType). + read_result(State, Function, ResultType). -read_result(Client, _Function, oneway_void) -> - {Client, {ok, ok}}; +read_result(_State, + _Function, + oneway_void) -> + {ok, ok}; -read_result(Client = #tclient{protocol = Proto0, - seqid = SeqId}, +read_result(State = #state{protocol = Proto, + seqid = SeqId}, Function, ReplyType) -> - {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin), - NewClient = Client#tclient{protocol = Proto1}, - case MessageBegin of + case thrift_protocol:read(Proto, message_begin) of #protocol_message_begin{seqid = RetSeqId} when RetSeqId =/= SeqId -> - {NewClient, {error, {bad_seq_id, SeqId}}}; + {error, {bad_seq_id, SeqId}}; #protocol_message_begin{type = ?tMessageType_EXCEPTION} -> - handle_application_exception(NewClient); + handle_application_exception(State); #protocol_message_begin{type = ?tMessageType_REPLY} -> - handle_reply(NewClient, Function, ReplyType) + handle_reply(State, Function, ReplyType) end. - -handle_reply(Client = #tclient{protocol = Proto0, - service = Service}, +handle_reply(State = #state{protocol = Proto, + service = Service}, Function, ReplyType) -> {struct, ExceptionFields} = Service:function_info(Function, exceptions), ReplyStructDef = {struct, [{0, ReplyType}] ++ ExceptionFields}, - {Proto1, {ok, Reply}} = thrift_protocol:read(Proto0, ReplyStructDef), - {Proto2, ok} = thrift_protocol:read(Proto1, message_end), - NewClient = Client#tclient{protocol = Proto2}, + {ok, Reply} = thrift_protocol:read(Proto, ReplyStructDef), ReplyList = tuple_to_list(Reply), true = length(ReplyList) == length(ExceptionFields) + 1, ExceptionVals = tl(ReplyList), Thrown = [X || X <- ExceptionVals, X =/= undefined], - case Thrown of - [] when ReplyType == {struct, []} -> - {NewClient, {ok, ok}}; - [] -> - {NewClient, {ok, hd(ReplyList)}}; - [Exception] -> - throw({NewClient, {exception, Exception}}) - end. + Result = + case Thrown of + [] when ReplyType == {struct, []} -> + {ok, ok}; + [] -> + {ok, hd(ReplyList)}; + [Exception] -> + {exception, Exception} + end, + ok = thrift_protocol:read(Proto, message_end), + Result. -handle_application_exception(Client = #tclient{protocol = Proto0}) -> - {Proto1, {ok, Exception}} = - thrift_protocol:read(Proto0, ?TApplicationException_Structure), - {Proto2, ok} = thrift_protocol:read(Proto1, message_end), +handle_application_exception(State = #state{protocol = Proto}) -> + {ok, Exception} = thrift_protocol:read(Proto, + ?TApplicationException_Structure), + ok = thrift_protocol:read(Proto, message_end), XRecord = list_to_tuple( ['TApplicationException' | tuple_to_list(Exception)]), error_logger:error_msg("X: ~p~n", [XRecord]), true = is_record(XRecord, 'TApplicationException'), - NewClient = Client#tclient{protocol = Proto2}, - throw({NewClient, {exception, XRecord}}). + {exception, XRecord}. diff --git a/lib/erl/src/thrift_client_util.erl b/lib/erl/src/thrift_client_util.erl deleted file mode 100644 index c52bb8b6..00000000 --- a/lib/erl/src/thrift_client_util.erl +++ /dev/null @@ -1,61 +0,0 @@ -%% -%% Licensed to the Apache Software Foundation (ASF) under one -%% or more contributor license agreements. See the NOTICE file -%% distributed with this work for additional information -%% regarding copyright ownership. The ASF licenses this file -%% to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance -%% with the License. You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% - --module(thrift_client_util). - --export([new/4]). - -%% -%% Splits client options into client, protocol, and transport options -%% -%% split_options([Options...]) -> {ProtocolOptions, TransportOptions} -%% -split_options(Options) -> - split_options(Options, [], []). - -split_options([], ProtoIn, TransIn) -> - {ProtoIn, TransIn}; - -split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) - when OptKey =:= strict_read; - OptKey =:= strict_write -> - split_options(Rest, [Opt | ProtoIn], TransIn); - -split_options([Opt = {OptKey, _} | Rest], ProtoIn, TransIn) - when OptKey =:= framed; - OptKey =:= connect_timeout; - OptKey =:= sockopts -> - split_options(Rest, ProtoIn, [Opt | TransIn]). - - -%% Client constructor for the common-case of socket transports -%% with the binary protocol -new(Host, Port, Service, Options) - when is_integer(Port), is_atom(Service), is_list(Options) -> - {ProtoOpts, TransOpts} = split_options(Options), - - {ok, TransportFactory} = - thrift_socket_transport:new_transport_factory(Host, Port, TransOpts), - - {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( - TransportFactory, ProtoOpts), - - {ok, Protocol} = ProtocolFactory(), - - thrift_client:new(Protocol, Service). diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl index de8ee417..761fa309 100644 --- a/lib/erl/src/thrift_disk_log_transport.erl +++ b/lib/erl/src/thrift_disk_log_transport.erl @@ -35,8 +35,6 @@ close_on_close = false, sync_every = infinity, sync_tref}). --type state() :: #dl_transport{}. --include("thrift_transport_behaviour.hrl"). %% Create a transport attached to an already open log. @@ -49,7 +47,7 @@ new(LogName, Opts) when is_atom(LogName), is_list(Opts) -> State2 = case State#dl_transport.sync_every of N when is_integer(N), N > 0 -> - {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, [State]), + {ok, TRef} = timer:apply_interval(N, ?MODULE, force_flush, State), State#dl_transport{sync_tref = TRef}; _ -> State end, @@ -60,41 +58,38 @@ new(LogName, Opts) when is_atom(LogName), is_list(Opts) -> parse_opts([], State) -> State; parse_opts([{close_on_close, Bool} | Rest], State) when is_boolean(Bool) -> - parse_opts(Rest, State#dl_transport{close_on_close = Bool}); + State#dl_transport{close_on_close = Bool}; parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 -> - parse_opts(Rest, State#dl_transport{sync_every = Int}). + State#dl_transport{sync_every = Int}. %%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% disk_log_transport is write-only -read(State, _Len) -> - {State, {error, no_read_from_disk_log}}. +read(_State, Len) -> + {error, no_read_from_disk_log}. -write(This = #dl_transport{log = Log}, Data) -> - {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}. +write(#dl_transport{log = Log}, Data) -> + disk_log:balog(Log, erlang:iolist_to_binary(Data)). force_flush(#dl_transport{log = Log}) -> error_logger:info_msg("~p syncing~n", [?MODULE]), disk_log:sync(Log). -flush(This = #dl_transport{log = Log, sync_every = SE}) -> +flush(#dl_transport{log = Log, sync_every = SE}) -> case SE of undefined -> % no time-based sync disk_log:sync(Log); _Else -> % sync will happen automagically ok - end, - {This, ok}. - - + end. %% On close, close the underlying log if we're configured to do so. -close(This = #dl_transport{close_on_close = false}) -> - {This, ok}; -close(This = #dl_transport{log = Log}) -> - {This, disk_log:lclose(Log)}. +close(#dl_transport{close_on_close = false}) -> + ok; +close(#dl_transport{log = Log}) -> + disk_log:lclose(Log). %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -114,10 +109,10 @@ factory_impl(Name, ExtraLogOpts, TransportOpts) -> ExtraLogOpts], Log = case disk_log:open(LogOpts) of - {ok, LogS} -> - LogS; - {repaired, LogS, Info1, Info2} -> - error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [LogS, Info1, Info2]), - LogS + {ok, Log} -> + Log; + {repaired, Log, Info1, Info2} -> + error_logger:info_msg("Disk log ~p repaired: ~p, ~p~n", [Log, Info1, Info2]), + Log end, new(Log, TransportOpts). diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl index ba3aa898..5ac2dbe1 100644 --- a/lib/erl/src/thrift_file_transport.erl +++ b/lib/erl/src/thrift_file_transport.erl @@ -29,8 +29,6 @@ -record(t_file_transport, {device, should_close = true, mode = write}). --type state() :: #t_file_transport{}. --include("thrift_transport_behaviour.hrl"). %%%% CONSTRUCTION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -65,25 +63,25 @@ parse_opts([], State) -> %%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -write(This = #t_file_transport{device = Device, mode = write}, Data) -> - {This, file:write(Device, Data)}; -write(This, _D) -> - {This, {error, read_mode}}. +write(#t_file_transport{device = Device, mode = write}, Data) -> + file:write(Device, Data); +write(_T, _D) -> + {error, read_mode}. -read(This = #t_file_transport{device = Device, mode = read}, Len) +read(#t_file_transport{device = Device, mode = read}, Len) when is_integer(Len), Len >= 0 -> - {This, file:read(Device, Len)}; -read(This, _D) -> - {This, {error, read_mode}}. + file:read(Device, Len); +read(_T, _D) -> + {error, read_mode}. -flush(This = #t_file_transport{device = Device, mode = write}) -> - {This, file:sync(Device)}. +flush(#t_file_transport{device = Device, mode = write}) -> + file:sync(Device). -close(This = #t_file_transport{device = Device, should_close = SC}) -> +close(#t_file_transport{device = Device, should_close = SC}) -> case SC of true -> - {This, file:close(Device)}; + file:close(Device); false -> - {This, ok} + ok end. diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl index 9b90112c..01bab70b 100644 --- a/lib/erl/src/thrift_framed_transport.erl +++ b/lib/erl/src/thrift_framed_transport.erl @@ -19,11 +19,16 @@ -module(thrift_framed_transport). +-behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). @@ -31,55 +36,102 @@ read_buffer, % iolist() write_buffer % iolist() }). --type state() :: #framed_transport{}. --include("thrift_transport_behaviour.hrl"). +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- new(WrappedTransport) -> - State = #framed_transport{wrapped = WrappedTransport, - read_buffer = [], - write_buffer = []}, - thrift_transport:new(?MODULE, State). - -%% Writes data into the buffer -write(State = #framed_transport{write_buffer = WBuf}, Data) -> - {State#framed_transport{write_buffer = [WBuf, Data]}, ok}. - -%% Flushes the buffer through to the wrapped transport -flush(State0 = #framed_transport{write_buffer = Buffer, - wrapped = Wrapped0}) -> - FrameLen = iolist_size(Buffer), - Data = [<>, Buffer], + case gen_server:start_link(?MODULE, [WrappedTransport], []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. - {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data), +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = iolist() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) -> + gen_server:call(Transport, {write, Data}). - {Wrapped2, _} = thrift_transport:flush(Wrapped1), +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, flush). - State1 = State0#framed_transport{wrapped = Wrapped2, write_buffer = []}, - {State1, Response}. +%%-------------------------------------------------------------------- +%% Function: close(Transport) -> ok +%% +%% Description: Closes the transport and the wrapped transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:cast(Transport, close). -%% Closes the transport and the wrapped transport -close(State = #framed_transport{wrapped = Wrapped0}) -> - {Wrapped1, Result} = thrift_transport:close(Wrapped0), - NewState = State#framed_transport{wrapped = Wrapped1}, - {NewState, Result}. +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== -%% Reads data through from the wrapped transport -read(State0 = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf}, - Len) when is_integer(Len) -> - {Wrapped1, {RBuf1, RBuf1Size}} = +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Wrapped]) -> + {ok, #framed_transport{wrapped = Wrapped, + read_buffer = [], + write_buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) -> + {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}}; + +handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, + read_buffer = RBuf}) -> + {RBuf1, RBuf1Size} = %% if the read buffer is empty, read another frame %% otherwise, just read from what's left in the buffer case iolist_size(RBuf) of 0 -> %% read the frame length - {WrappedS1, {ok, <>}} = - thrift_transport:read(Wrapped0, 4), + {ok, <>} = + thrift_transport:read(Wrapped, 4), %% then read the data - {WrappedS2, {ok, Bin}} = - thrift_transport:read(WrappedS1, FrameLen), - {WrappedS2, {Bin, erlang:byte_size(Bin)}}; + {ok, Bin} = + thrift_transport:read(Wrapped, FrameLen), + {Bin, erlang:byte_size(Bin)}; Sz -> - {Wrapped0, {RBuf, Sz}} + {RBuf, Sz} end, %% pull off Give bytes, return them to the user, leave the rest in the buffer @@ -87,13 +139,69 @@ read(State0 = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf}, <> = iolist_to_binary(RBuf1), Response = {ok, Data}, - State1 = State0#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2}, + State1 = State#framed_transport{read_buffer=RBuf2}, + + {reply, Response, State1}; + +handle_call(flush, _From, State) -> + {Response, State1} = do_flush(State), + {reply, Response, State1}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(close, State) -> + {_, State1} = do_flush(State), + %% Wrapped is closed by terminate/2 + %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]), + {stop, normal, State}; +handle_cast(Msg, State=#framed_transport{}) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) -> + thrift_transport:close(Wrapped), + ok. - {State1, Response}. +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%-------------------------------------------------------------------- -%% Internal functions +%%% Internal functions %%-------------------------------------------------------------------- +do_flush(State = #framed_transport{write_buffer = Buffer, + wrapped = Wrapped}) -> + FrameLen = iolist_size(Buffer), + Data = [<>, Buffer], + + Response = thrift_transport:write(Wrapped, Data), + + thrift_transport:flush(Wrapped), + + State1 = State#framed_transport{write_buffer = []}, + {Response, State1}. min(A,B) when A A; min(_,B) -> B. diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl index 09113cc2..f8c18277 100644 --- a/lib/erl/src/thrift_http_transport.erl +++ b/lib/erl/src/thrift_http_transport.erl @@ -19,11 +19,20 @@ -module(thrift_http_transport). +-behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/2, new/3]). +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). @@ -34,9 +43,14 @@ http_options, % see http(3) extra_headers % [{str(), str()}, ...] }). --type state() :: pid(). --include("thrift_transport_behaviour.hrl"). +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: new() -> {ok, Transport} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- new(Host, Path) -> new(Host, Path, _Options = []). @@ -46,6 +60,54 @@ new(Host, Path) -> %% {extra_headers, ExtraHeaders} = List of extra HTTP headers %%-------------------------------------------------------------------- new(Host, Path, Options) -> + case gen_server:start_link(?MODULE, {Host, Path, Options}, []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. + +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = iolist() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) -> + gen_server:call(Transport, {write, Data}). + +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer, making a request +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, flush). + +%%-------------------------------------------------------------------- +%% Function: close(Transport) -> ok +%% +%% Description: Closes the transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:cast(Transport, close). + +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +init({Host, Path, Options}) -> State1 = #http_transport{host = Host, path = Path, read_buffer = [], @@ -65,17 +127,50 @@ new(Host, Path, Options) -> end, case lists:foldl(ApplyOption, State1, Options) of State2 = #http_transport{} -> - thrift_transport:new(?MODULE, State2); + {ok, State2}; Else -> - {error, Else} + {stop, Else} end. -%% Writes data into the buffer -write(State = #http_transport{write_buffer = WBuf}, Data) -> - {State#http_transport{write_buffer = [WBuf, Data]}, ok}. +handle_call({write, Data}, _From, State = #http_transport{write_buffer = WBuf}) -> + {reply, ok, State#http_transport{write_buffer = [WBuf, Data]}}; -%% Flushes the buffer, making a request -flush(State = #http_transport{host = Host, +handle_call({read, Len}, _From, State = #http_transport{read_buffer = RBuf}) -> + %% Pull off Give bytes, return them to the user, leave the rest in the buffer. + Give = min(iolist_size(RBuf), Len), + case iolist_to_binary(RBuf) of + <> -> + Response = {ok, Data}, + State1 = State#http_transport{read_buffer=RBuf1}, + {reply, Response, State1}; + _ -> + {reply, {error, 'EOF'}, State} + end; + +handle_call(flush, _From, State) -> + {Response, State1} = do_flush(State), + {reply, Response, State1}. + +handle_cast(close, State) -> + {_, State1} = do_flush(State), + {stop, normal, State1}; + +handle_cast(_Msg, State=#http_transport{}) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +do_flush(State = #http_transport{host = Host, path = Path, read_buffer = Rbuf, write_buffer = Wbuf, @@ -84,7 +179,7 @@ flush(State = #http_transport{host = Host, case iolist_to_binary(Wbuf) of <<>> -> %% Don't bother flushing empty buffers. - {State, ok}; + {ok, State}; WBinary -> {ok, {{_Version, 200, _ReasonPhrase}, _Headers, Body}} = http:request(post, @@ -97,22 +192,7 @@ flush(State = #http_transport{host = Host, State1 = State#http_transport{read_buffer = [Rbuf, Body], write_buffer = []}, - {State1, ok} - end. - -close(State) -> - {State, ok}. - -read(State = #http_transport{read_buffer = RBuf}, Len) when is_integer(Len) -> - %% Pull off Give bytes, return them to the user, leave the rest in the buffer. - Give = min(iolist_size(RBuf), Len), - case iolist_to_binary(RBuf) of - <> -> - Response = {ok, Data}, - State1 = State#http_transport{read_buffer=RBuf1}, - {State1, Response}; - _ -> - {State, {error, 'EOF'}} + {ok, State1} end. min(A,B) when A A; diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl index c44449ec..b4f607a9 100644 --- a/lib/erl/src/thrift_memory_buffer.erl +++ b/lib/erl/src/thrift_memory_buffer.erl @@ -19,43 +19,145 @@ -module(thrift_memory_buffer). +-behaviour(gen_server). -behaviour(thrift_transport). %% API -export([new/0, new_transport_factory/0]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + %% thrift_transport callbacks -export([write/2, read/2, flush/1, close/1]). -record(memory_buffer, {buffer}). --type state() :: #memory_buffer{}. --include("thrift_transport_behaviour.hrl"). +%%==================================================================== +%% API +%%==================================================================== new() -> - State = #memory_buffer{buffer = []}, - thrift_transport:new(?MODULE, State). + case gen_server:start_link(?MODULE, [], []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. new_transport_factory() -> {ok, fun() -> new() end}. -%% Writes data into the buffer -write(State = #memory_buffer{buffer = Buf}, Data) -> - {State#memory_buffer{buffer = [Buf, Data]}, ok}. +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = iolist() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) -> + gen_server:call(Transport, {write, Data}). -flush(State) -> - {State, ok}. +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, flush). -close(State) -> - {State, ok}. +%%-------------------------------------------------------------------- +%% Function: close(Transport) -> ok +%% +%% Description: Closes the transport and the wrapped transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:cast(Transport, close). -read(State = #memory_buffer{buffer = Buf}, Len) when is_integer(Len) -> +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([]) -> + {ok, #memory_buffer{buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({write, Data}, _From, State = #memory_buffer{buffer = Buf}) -> + {reply, ok, State#memory_buffer{buffer = [Buf, Data]}}; + +handle_call({read, Len}, _From, State = #memory_buffer{buffer = Buf}) -> Binary = iolist_to_binary(Buf), Give = min(iolist_size(Binary), Len), {Result, Remaining} = split_binary(Binary, Give), - {State#memory_buffer{buffer = Remaining}, {ok, Result}}. + {reply, {ok, Result}, State#memory_buffer{buffer = Remaining}}; + +handle_call(flush, _From, State) -> + {reply, ok, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(close, State) -> + {stop, normal, State}; +handle_cast(Msg, State=#memory_buffer{}) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%-------------------------------------------------------------------- -%% Internal functions +%%% Internal functions %%-------------------------------------------------------------------- min(A,B) when A A; min(_,B) -> B. diff --git a/lib/erl/src/thrift_processor.erl b/lib/erl/src/thrift_processor.erl index 43155050..e26fb330 100644 --- a/lib/erl/src/thrift_processor.erl +++ b/lib/erl/src/thrift_processor.erl @@ -24,54 +24,55 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(thrift_processor, {handler, protocol, service}). +-record(thrift_processor, {handler, in_protocol, out_protocol, service}). -init({_Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> - {ok, Proto} = ProtoGen(), - loop(#thrift_processor{protocol = Proto, +init({Server, ProtoGen, Service, Handler}) when is_function(ProtoGen, 0) -> + {ok, IProt, OProt} = ProtoGen(), + loop(#thrift_processor{in_protocol = IProt, + out_protocol = OProt, service = Service, handler = Handler}). -loop(State0 = #thrift_processor{protocol = Proto0}) -> - {Proto1, MessageBegin} = thrift_protocol:read(Proto0, message_begin), - State1 = State0#thrift_processor{protocol = Proto1}, - case MessageBegin of +loop(State = #thrift_processor{in_protocol = IProto, + out_protocol = OProto}) -> + case thrift_protocol:read(IProto, message_begin) of #protocol_message_begin{name = Function, type = ?tMessageType_CALL} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), - loop(State2); + ok = handle_function(State, list_to_atom(Function)), + loop(State); #protocol_message_begin{name = Function, type = ?tMessageType_ONEWAY} -> - {State2, ok} = handle_function(State1, list_to_atom(Function)), - loop(State2); + ok = handle_function(State, list_to_atom(Function)), + loop(State); {error, timeout} -> - thrift_protocol:close_transport(Proto1), + thrift_protocol:close_transport(OProto), ok; {error, closed} -> %% error_logger:info_msg("Client disconnected~n"), - thrift_protocol:close_transport(Proto1), + thrift_protocol:close_transport(OProto), exit(shutdown) end. -handle_function(State0=#thrift_processor{protocol = Proto0, - handler = Handler, - service = Service}, +handle_function(State=#thrift_processor{in_protocol = IProto, + out_protocol = OProto, + handler = Handler, + service = Service}, Function) -> InParams = Service:function_info(Function, params_type), - {Proto1, {ok, Params}} = thrift_protocol:read(Proto0, InParams), - State1 = State0#thrift_processor{protocol = Proto1}, + {ok, Params} = thrift_protocol:read(IProto, InParams), try Result = Handler:handle_function(Function, Params), %% {Micro, Result} = better_timer(Handler, handle_function, [Function, Params]), %% error_logger:info_msg("Processed ~p(~p) in ~.4fms~n", %% [Function, Params, Micro/1000.0]), - handle_success(State1, Function, Result) + handle_success(State, Function, Result) catch - Type:Data when Type =:= throw orelse Type =:= error -> - handle_function_catch(State1, Function, Type, Data) - end. + Type:Data -> + handle_function_catch(State, Function, Type, Data) + end, + after_reply(OProto). handle_function_catch(State = #thrift_processor{service = Service}, Function, ErrType, ErrData) -> @@ -83,37 +84,39 @@ handle_function_catch(State = #thrift_processor{service = Service}, error_logger:warning_msg( "oneway void ~p threw error which must be ignored: ~p", [Function, {ErrType, ErrData, Stack}]), - {State, ok}; + ok; {throw, Exception} when is_tuple(Exception), size(Exception) > 0 -> - %error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), - handle_exception(State, Function, Exception); - % we still want to accept more requests from this client + error_logger:warning_msg("~p threw exception: ~p~n", [Function, Exception]), + handle_exception(State, Function, Exception), + ok; % we still want to accept more requests from this client {error, Error} -> - handle_error(State, Function, Error) + ok = handle_error(State, Function, Error) end. -handle_success(State = #thrift_processor{service = Service}, +handle_success(State = #thrift_processor{out_protocol = OProto, + service = Service}, Function, Result) -> ReplyType = Service:function_info(Function, reply_type), StructName = atom_to_list(Function) ++ "_result", - case Result of - {reply, ReplyData} -> - Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, - send_reply(State, Function, ?tMessageType_REPLY, Reply); + ok = case Result of + {reply, ReplyData} -> + Reply = {{struct, [{0, ReplyType}]}, {StructName, ReplyData}}, + send_reply(OProto, Function, ?tMessageType_REPLY, Reply); - ok when ReplyType == {struct, []} -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); + ok when ReplyType == {struct, []} -> + send_reply(OProto, Function, ?tMessageType_REPLY, {ReplyType, {StructName}}); - ok when ReplyType == oneway_void -> - %% no reply for oneway void - {State, ok} - end. + ok when ReplyType == oneway_void -> + %% no reply for oneway void + ok + end. -handle_exception(State = #thrift_processor{service = Service}, +handle_exception(State = #thrift_processor{out_protocol = OProto, + service = Service}, Function, Exception) -> ExceptionType = element(1, Exception), @@ -138,9 +141,9 @@ handle_exception(State = #thrift_processor{service = Service}, % Make sure we got at least one defined case lists:all(fun(X) -> X =:= undefined end, ExceptionList) of true -> - handle_unknown_exception(State, Function, Exception); + ok = handle_unknown_exception(State, Function, Exception); false -> - send_reply(State, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) + ok = send_reply(OProto, Function, ?tMessageType_REPLY, {ReplySpec, ExceptionTuple}) end. %% @@ -151,7 +154,7 @@ handle_unknown_exception(State, Function, Exception) -> handle_error(State, Function, {exception_not_declared_as_thrown, Exception}). -handle_error(State, Function, Error) -> +handle_error(#thrift_processor{out_protocol = OProto}, Function, Error) -> Stack = erlang:get_stacktrace(), error_logger:error_msg("~p had an error: ~p~n", [Function, {Error, Stack}]), @@ -167,14 +170,19 @@ handle_error(State, Function, Error) -> #'TApplicationException'{ message = Message, type = ?TApplicationException_UNKNOWN}}, - send_reply(State, Function, ?tMessageType_EXCEPTION, Reply). - -send_reply(State = #thrift_processor{protocol = Proto0}, Function, ReplyMessageType, Reply) -> - {Proto1, ok} = thrift_protocol:write(Proto0, #protocol_message_begin{ - name = atom_to_list(Function), - type = ReplyMessageType, - seqid = 0}), - {Proto2, ok} = thrift_protocol:write(Proto1, Reply), - {Proto3, ok} = thrift_protocol:write(Proto2, message_end), - {Proto4, ok} = thrift_protocol:flush_transport(Proto3), - {State#thrift_processor{protocol = Proto4}, ok}. + send_reply(OProto, Function, ?tMessageType_EXCEPTION, Reply). + +send_reply(OProto, Function, ReplyMessageType, Reply) -> + ok = thrift_protocol:write(OProto, #protocol_message_begin{ + name = atom_to_list(Function), + type = ReplyMessageType, + seqid = 0}), + ok = thrift_protocol:write(OProto, Reply), + ok = thrift_protocol:write(OProto, message_end), + ok = thrift_protocol:flush_transport(OProto), + ok. + +after_reply(OProto) -> + ok = thrift_protocol:flush_transport(OProto) + %% ok = thrift_protocol:close_transport(OProto) + . diff --git a/lib/erl/src/thrift_protocol.erl b/lib/erl/src/thrift_protocol.erl index 4c334128..1bfb0a42 100644 --- a/lib/erl/src/thrift_protocol.erl +++ b/lib/erl/src/thrift_protocol.erl @@ -49,13 +49,10 @@ new(Module, Data) when is_atom(Module) -> {ok, #protocol{module = Module, data = Data}}. --spec flush_transport(#protocol{}) -> {#protocol{}, ok}. -flush_transport(Proto = #protocol{module = Module, - data = Data}) -> - {NewData, Result} = Module:flush_transport(Data), - {Proto#protocol{data = NewData}, Result}. +flush_transport(#protocol{module = Module, + data = Data}) -> + Module:flush_transport(Data). --spec close_transport(#protocol{}) -> ok. close_transport(#protocol{module = Module, data = Data}) -> Module:close_transport(Data). @@ -89,8 +86,7 @@ term_to_typeid({list, _}) -> ?tType_LIST. %% Structure is like: %% [{Fid, Type}, ...] --spec read(#protocol{}, {struct, _StructDef}, atom()) -> {#protocol{}, {ok, tuple()}}. -read(IProto0, {struct, Structure}, Tag) +read(IProto, {struct, Structure}, Tag) when is_list(Structure), is_atom(Tag) -> % If we want a tagged tuple, we need to offset all the tuple indices @@ -107,23 +103,14 @@ read(IProto0, {struct, Structure}, Tag) % Fid -> {Type, Index} SDict = dict:from_list(SWithIndices), - {IProto1, ok} = read(IProto0, struct_begin), + ok = read(IProto, struct_begin), RTuple0 = erlang:make_tuple(length(Structure) + Offset, undefined), RTuple1 = if Tag =/= undefined -> setelement(1, RTuple0, Tag); true -> RTuple0 end, - {IProto2, RTuple2} = read_struct_loop(IProto1, SDict, RTuple1), - {IProto2, {ok, RTuple2}}. - - -%% NOTE: Keep this in sync with thrift_protocol_behaviour:read --spec read - (#protocol{}, {struct, _Info}) -> {#protocol{}, {ok, tuple()} | {error, _Reason}}; - (#protocol{}, tprot_cont_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}}; - (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}}; - (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}}; - (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}}. + RTuple2 = read_struct_loop(IProto, SDict, RTuple1), + {ok, RTuple2}. read(IProto, {struct, {Module, StructureName}}) when is_atom(Module), is_atom(StructureName) -> @@ -132,165 +119,137 @@ read(IProto, {struct, {Module, StructureName}}) when is_atom(Module), read(IProto, S={struct, Structure}) when is_list(Structure) -> read(IProto, S, undefined); -read(IProto0, {list, Type}) -> - {IProto1, #protocol_list_begin{etype = EType, size = Size}} = - read(IProto0, list_begin), - {EType, EType} = {term_to_typeid(Type), EType}, - {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> - {ProtoS1, {ok, Item}} = read(ProtoS0, Type), - {Item, ProtoS1} - end, - IProto1, - lists:duplicate(Size, 0)), - {IProto3, ok} = read(IProto2, list_end), - {IProto3, {ok, List}}; - -read(IProto0, {map, KeyType, ValType}) -> - {IProto1, #protocol_map_begin{size = Size, ktype = KType, vtype = VType}} = - read(IProto0, map_begin), - {KType, KType} = {term_to_typeid(KeyType), KType}, - {VType, VType} = {term_to_typeid(ValType), VType}, - {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> - {ProtoS1, {ok, Key}} = read(ProtoS0, KeyType), - {ProtoS2, {ok, Val}} = read(ProtoS1, ValType), - {{Key, Val}, ProtoS2} - end, - IProto1, - lists:duplicate(Size, 0)), - {IProto3, ok} = read(IProto2, map_end), - {IProto3, {ok, dict:from_list(List)}}; - -read(IProto0, {set, Type}) -> - {IProto1, #protocol_set_begin{etype = EType, size = Size}} = - read(IProto0, set_begin), - {EType, EType} = {term_to_typeid(Type), EType}, - {List, IProto2} = lists:mapfoldl(fun(_, ProtoS0) -> - {ProtoS1, {ok, Item}} = read(ProtoS0, Type), - {Item, ProtoS1} - end, - IProto1, - lists:duplicate(Size, 0)), - {IProto3, ok} = read(IProto2, set_end), - {IProto3, {ok, sets:from_list(List)}}; - -read(Protocol, ProtocolType) -> - read_specific(Protocol, ProtocolType). - -%% NOTE: Keep this in sync with thrift_protocol_behaviour:read --spec read_specific - (#protocol{}, tprot_empty_tag()) -> {#protocol{}, ok | {error, _Reason}}; - (#protocol{}, tprot_header_tag()) -> {#protocol{}, tprot_header_val() | {error, _Reason}}; - (#protocol{}, tprot_data_tag()) -> {#protocol{}, {ok, any()} | {error, _Reason}}. -read_specific(Proto = #protocol{module = Module, - data = ModuleData}, ProtocolType) -> - {NewData, Result} = Module:read(ModuleData, ProtocolType), - {Proto#protocol{data = NewData}, Result}. - -read_struct_loop(IProto0, SDict, RTuple) -> - {IProto1, #protocol_field_begin{type = FType, id = Fid}} = - thrift_protocol:read(IProto0, field_begin), +read(IProto, {list, Type}) -> + #protocol_list_begin{etype = EType, size = Size} = + read(IProto, list_begin), + List = [Result || {ok, Result} <- + [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]], + ok = read(IProto, list_end), + {ok, List}; + +read(IProto, {map, KeyType, ValType}) -> + #protocol_map_begin{size = Size} = + read(IProto, map_begin), + + List = [{Key, Val} || {{ok, Key}, {ok, Val}} <- + [{read(IProto, KeyType), + read(IProto, ValType)} || _X <- lists:duplicate(Size, 0)]], + ok = read(IProto, map_end), + {ok, dict:from_list(List)}; + +read(IProto, {set, Type}) -> + #protocol_set_begin{etype = _EType, + size = Size} = + read(IProto, set_begin), + List = [Result || {ok, Result} <- + [read(IProto, Type) || _X <- lists:duplicate(Size, 0)]], + ok = read(IProto, set_end), + {ok, sets:from_list(List)}; + +read(#protocol{module = Module, + data = ModuleData}, ProtocolType) -> + Module:read(ModuleData, ProtocolType). + +read_struct_loop(IProto, SDict, RTuple) -> + #protocol_field_begin{type = FType, id = Fid, name = Name} = + thrift_protocol:read(IProto, field_begin), case {FType, Fid} of {?tType_STOP, _} -> - {IProto1, RTuple}; + RTuple; _Else -> case dict:find(Fid, SDict) of {ok, {Type, Index}} -> case term_to_typeid(Type) of FType -> - {IProto2, {ok, Val}} = read(IProto1, Type), - {IProto3, ok} = thrift_protocol:read(IProto2, field_end), + {ok, Val} = read(IProto, Type), + thrift_protocol:read(IProto, field_end), NewRTuple = setelement(Index, RTuple, Val), - read_struct_loop(IProto3, SDict, NewRTuple); + read_struct_loop(IProto, SDict, NewRTuple); Expected -> error_logger:info_msg( "Skipping field ~p with wrong type (~p != ~p)~n", [Fid, FType, Expected]), - skip_field(FType, IProto1, SDict, RTuple) + skip_field(FType, IProto, SDict, RTuple) end; _Else2 -> error_logger:info_msg("Skipping field ~p with unknown fid~n", [Fid]), - skip_field(FType, IProto1, SDict, RTuple) + skip_field(FType, IProto, SDict, RTuple) end end. -skip_field(FType, IProto0, SDict, RTuple) -> +skip_field(FType, IProto, SDict, RTuple) -> FTypeAtom = thrift_protocol:typeid_to_atom(FType), - {IProto1, ok} = thrift_protocol:skip(IProto0, FTypeAtom), - {IProto2, ok} = read(IProto1, field_end), - read_struct_loop(IProto2, SDict, RTuple). - --spec skip(#protocol{}, any()) -> {#protocol{}, ok}. - -skip(Proto0, struct) -> - {Proto1, ok} = read(Proto0, struct_begin), - {Proto2, ok} = skip_struct_loop(Proto1), - {Proto3, ok} = read(Proto2, struct_end), - {Proto3, ok}; - -skip(Proto0, map) -> - {Proto1, Map} = read(Proto0, map_begin), - {Proto2, ok} = skip_map_loop(Proto1, Map), - {Proto3, ok} = read(Proto2, map_end), - {Proto3, ok}; - -skip(Proto0, set) -> - {Proto1, Set} = read(Proto0, set_begin), - {Proto2, ok} = skip_set_loop(Proto1, Set), - {Proto3, ok} = read(Proto2, set_end), - {Proto3, ok}; - -skip(Proto0, list) -> - {Proto1, List} = read(Proto0, list_begin), - {Proto2, ok} = skip_list_loop(Proto1, List), - {Proto3, ok} = read(Proto2, list_end), - {Proto3, ok}; - -skip(Proto0, Type) when is_atom(Type) -> - {Proto1, _Ignore} = read(Proto0, Type), - {Proto1, ok}. - - -skip_struct_loop(Proto0) -> - {Proto1, #protocol_field_begin{type = Type}} = read(Proto0, field_begin), + thrift_protocol:skip(IProto, FTypeAtom), + read(IProto, field_end), + read_struct_loop(IProto, SDict, RTuple). + + +skip(Proto, struct) -> + ok = read(Proto, struct_begin), + ok = skip_struct_loop(Proto), + ok = read(Proto, struct_end); + +skip(Proto, map) -> + Map = read(Proto, map_begin), + ok = skip_map_loop(Proto, Map), + ok = read(Proto, map_end); + +skip(Proto, set) -> + Set = read(Proto, set_begin), + ok = skip_set_loop(Proto, Set), + ok = read(Proto, set_end); + +skip(Proto, list) -> + List = read(Proto, list_begin), + ok = skip_list_loop(Proto, List), + ok = read(Proto, list_end); + +skip(Proto, Type) when is_atom(Type) -> + _Ignore = read(Proto, Type), + ok. + + +skip_struct_loop(Proto) -> + #protocol_field_begin{type = Type} = read(Proto, field_begin), case Type of ?tType_STOP -> - {Proto1, ok}; + ok; _Else -> - {Proto2, ok} = skip(Proto1, Type), - {Proto3, ok} = read(Proto2, field_end), - skip_struct_loop(Proto3) + skip(Proto, Type), + ok = read(Proto, field_end), + skip_struct_loop(Proto) end. -skip_map_loop(Proto0, Map = #protocol_map_begin{ktype = Ktype, - vtype = Vtype, - size = Size}) -> +skip_map_loop(Proto, Map = #protocol_map_begin{ktype = Ktype, + vtype = Vtype, + size = Size}) -> case Size of N when N > 0 -> - {Proto1, ok} = skip(Proto0, Ktype), - {Proto2, ok} = skip(Proto1, Vtype), - skip_map_loop(Proto2, + skip(Proto, Ktype), + skip(Proto, Vtype), + skip_map_loop(Proto, Map#protocol_map_begin{size = Size - 1}); - 0 -> {Proto0, ok} + 0 -> ok end. -skip_set_loop(Proto0, Map = #protocol_set_begin{etype = Etype, - size = Size}) -> +skip_set_loop(Proto, Map = #protocol_set_begin{etype = Etype, + size = Size}) -> case Size of N when N > 0 -> - {Proto1, ok} = skip(Proto0, Etype), - skip_set_loop(Proto1, + skip(Proto, Etype), + skip_set_loop(Proto, Map#protocol_set_begin{size = Size - 1}); - 0 -> {Proto0, ok} + 0 -> ok end. -skip_list_loop(Proto0, Map = #protocol_list_begin{etype = Etype, - size = Size}) -> +skip_list_loop(Proto, Map = #protocol_list_begin{etype = Etype, + size = Size}) -> case Size of N when N > 0 -> - {Proto1, ok} = skip(Proto0, Etype), - skip_list_loop(Proto1, + skip(Proto, Etype), + skip_list_loop(Proto, Map#protocol_list_begin{size = Size - 1}); - 0 -> {Proto0, ok} + 0 -> ok end. @@ -308,95 +267,90 @@ skip_list_loop(Proto0, Map = #protocol_list_begin{etype = Etype, %% | list() -- for list %% | dictionary() -- for map %% | set() -- for set -%% | any() -- for base types +%% | term() -- for base types %% %% Description: %%-------------------------------------------------------------------- --spec write(#protocol{}, any()) -> {#protocol{}, ok | {error, _Reason}}. - -write(Proto0, {{struct, StructDef}, Data}) +write(Proto, {{struct, StructDef}, Data}) when is_list(StructDef), is_tuple(Data), length(StructDef) == size(Data) - 1 -> [StructName | Elems] = tuple_to_list(Data), - {Proto1, ok} = write(Proto0, #protocol_struct_begin{name = StructName}), - {Proto2, ok} = struct_write_loop(Proto1, StructDef, Elems), - {Proto3, ok} = write(Proto2, struct_end), - {Proto3, ok}; + ok = write(Proto, #protocol_struct_begin{name = StructName}), + ok = struct_write_loop(Proto, StructDef, Elems), + ok = write(Proto, struct_end), + ok; write(Proto, {{struct, {Module, StructureName}}, Data}) when is_atom(Module), is_atom(StructureName), element(1, Data) =:= StructureName -> + StructType = Module:struct_info(StructureName), write(Proto, {Module:struct_info(StructureName), Data}); -write(Proto0, {{list, Type}, Data}) +write(Proto, {{list, Type}, Data}) when is_list(Data) -> - {Proto1, ok} = write(Proto0, + ok = write(Proto, #protocol_list_begin{ etype = term_to_typeid(Type), size = length(Data) }), - Proto2 = lists:foldl(fun(Elem, ProtoIn) -> - {ProtoOut, ok} = write(ProtoIn, {Type, Elem}), - ProtoOut - end, - Proto1, - Data), - {Proto3, ok} = write(Proto2, list_end), - {Proto3, ok}; - -write(Proto0, {{map, KeyType, ValType}, Data}) -> - {Proto1, ok} = write(Proto0, - #protocol_map_begin{ - ktype = term_to_typeid(KeyType), - vtype = term_to_typeid(ValType), - size = dict:size(Data) - }), - Proto2 = dict:fold(fun(KeyData, ValData, ProtoS0) -> - {ProtoS1, ok} = write(ProtoS0, {KeyType, KeyData}), - {ProtoS2, ok} = write(ProtoS1, {ValType, ValData}), - ProtoS2 - end, - Proto1, - Data), - {Proto3, ok} = write(Proto2, map_end), - {Proto3, ok}; - -write(Proto0, {{set, Type}, Data}) -> + lists:foreach(fun(Elem) -> + ok = write(Proto, {Type, Elem}) + end, + Data), + ok = write(Proto, list_end), + ok; + +write(Proto, {{map, KeyType, ValType}, Data}) -> + ok = write(Proto, + #protocol_map_begin{ + ktype = term_to_typeid(KeyType), + vtype = term_to_typeid(ValType), + size = dict:size(Data) + }), + dict:fold(fun(KeyData, ValData, _Acc) -> + ok = write(Proto, {KeyType, KeyData}), + ok = write(Proto, {ValType, ValData}) + end, + _AccO = ok, + Data), + ok = write(Proto, map_end), + ok; + +write(Proto, {{set, Type}, Data}) -> true = sets:is_set(Data), - {Proto1, ok} = write(Proto0, - #protocol_set_begin{ - etype = term_to_typeid(Type), - size = sets:size(Data) - }), - Proto2 = sets:fold(fun(Elem, ProtoIn) -> - {ProtoOut, ok} = write(ProtoIn, {Type, Elem}), - ProtoOut - end, - Proto1, - Data), - {Proto3, ok} = write(Proto2, set_end), - {Proto3, ok}; - -write(Proto = #protocol{module = Module, - data = ModuleData}, Data) -> - {NewData, Result} = Module:write(ModuleData, Data), - {Proto#protocol{data = NewData}, Result}. - -struct_write_loop(Proto0, [{Fid, Type} | RestStructDef], [Data | RestData]) -> - NewProto = case Data of - undefined -> - Proto0; % null fields are skipped in response - _ -> - {Proto1, ok} = write(Proto0, - #protocol_field_begin{ - type = term_to_typeid(Type), - id = Fid - }), - {Proto2, ok} = write(Proto1, {Type, Data}), - {Proto3, ok} = write(Proto2, field_end), - Proto3 - end, - struct_write_loop(NewProto, RestStructDef, RestData); + ok = write(Proto, + #protocol_set_begin{ + etype = term_to_typeid(Type), + size = sets:size(Data) + }), + sets:fold(fun(Elem, _Acc) -> + ok = write(Proto, {Type, Elem}) + end, + _Acc0 = ok, + Data), + ok = write(Proto, set_end), + ok; + +write(#protocol{module = Module, + data = ModuleData}, Data) -> + Module:write(ModuleData, Data). + +struct_write_loop(Proto, [{Fid, Type} | RestStructDef], [Data | RestData]) -> + case Data of + undefined -> + % null fields are skipped in response + skip; + _ -> + ok = write(Proto, + #protocol_field_begin{ + type = term_to_typeid(Type), + id = Fid + }), + ok = write(Proto, {Type, Data}), + ok = write(Proto, field_end) + end, + struct_write_loop(Proto, RestStructDef, RestData); struct_write_loop(Proto, [], []) -> - write(Proto, field_stop). + ok = write(Proto, field_stop), + ok. diff --git a/lib/erl/src/thrift_server.erl b/lib/erl/src/thrift_server.erl index 5012e168..5d0012ba 100644 --- a/lib/erl/src/thrift_server.erl +++ b/lib/erl/src/thrift_server.erl @@ -126,7 +126,7 @@ handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, {stop, Reason, State} end; -handle_info({inet_async, _ListenSocket, _Ref, Error}, State) -> +handle_info({inet_async, ListenSocket, Ref, Error}, State) -> error_logger:error_msg("Error in acceptor: ~p~n", [Error]), {stop, Error, State}; @@ -177,7 +177,7 @@ start_processor(Socket, Service, Handler) -> {ok, SocketTransport} = thrift_socket_transport:new(Socket), {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), - {ok, Protocol} + {ok, Protocol, Protocol} end, spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]). diff --git a/lib/erl/src/thrift_socket_server.erl b/lib/erl/src/thrift_socket_server.erl index f7c7a028..6794e630 100644 --- a/lib/erl/src/thrift_socket_server.erl +++ b/lib/erl/src/thrift_socket_server.erl @@ -166,12 +166,13 @@ gen_tcp_listen(Port, Opts, State) -> new_acceptor(State=#thrift_socket_server{max=0}) -> error_logger:error_msg("Not accepting new connections"), State#thrift_socket_server{acceptor=null}; -new_acceptor(State=#thrift_socket_server{listen=Listen, +new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen, service=Service, handler=Handler, socket_opts=Opts, framed=Framed }) -> Pid = proc_lib:spawn_link(?MODULE, acceptor_loop, [{self(), Listen, Service, Handler, Opts, Framed}]), +%% error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]), State#thrift_socket_server{acceptor=Pid}. acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed}) @@ -187,7 +188,7 @@ acceptor_loop({Server, Listen, Service, Handler, SocketOpts, Framed}) false -> thrift_buffered_transport:new(SocketTransport) end, {ok, Protocol} = thrift_binary_protocol:new(Transport), - {ok, Protocol} + {ok, IProt=Protocol, OProt=Protocol} end, thrift_processor:init({Server, ProtoGen, Service, Handler}); {error, closed} -> diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl index 5e1ef026..fcd69449 100644 --- a/lib/erl/src/thrift_socket_transport.erl +++ b/lib/erl/src/thrift_socket_transport.erl @@ -29,8 +29,6 @@ -record(data, {socket, recv_timeout=infinity}). --type state() :: #data{}. --include("thrift_transport_behaviour.hrl"). new(Socket) -> new(Socket, []). @@ -47,26 +45,25 @@ new(Socket, Opts) when is_list(Opts) -> thrift_transport:new(?MODULE, State). %% Data :: iolist() -write(This = #data{socket = Socket}, Data) -> - {This, gen_tcp:send(Socket, Data)}. +write(#data{socket = Socket}, Data) -> + gen_tcp:send(Socket, Data). -read(This = #data{socket=Socket, recv_timeout=Timeout}, Len) +read(#data{socket=Socket, recv_timeout=Timeout}, Len) when is_integer(Len), Len >= 0 -> case gen_tcp:recv(Socket, Len, Timeout) of Err = {error, timeout} -> error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]), gen_tcp:close(Socket), - {This, Err}; - Data -> - {This, Data} + Err; + Data -> Data end. %% We can't really flush - everything is flushed when we write -flush(This) -> - {This, ok}. +flush(_) -> + ok. -close(This = #data{socket = Socket}) -> - {This, gen_tcp:close(Socket)}. +close(#data{socket = Socket}) -> + gen_tcp:close(Socket). %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl index 39f8c056..20c4b5dc 100644 --- a/lib/erl/src/thrift_transport.erl +++ b/lib/erl/src/thrift_transport.erl @@ -37,42 +37,21 @@ behaviour_info(callbacks) -> -record(transport, {module, data}). --ifdef(transport_wrapper_module). --define(debug_wrap(Transport), - case Transport#transport.module of - ?transport_wrapper_module -> - Transport; - _Else -> - {ok, Result} = ?transport_wrapper_module:new(Transport), - Result - end). --else. --define(debug_wrap(Transport), Transport). --endif. - new(Module, Data) when is_atom(Module) -> - Transport0 = #transport{module = Module, data = Data}, - Transport1 = ?debug_wrap(Transport0), - {ok, Transport1}. + {ok, #transport{module = Module, + data = Data}}. --spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}. +%% Data :: iolist() write(Transport, Data) -> Module = Transport#transport.module, - {NewTransData, Result} = Module:write(Transport#transport.data, Data), - {Transport#transport{data = NewTransData}, Result}. + Module:write(Transport#transport.data, Data). --spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}. read(Transport, Len) when is_integer(Len) -> Module = Transport#transport.module, - {NewTransData, Result} = Module:read(Transport#transport.data, Len), - {Transport#transport{data = NewTransData}, Result}. + Module:read(Transport#transport.data, Len). --spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}. -flush(Transport = #transport{module = Module, data = Data}) -> - {NewTransData, Result} = Module:flush(Data), - {Transport#transport{data = NewTransData}, Result}. +flush(#transport{module = Module, data = Data}) -> + Module:flush(Data). --spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}. -close(Transport = #transport{module = Module, data = Data}) -> - {NewTransData, Result} = Module:close(Data), - {Transport#transport{data = NewTransData}, Result}. +close(#transport{module = Module, data = Data}) -> + Module:close(Data). diff --git a/lib/erl/src/thrift_transport_state_test.erl b/lib/erl/src/thrift_transport_state_test.erl deleted file mode 100644 index e83a44d2..00000000 --- a/lib/erl/src/thrift_transport_state_test.erl +++ /dev/null @@ -1,117 +0,0 @@ -%% -%% Licensed to the Apache Software Foundation (ASF) under one -%% or more contributor license agreements. See the NOTICE file -%% distributed with this work for additional information -%% regarding copyright ownership. The ASF licenses this file -%% to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance -%% with the License. You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% - --module(thrift_transport_state_test). - --behaviour(gen_server). --behaviour(thrift_transport). - -%% API --export([new/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% thrift_transport callbacks --export([write/2, read/2, flush/1, close/1]). - --record(trans, {wrapped, % #thrift_transport{} - version :: integer(), - counter :: pid() - }). --type state() :: #trans{}. --include("thrift_transport_behaviour.hrl"). - --record(state, {cversion :: integer()}). - - -new(WrappedTransport) -> - case gen_server:start_link(?MODULE, [], []) of - {ok, Pid} -> - Trans = #trans{wrapped = WrappedTransport, - version = 0, - counter = Pid}, - thrift_transport:new(?MODULE, Trans); - Else -> - Else - end. - -%%==================================================================== -%% thrift_transport callbacks -%%==================================================================== - -write(Transport0 = #trans{wrapped = Wrapped0}, Data) -> - Transport1 = check_version(Transport0), - {Wrapped1, Result} = thrift_transport:write(Wrapped0, Data), - Transport2 = Transport1#trans{wrapped = Wrapped1}, - {Transport2, Result}. - -flush(Transport0 = #trans{wrapped = Wrapped0}) -> - Transport1 = check_version(Transport0), - {Wrapped1, Result} = thrift_transport:flush(Wrapped0), - Transport2 = Transport1#trans{wrapped = Wrapped1}, - {Transport2, Result}. - -close(Transport0 = #trans{wrapped = Wrapped0}) -> - Transport1 = check_version(Transport0), - shutdown_counter(Transport1), - {Wrapped1, Result} = thrift_transport:close(Wrapped0), - Transport2 = Transport1#trans{wrapped = Wrapped1}, - {Transport2, Result}. - -read(Transport0 = #trans{wrapped = Wrapped0}, Len) -> - Transport1 = check_version(Transport0), - {Wrapped1, Result} = thrift_transport:read(Wrapped0, Len), - Transport2 = Transport1#trans{wrapped = Wrapped1}, - {Transport2, Result}. - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([]) -> - {ok, #state{cversion = 0}}. - -handle_call(check_version, _From, State = #state{cversion = Version}) -> - {reply, Version, State#state{cversion = Version+1}}. - -handle_cast(shutdown, State) -> - {stop, normal, State}. - -handle_info(_Info, State) -> {noreply, State}. -code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(_Reason, _State) -> ok. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -check_version(Transport = #trans{version = Version, counter = Counter}) -> - case gen_server:call(Counter, check_version) of - Version -> - Transport#trans{version = Version+1}; - _Else -> - % State wasn't propagated properly. Die. - erlang:error(state_not_propagated) - end. - -shutdown_counter(#trans{counter = Counter}) -> - gen_server:cast(Counter, shutdown). diff --git a/test/erl/Makefile b/test/erl/Makefile index a6b5ae61..21260372 100644 --- a/test/erl/Makefile +++ b/test/erl/Makefile @@ -29,7 +29,7 @@ SRCDIR=src ALL_INCLUDEDIR=$(GEN_INCLUDEDIR) $(INCLUDEDIR) ../../lib/erl/include INCLUDEFLAGS=$(patsubst %,-I%, ${ALL_INCLUDEDIR}) -MODULES = stress_server test_server test_client test_disklog test_membuffer +MODULES = stress_server test_server test_disklog test_membuffer test_tether INCLUDES = TARGETS = $(patsubst %,${TARGETDIR}/%.beam,${MODULES}) @@ -55,11 +55,11 @@ ${GENDIR}/: ${RPCFILE} ${GEN_TARGETDIR}/: ${GENDIR}/ rm -rf ${GEN_TARGETDIR} mkdir -p ${GEN_TARGETDIR} - erlc ${ERLC_FLAGS} ${INCLUDEFLAGS} -o ${GEN_TARGETDIR} ${GEN_SRCDIR}/*.erl + erlc ${INCLUDEFLAGS} -o ${GEN_TARGETDIR} ${GEN_SRCDIR}/*.erl $(TARGETS): ${TARGETDIR}/%.beam: ${SRCDIR}/%.erl ${GEN_INCLUDEDIR}/ ${HEADERS} mkdir -p ${TARGETDIR} - erlc ${ERLC_FLAGS} ${INCLUDEFLAGS} -o ${TARGETDIR} $< + erlc ${INCLUDEFLAGS} -o ${TARGETDIR} $< clean: rm -f ${TARGETDIR}/*.beam diff --git a/test/erl/src/test_client.erl b/test/erl/src/test_client.erl deleted file mode 100644 index a26467f0..00000000 --- a/test/erl/src/test_client.erl +++ /dev/null @@ -1,132 +0,0 @@ -%% -%% Licensed to the Apache Software Foundation (ASF) under one -%% or more contributor license agreements. See the NOTICE file -%% distributed with this work for additional information -%% regarding copyright ownership. The ASF licenses this file -%% to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance -%% with the License. You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% - --module(test_client). - --export([start/0, start/1]). - --include("thriftTest_types.hrl"). - --record(options, {port = 9090, - client_opts = []}). - -parse_args(Args) -> parse_args(Args, #options{}). -parse_args([], Opts) -> Opts; -parse_args([Head | Rest], Opts) -> - NewOpts = - case catch list_to_integer(Head) of - Port when is_integer(Port) -> - Opts#options{port = Port}; - _Else -> - case Head of - "framed" -> - Opts#options{client_opts = [{framed, true} | Opts#options.client_opts]}; - "" -> - Opts; - _Else -> - erlang:error({bad_arg, Head}) - end - end, - parse_args(Rest, NewOpts). - - -start() -> start([]). -start(Args) -> - #options{port = Port, client_opts = ClientOpts} = parse_args(Args), - {ok, Client0} = thrift_client_util:new( - "127.0.0.1", Port, thriftTest_thrift, ClientOpts), - - DemoXtruct = #xtruct{ - string_thing = <<"Zero">>, - byte_thing = 1, - i32_thing = 9128361, - i64_thing = 9223372036854775807}, - - DemoNest = #xtruct2{ - byte_thing = 7, - struct_thing = DemoXtruct, - % Note that we don't set i32_thing, it will come back as undefined - % from the Python server, but 0 from the C++ server, since it is not - % optional - i32_thing = 2}, - - % Is it safe to match these things? - DemoDict = dict:from_list([ {Key, Key-10} || Key <- lists:seq(0,10) ]), - DemoSet = sets:from_list([ Key || Key <- lists:seq(-3,3) ]), - - %DemoInsane = #insanity{ - % userMap = dict:from_list([{?thriftTest_FIVE, 5000}]), - % xtructs = [#xtruct{ string_thing = <<"Truck">>, byte_thing = 8, i32_thing = 8, i64_thing = 8}]}, - - {Client01, {ok, ok}} = thrift_client:call(Client0, testVoid, []), - - {Client02, {ok, <<"Test">>}} = thrift_client:call(Client01, testString, ["Test"]), - {Client03, {ok, <<"Test">>}} = thrift_client:call(Client02, testString, [<<"Test">>]), - {Client04, {ok, 63}} = thrift_client:call(Client03, testByte, [63]), - {Client05, {ok, -1}} = thrift_client:call(Client04, testI32, [-1]), - {Client06, {ok, 0}} = thrift_client:call(Client05, testI32, [0]), - {Client07, {ok, -34359738368}} = thrift_client:call(Client06, testI64, [-34359738368]), - {Client08, {ok, -5.2098523}} = thrift_client:call(Client07, testDouble, [-5.2098523]), - {Client09, {ok, DemoXtruct}} = thrift_client:call(Client08, testStruct, [DemoXtruct]), - {Client10, {ok, DemoNest}} = thrift_client:call(Client09, testNest, [DemoNest]), - {Client11, {ok, DemoDict}} = thrift_client:call(Client10, testMap, [DemoDict]), - {Client12, {ok, DemoSet}} = thrift_client:call(Client11, testSet, [DemoSet]), - {Client13, {ok, [-1,2,3]}} = thrift_client:call(Client12, testList, [[-1,2,3]]), - {Client14, {ok, 1}} = thrift_client:call(Client13, testEnum, [?thriftTest_ONE]), - {Client15, {ok, 309858235082523}} = thrift_client:call(Client14, testTypedef, [309858235082523]), - - % No python implementation, but works with C++ and Erlang. - %{Client16, {ok, InsaneResult}} = thrift_client:call(Client15, testInsanity, [DemoInsane]), - %io:format("~p~n", [InsaneResult]), - Client16 = Client15, - - {Client17, {ok, #xtruct{string_thing = <<"Message">>}}} = - thrift_client:call(Client16, testMultiException, ["Safe", "Message"]), - - Client18 = - try - {ClientS1, Result1} = thrift_client:call(Client17, testMultiException, ["Xception", "Message"]), - io:format("Unexpected return! ~p~n", [Result1]), - ClientS1 - catch - throw:{ClientS2, {exception, ExnS1 = #xception{}}} -> - #xception{errorCode = 1001, message = <<"This is an Xception">>} = ExnS1, - ClientS2; - throw:{ClientS2, {exception, _ExnS1 = #xception2{}}} -> - io:format("Wrong exception type!~n", []), - ClientS2 - end, - - Client19 = - try - {ClientS3, Result2} = thrift_client:call(Client18, testMultiException, ["Xception2", "Message"]), - io:format("Unexpected return! ~p~n", [Result2]), - ClientS3 - catch - throw:{ClientS4, {exception, _ExnS2 = #xception{}}} -> - io:format("Wrong exception type!~n", []), - ClientS4; - throw:{ClientS4, {exception, ExnS2 = #xception2{}}} -> - #xception2{errorCode = 2002, - struct_thing = #xtruct{ - string_thing = <<"This is an Xception2">>}} = ExnS2, - ClientS4 - end, - - thrift_client:close(Client19). diff --git a/test/erl/src/test_disklog.erl b/test/erl/src/test_disklog.erl index fc0dcf86..7b0be72d 100644 --- a/test/erl/src/test_disklog.erl +++ b/test/erl/src/test_disklog.erl @@ -29,21 +29,20 @@ t() -> {size, {1024*1024, 10}}]), {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( TransportFactory, []), - {ok, Proto} = ProtocolFactory(), - {ok, Client0} = thrift_client:new(Proto, thriftTest_thrift), + {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), io:format("Client started~n"), % We have to make oneway calls into this client only since otherwise it will try % to read from the disklog and go boom. - {Client1, {ok, ok}} = thrift_client:call(Client0, testOneway, [16#deadbeef]), + {ok, ok} = thrift_client:call(Client, testOneway, [16#deadbeef]), io:format("Call written~n"), % Use the send_call method to write a non-oneway call into the log - {Client2, ok} = thrift_client:send_call(Client1, testString, [<<"hello world">>]), + ok = thrift_client:send_call(Client, testString, [<<"hello world">>]), io:format("Non-oneway call sent~n"), - {_Client3, ok} = thrift_client:close(Client2), + ok = thrift_client:close(Client), io:format("Client closed~n"), ok. @@ -62,22 +61,21 @@ t_base64() -> thrift_buffered_transport:new_transport_factory(B64Factory), {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( BufFactory, []), - {ok, Proto} = ProtocolFactory(), - {ok, Client0} = thrift_client:new(Proto, thriftTest_thrift), + {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), io:format("Client started~n"), % We have to make oneway calls into this client only since otherwise it will try % to read from the disklog and go boom. - {Client1, {ok, ok}} = thrift_client:call(Client0, testOneway, [16#deadbeef]), + {ok, ok} = thrift_client:call(Client, testOneway, [16#deadbeef]), io:format("Call written~n"), % Use the send_call method to write a non-oneway call into the log - {Client2, ok} = thrift_client:send_call(Client1, testString, [<<"hello world">>]), + ok = thrift_client:send_call(Client, testString, [<<"hello world">>]), io:format("Non-oneway call sent~n"), - {_Client3, ok} = thrift_client:close(Client2), + ok = thrift_client:close(Client), io:format("Client closed~n"), ok. - + diff --git a/test/erl/src/test_membuffer.erl b/test/erl/src/test_membuffer.erl index 19ac5277..7bd23a0f 100644 --- a/test/erl/src/test_membuffer.erl +++ b/test/erl/src/test_membuffer.erl @@ -30,12 +30,12 @@ test_data() -> t1() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol0} = thrift_binary_protocol:new(Transport), + {ok, Protocol} = thrift_binary_protocol:new(Transport), TestData = test_data(), - {Protocol1, ok} = thrift_protocol:write(Protocol0, + ok = thrift_protocol:write(Protocol, {{struct, element(2, thriftTest_types:struct_info('xtruct'))}, TestData}), - {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, + {ok, Result} = thrift_protocol:read(Protocol, {struct, element(2, thriftTest_types:struct_info('xtruct'))}, 'xtruct'), @@ -44,12 +44,12 @@ t1() -> t2() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol0} = thrift_binary_protocol:new(Transport), + {ok, Protocol} = thrift_binary_protocol:new(Transport), TestData = test_data(), - {Protocol1, ok} = thrift_protocol:write(Protocol0, + ok = thrift_protocol:write(Protocol, {{struct, element(2, thriftTest_types:struct_info('xtruct'))}, TestData}), - {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, + {ok, Result} = thrift_protocol:read(Protocol, {struct, element(2, thriftTest_types:struct_info('xtruct3'))}, 'xtruct3'), @@ -61,12 +61,12 @@ t2() -> t3() -> {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol0} = thrift_binary_protocol:new(Transport), + {ok, Protocol} = thrift_binary_protocol:new(Transport), TestData = #bools{im_true = true, im_false = false}, - {Protocol1, ok} = thrift_protocol:write(Protocol0, + ok = thrift_protocol:write(Protocol, {{struct, element(2, thriftTest_types:struct_info('bools'))}, TestData}), - {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, + {ok, Result} = thrift_protocol:read(Protocol, {struct, element(2, thriftTest_types:struct_info('bools'))}, 'bools'), @@ -74,23 +74,8 @@ t3() -> true = TestData#bools.im_false =:= Result#bools.im_false. -t4() -> - {ok, Transport} = thrift_memory_buffer:new(), - {ok, Protocol0} = thrift_binary_protocol:new(Transport), - TestData = #insanity{xtructs=[]}, - {Protocol1, ok} = thrift_protocol:write(Protocol0, - {{struct, element(2, thriftTest_types:struct_info('insanity'))}, - TestData}), - {_Protocol2, {ok, Result}} = thrift_protocol:read(Protocol1, - {struct, element(2, thriftTest_types:struct_info('insanity'))}, - 'insanity'), - - TestData = Result. - - t() -> t1(), t2(), - t3(), - t4(). + t3(). diff --git a/test/erl/src/test_server.erl b/test/erl/src/test_server.erl index 28d47b16..cd439ccd 100644 --- a/test/erl/src/test_server.erl +++ b/test/erl/src/test_server.erl @@ -19,42 +19,12 @@ -module(test_server). --export([go/0, go/1, start_link/2, handle_function/2]). +-export([start_link/1, handle_function/2]). -include("thriftTest_types.hrl"). --record(options, {port = 9090, - server_opts = []}). - -parse_args(Args) -> parse_args(Args, #options{}). -parse_args([], Opts) -> Opts; -parse_args([Head | Rest], Opts) -> - NewOpts = - case catch list_to_integer(Head) of - Port when is_integer(Port) -> - Opts#options{port = Port}; - _Else -> - case Head of - "framed" -> - Opts#options{server_opts = [{framed, true} | Opts#options.server_opts]}; - "" -> - Opts; - _Else -> - erlang:error({bad_arg, Head}) - end - end, - parse_args(Rest, NewOpts). - -go() -> go([]). -go(Args) -> - #options{port = Port, server_opts = ServerOpts} = parse_args(Args), - spawn(fun() -> start_link(Port, ServerOpts), receive after infinity -> ok end end). - -start_link(Port, ServerOpts) -> - thrift_socket_server:start([{handler, ?MODULE}, - {service, thriftTest_thrift}, - {port, Port}] ++ - ServerOpts). +start_link(Port) -> + thrift_server:start_link(Port, thriftTest_thrift, ?MODULE). handle_function(testVoid, {}) -> @@ -154,12 +124,12 @@ handle_function(testInsanity, {Insanity}) when is_record(Insanity, insanity) -> {?thriftTest_THREE, Crazy}]), SecondMap = dict:from_list([{?thriftTest_SIX, Looney}]), - + Insane = dict:from_list([{1, FirstMap}, {2, SecondMap}]), - + io:format("Return = ~p~n", [Insane]), - + {reply, Insane}; handle_function(testMulti, Args = {Arg0, Arg1, Arg2, _Arg3, Arg4, Arg5}) @@ -180,7 +150,7 @@ handle_function(testException, {String}) when is_binary(String) -> case String of <<"Xception">> -> throw(#xception{errorCode = 1001, - message = String}); + message = <<"This is an Xception">>}); _ -> ok end; diff --git a/test/erl/src/test_tether.erl b/test/erl/src/test_tether.erl new file mode 100644 index 00000000..dc11a9a9 --- /dev/null +++ b/test/erl/src/test_tether.erl @@ -0,0 +1,186 @@ +%% +%% Licensed to the Apache Software Foundation (ASF) under one +%% or more contributor license agreements. See the NOTICE file +%% distributed with this work for additional information +%% regarding copyright ownership. The ASF licenses this file +%% to you under the Apache License, Version 2.0 (the +%% "License"); you may not use this file except in compliance +%% with the License. You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% Tests the behavior of clients in the face of transport errors. +%% Makes sure start, start_linked, and start_tethered work as expected. + +-module(test_tether). + +-compile(export_all). + + +t() -> + io:format("Beginning transport error test.~n"), + Pid1 = erlang:spawn(?MODULE, t_sub, [2]), + wait_for(Pid1), + io:format("Beginning protocol error test.~n"), + Pid2 = erlang:spawn(?MODULE, t_sub, [22]), + wait_for(Pid2), + ok. + +t_sub(Port) -> + io:format("Starting.~n", []), + register(tester, self()), + + Pid1 = erlang:spawn(?MODULE, test_start, [Port]), + receive after 200 -> ok end, % Wait for completion. + case is_up(Pid1) of + true -> + io:format("PASS. Unlinked owner still alive.~n"); + false -> + io:format("FAIL. Unlinked owner is dead.~n") + end, + + Pid2 = erlang:spawn(?MODULE, test_linked, [Port]), + receive after 200 -> ok end, % Wait for completion. + case is_up(Pid2) of + true -> + io:format("FAIL. Linked owner still alive.~n"); + false -> + io:format("PASS. Linked owner is dead.~n") + end, + + Pid3 = erlang:spawn(?MODULE, test_tethered, [Port]), + receive after 200 -> ok end, % Wait for completion. + case is_up(Pid3) of + true -> + io:format("PASS. Tethered owner still alive.~n"); + false -> + io:format("FAIL. Tethered owner is dead.~n") + end, + + check_extras(3). + +is_up(Pid) -> + MonitorRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonitorRef, process, Pid, _Info} -> + false + after + 50 -> + erlang:demonitor(MonitorRef), + true + end. + +wait_for(Pid) -> + MonitorRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonitorRef, process, Pid, _Info} -> + ok + end. + +check_extras(0) -> ok; +check_extras(N) -> + receive + {client, Type, Pid} -> + case {Type, is_up(Pid)} of + {unlinked, true} -> + io:format("PASS. Unlinked client still alive.~n"); + {unlinked, false} -> + io:format("FAIL. Unlinked client dead.~n"); + {linked, true} -> + io:format("FAIL. Linked client still alive.~n"); + {linked, false} -> + io:format("PASS. Linked client dead.~n"); + {tethered, true} -> + io:format("FAIL. Tethered client still alive.~n"); + {tethered, false} -> + io:format("PASS. Tethered client dead.~n") + end, + check_extras(N-1) + after + 500 -> + io:format("FAIL. Expected ~p more clients.~n", [N]) + end. + +make_thrift_client(Opts) -> + thrift_client:start(fun()->ok end, thriftTest_thrift, Opts). + +make_protocol_factory(Port) -> + {ok, TransportFactory} = + thrift_socket_transport:new_transport_factory( + "127.0.0.1", Port, []), + {ok, ProtocolFactory} = + thrift_binary_protocol:new_protocol_factory( + TransportFactory, []), + ProtocolFactory. + + +test_start(Port) -> + {ok, Client1} = make_thrift_client([{connect, false}]), + tester ! {client, unlinked, Client1}, + {ok, Client2} = make_thrift_client([{connect, false}]), + io:format("PASS. Unlinked clients created.~n"), + try + gen_server:call(Client2, {connect, make_protocol_factory(Port)}), + thrift_client:call(Client2, testVoid, []), + io:format("FAIL. Unlinked client connected and called.~n", []) + catch + Kind:Info -> + io:format("PASS. Caught unlinked error. ~p:~p~n", [Kind, Info]) + end, + receive after 100 -> + io:format("PASS. Still alive after unlinked death.~n"), + %% Hang around a little longer so our parent can verify. + receive after 200 -> ok end + end, + %% Exit abnormally to not kill our unlinked extra client. + exit(die). + +test_linked(Port) -> + {ok, Client1} = make_thrift_client([{connect, false}, {monitor, link}]), + tester ! {client, linked, Client1}, + {ok, Client2} = make_thrift_client([{connect, false}, {monitor, link}]), + io:format("PASS. Linked clients created.~n"), + try + gen_server:call(Client2, {connect, make_protocol_factory(Port)}), + thrift_client:call(Client2, testVoid, []), + io:format("FAIL. Linked client connected and called.~n", []) + catch + Kind:Info -> + io:format("FAIL. Caught linked error. ~p:~p~n", [Kind, Info]) + end, + receive after 100 -> + io:format("FAIL. Still alive after linked death.~n"), + % Hang around a little longer so our parent can verify. + receive after 200 -> ok end + end, + %% Exit abnormally to kill our linked extra client. + %% But we should never get here. + exit(die). + +test_tethered(Port) -> + {ok, Client1} = make_thrift_client([{connect, false}, {monitor, tether}]), + tester ! {client, tethered, Client1}, + {ok, Client2} = make_thrift_client([{connect, false}, {monitor, tether}]), + io:format("PASS. Tethered clients created.~n"), + try + gen_server:call(Client2, {connect, make_protocol_factory(Port)}), + thrift_client:call(Client2, testVoid, []), + io:format("FAIL. Tethered client connected and called.~n", []) + catch + Kind:Info -> + io:format("PASS. Caught tethered error. ~p:~p~n", [Kind, Info]) + end, + receive after 100 -> + io:format("PASS. Still alive after tethered death.~n"), + % Hang around a little longer so our parent can verify. + receive after 200 -> ok end + end, + %% Exit abnormally to kill our tethered extra client. + exit(die). diff --git a/tutorial/erl/client.erl b/tutorial/erl/client.erl index adaebe42..97803349 100644 --- a/tutorial/erl/client.erl +++ b/tutorial/erl/client.erl @@ -29,50 +29,46 @@ p(X) -> t() -> Port = 9999, + + {ok, Client} = thrift_client:start_link("127.0.0.1", + Port, + calculator_thrift), - {ok, Client0} = thrift_client_util:new("127.0.0.1", - Port, - calculator_thrift, - []), - - {Client1, {ok, ok}} = thrift_client:call(Client0, ping, []), + thrift_client:call(Client, ping, []), io:format("ping~n", []), - {Client2, {ok, Sum}} = thrift_client:call(Client1, add, [1, 1]), + {ok, Sum} = thrift_client:call(Client, add, [1, 1]), io:format("1+1=~p~n", [Sum]), - {Client3, {ok, Sum1}} = thrift_client:call(Client2, add, [1, 4]), + {ok, Sum1} = thrift_client:call(Client, add, [1, 4]), io:format("1+4=~p~n", [Sum1]), Work = #work{op=?tutorial_SUBTRACT, num1=15, num2=10}, - {Client4, {ok, Diff}} = thrift_client:call(Client3, calculate, [1, Work]), + {ok, Diff} = thrift_client:call(Client, calculate, [1, Work]), io:format("15-10=~p~n", [Diff]), - {Client5, {ok, Log}} = thrift_client:call(Client4, getStruct, [1]), + {ok, Log} = thrift_client:call(Client, getStruct, [1]), io:format("Log: ~p~n", [Log]), - Client6 = - try - Work1 = #work{op=?tutorial_DIVIDE, - num1=1, - num2=0}, - {ClientS1, {ok, _Quot}} = thrift_client:call(Client5, calculate, [2, Work1]), + try + Work1 = #work{op=?tutorial_DIVIDE, + num1=1, + num2=0}, + {ok, _Quot} = thrift_client:call(Client, calculate, [2, Work1]), - io:format("LAME: exception handling is broken~n", []), - ClientS1 - catch - throw:{ClientS2, Z} -> - io:format("Got exception where expecting - the " ++ - "following is NOT a problem!!!~n"), - p(Z), - ClientS2 - end, + io:format("LAME: exception handling is broken~n", []) + catch + Z -> + io:format("Got exception where expecting - the " ++ + "following is NOT a problem!!!~n"), + p(Z) + end, - {Client7, {ok, ok}} = thrift_client:call(Client6, zip, []), + {ok, ok} = thrift_client:call(Client, zip, []), io:format("zip~n", []), - {_Client8, ok} = thrift_client:close(Client7), + ok = thrift_client:close(Client), ok.