From: David Reiss Date: Mon, 30 Aug 2010 22:05:43 +0000 (+0000) Subject: erlang: Refactor thrift_transport and all transport implementations X-Git-Tag: 0.5.0~116 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=639e1cfe2b9eb6e9e852899adeeec8a41daeda81;p=common%2Fthrift.git erlang: Refactor thrift_transport and all transport implementations Note that the buffering transports still use a separate process to maintain their state. This change just changes them to use a "return-the-new-version"-style API. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@990989 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/erl/include/thrift_transport_impl.hrl b/lib/erl/include/thrift_transport_impl.hrl index bbf1b891..a9980daf 100644 --- a/lib/erl/include/thrift_transport_impl.hrl +++ b/lib/erl/include/thrift_transport_impl.hrl @@ -22,10 +22,10 @@ -ifndef(THRIFT_TRANSPORT_IMPL_INCLUDED). -define(THRIFT_TRANSPORT_IMPL_INCLUDED, true). --spec write(state(), iolist() | binary()) -> ok | {error, _Reason}. --spec read(state(), non_neg_integer()) -> {ok, binary()} | {error, _Reason}. --spec flush(state()) -> ok | {error, _Reason}. --spec close(state()) -> ok | {error, _Reason}. +-spec write(state(), iolist() | binary()) -> {state(), ok | {error, _Reason}}. +-spec read(state(), non_neg_integer()) -> {state(), {ok, binary()} | {error, _Reason}}. +-spec flush(state()) -> {state(), ok | {error, _Reason}}. +-spec close(state()) -> {state(), ok | {error, _Reason}}. -endif. diff --git a/lib/erl/src/thrift_base64_transport.erl b/lib/erl/src/thrift_base64_transport.erl index 76303331..3cdb3515 100644 --- a/lib/erl/src/thrift_base64_transport.erl +++ b/lib/erl/src/thrift_base64_transport.erl @@ -37,24 +37,27 @@ new(Wrapped) -> thrift_transport:new(?MODULE, State). -write(#b64_transport{wrapped = Wrapped}, Data) -> - thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))). +write(This = #b64_transport{wrapped = Wrapped}, Data) -> + {NewWrapped, Result} = thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))), + {This#b64_transport{wrapped = NewWrapped}, Result}. %% base64 doesn't support reading quite yet since it would involve %% nasty buffering and such -read(#b64_transport{wrapped = Wrapped}, Data) -> - {error, no_reads_allowed}. +read(This = #b64_transport{}, _Data) -> + {This, {error, no_reads_allowed}}. -flush(#b64_transport{wrapped = Wrapped}) -> - thrift_transport:write(Wrapped, <<"\n">>), - thrift_transport:flush(Wrapped). +flush(This = #b64_transport{wrapped = Wrapped0}) -> + {Wrapped1, ok} = thrift_transport:write(Wrapped0, <<"\n">>), + {Wrapped2, ok} = thrift_transport:flush(Wrapped1), + {This#b64_transport{wrapped = Wrapped2}, ok}. -close(Me = #b64_transport{wrapped = Wrapped}) -> - flush(Me), - thrift_transport:close(Wrapped). +close(This0) -> + {This1 = #b64_transport{wrapped = Wrapped}, ok} = flush(This0), + {NewWrapped, ok} = thrift_transport:close(Wrapped), + {This1#b64_transport{wrapped = NewWrapped}, ok}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_binary_protocol.erl b/lib/erl/src/thrift_binary_protocol.erl index fcb072bd..796089c1 100644 --- a/lib/erl/src/thrift_binary_protocol.erl +++ b/lib/erl/src/thrift_binary_protocol.erl @@ -61,12 +61,12 @@ parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> flush_transport(This = #binary_protocol{transport = Transport}) -> - Result = thrift_transport:flush(Transport), - {This, Result}. + {NewTransport, Result} = thrift_transport:flush(Transport), + {This#binary_protocol{transport = NewTransport}, Result}. close_transport(This = #binary_protocol{transport = Transport}) -> - Result = thrift_transport:close(Transport), - {This, Result}. + {NewTransport, Result} = thrift_transport:close(Transport), + {This#binary_protocol{transport = NewTransport}, Result}. %%% %%% instance methods @@ -166,8 +166,8 @@ write(This0, {string, Bin}) when is_binary(Bin) -> %% Data :: iolist() write(This = #binary_protocol{transport = Trans}, Data) -> - Result = thrift_transport:write(Trans, Data), - {This, Result}. + {NewTransport, Result} = thrift_transport:write(Trans, Data), + {This#binary_protocol{transport = NewTransport}, Result}. %% @@ -312,8 +312,8 @@ read(This0, string) -> {#binary_protocol{}, {ok, binary()} | {error, _Reason}}. read_data(This, 0) -> {This, {ok, <<>>}}; read_data(This = #binary_protocol{transport = Trans}, Len) when is_integer(Len) andalso Len > 0 -> - Result = thrift_transport:read(Trans, Len), - {This, Result}. + {NewTransport, Result} = thrift_transport:read(Trans, Len), + {This#binary_protocol{transport = NewTransport}, Result}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_buffered_transport.erl b/lib/erl/src/thrift_buffered_transport.erl index c6df73a3..6668549c 100644 --- a/lib/erl/src/thrift_buffered_transport.erl +++ b/lib/erl/src/thrift_buffered_transport.erl @@ -64,7 +64,7 @@ new(WrappedTransport) -> %% Description: Writes data into the buffer %%-------------------------------------------------------------------- write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). + {Transport, gen_server:call(Transport, {write, Data})}. %%-------------------------------------------------------------------- %% Function: flush(Transport) -> ok @@ -72,7 +72,7 @@ write(Transport, Data) -> %% Description: Flushes the buffer through to the wrapped transport %%-------------------------------------------------------------------- flush(Transport) -> - gen_server:call(Transport, flush). + {Transport, gen_server:call(Transport, flush)}. %%-------------------------------------------------------------------- %% Function: close(Transport) -> ok @@ -80,7 +80,7 @@ flush(Transport) -> %% Description: Closes the transport and the wrapped transport %%-------------------------------------------------------------------- close(Transport) -> - gen_server:cast(Transport, close). + {Transport, gen_server:cast(Transport, close)}. %%-------------------------------------------------------------------- %% Function: Read(Transport, Len) -> {ok, Data} @@ -90,7 +90,7 @@ close(Transport) -> %% Description: Reads data through from the wrapped transoprt %%-------------------------------------------------------------------- read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}, _Timeout=10000). + {Transport, gen_server:call(Transport, {read, Len}, _Timeout=10000)}. %%==================================================================== %% gen_server callbacks @@ -120,14 +120,17 @@ handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBu {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}}; handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) -> - Response = thrift_transport:read(Wrapped, Len), - {reply, Response, State}; + {NewWrapped, Response} = thrift_transport:read(Wrapped, Len), + NewState = State#buffered_transport{wrapped = NewWrapped}, + {reply, Response, NewState}; handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf, - wrapped = Wrapped}) -> - Response = thrift_transport:write(Wrapped, WBuf), - thrift_transport:flush(Wrapped), - {reply, Response, State#buffered_transport{write_buffer = []}}. + wrapped = Wrapped0}) -> + {Wrapped1, Response} = thrift_transport:write(Wrapped0, WBuf), + {Wrapped2, _} = thrift_transport:flush(Wrapped1), + NewState = State#buffered_transport{write_buffer = [], + wrapped = Wrapped2}, + {reply, Response, NewState}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | diff --git a/lib/erl/src/thrift_disk_log_transport.erl b/lib/erl/src/thrift_disk_log_transport.erl index 2645c67d..87d9547a 100644 --- a/lib/erl/src/thrift_disk_log_transport.erl +++ b/lib/erl/src/thrift_disk_log_transport.erl @@ -68,30 +68,33 @@ parse_opts([{sync_every, Int} | Rest], State) when is_integer(Int), Int > 0 -> %%%% TRANSPORT IMPLENTATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% disk_log_transport is write-only -read(_State, Len) -> - {error, no_read_from_disk_log}. +read(State, Len) -> + {State, {error, no_read_from_disk_log}}. -write(#dl_transport{log = Log}, Data) -> - disk_log:balog(Log, erlang:iolist_to_binary(Data)). +write(This = #dl_transport{log = Log}, Data) -> + {This, disk_log:balog(Log, erlang:iolist_to_binary(Data))}. force_flush(#dl_transport{log = Log}) -> error_logger:info_msg("~p syncing~n", [?MODULE]), disk_log:sync(Log). -flush(#dl_transport{log = Log, sync_every = SE}) -> +flush(This = #dl_transport{log = Log, sync_every = SE}) -> case SE of undefined -> % no time-based sync disk_log:sync(Log); _Else -> % sync will happen automagically ok - end. + end, + {This, ok}. + + %% On close, close the underlying log if we're configured to do so. -close(#dl_transport{close_on_close = false}) -> - ok; -close(#dl_transport{log = Log}) -> - disk_log:lclose(Log). +close(This = #dl_transport{close_on_close = false}) -> + {This, ok}; +close(This = #dl_transport{log = Log}) -> + {This, disk_log:lclose(Log)}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_file_transport.erl b/lib/erl/src/thrift_file_transport.erl index 7ee1c127..63f2d751 100644 --- a/lib/erl/src/thrift_file_transport.erl +++ b/lib/erl/src/thrift_file_transport.erl @@ -65,25 +65,25 @@ parse_opts([], State) -> %%%% TRANSPORT IMPL %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -write(#t_file_transport{device = Device, mode = write}, Data) -> - file:write(Device, Data); -write(_T, _D) -> - {error, read_mode}. +write(This = #t_file_transport{device = Device, mode = write}, Data) -> + {This, file:write(Device, Data)}; +write(This, _D) -> + {This, {error, read_mode}}. -read(#t_file_transport{device = Device, mode = read}, Len) +read(This = #t_file_transport{device = Device, mode = read}, Len) when is_integer(Len), Len >= 0 -> - file:read(Device, Len); -read(_T, _D) -> - {error, read_mode}. + {This, file:read(Device, Len)}; +read(This, _D) -> + {This, {error, read_mode}}. -flush(#t_file_transport{device = Device, mode = write}) -> - file:sync(Device). +flush(This = #t_file_transport{device = Device, mode = write}) -> + {This, file:sync(Device)}. -close(#t_file_transport{device = Device, should_close = SC}) -> +close(This = #t_file_transport{device = Device, should_close = SC}) -> case SC of true -> - file:close(Device); + {This, file:close(Device)}; false -> - ok + {This, ok} end. diff --git a/lib/erl/src/thrift_framed_transport.erl b/lib/erl/src/thrift_framed_transport.erl index 92bd5885..c1c8850c 100644 --- a/lib/erl/src/thrift_framed_transport.erl +++ b/lib/erl/src/thrift_framed_transport.erl @@ -62,7 +62,7 @@ new(WrappedTransport) -> %% Description: Writes data into the buffer %%-------------------------------------------------------------------- write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). + {Transport, gen_server:call(Transport, {write, Data})}. %%-------------------------------------------------------------------- %% Function: flush(Transport) -> ok @@ -70,7 +70,7 @@ write(Transport, Data) -> %% Description: Flushes the buffer through to the wrapped transport %%-------------------------------------------------------------------- flush(Transport) -> - gen_server:call(Transport, flush). + {Transport, gen_server:call(Transport, flush)}. %%-------------------------------------------------------------------- %% Function: close(Transport) -> ok @@ -78,7 +78,7 @@ flush(Transport) -> %% Description: Closes the transport and the wrapped transport %%-------------------------------------------------------------------- close(Transport) -> - gen_server:cast(Transport, close). + {Transport, gen_server:cast(Transport, close)}. %%-------------------------------------------------------------------- %% Function: Read(Transport, Len) -> {ok, Data} @@ -88,7 +88,7 @@ close(Transport) -> %% Description: Reads data through from the wrapped transoprt %%-------------------------------------------------------------------- read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). + {Transport, gen_server:call(Transport, {read, Len})}. %%==================================================================== %% gen_server callbacks @@ -118,22 +118,22 @@ init([Wrapped]) -> handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) -> {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}}; -handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, +handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped0, read_buffer = RBuf}) -> - {RBuf1, RBuf1Size} = + {Wrapped1, {RBuf1, RBuf1Size}} = %% if the read buffer is empty, read another frame %% otherwise, just read from what's left in the buffer case iolist_size(RBuf) of 0 -> %% read the frame length - {ok, <>} = - thrift_transport:read(Wrapped, 4), + {WrappedS1, {ok, <>}} = + thrift_transport:read(Wrapped0, 4), %% then read the data - {ok, Bin} = - thrift_transport:read(Wrapped, FrameLen), - {Bin, erlang:byte_size(Bin)}; + {WrappedS2, {ok, Bin}} = + thrift_transport:read(WrappedS1, FrameLen), + {WrappedS2, {Bin, erlang:byte_size(Bin)}}; Sz -> - {RBuf, Sz} + {Wrapped0, {RBuf, Sz}} end, %% pull off Give bytes, return them to the user, leave the rest in the buffer @@ -141,7 +141,7 @@ handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, <> = iolist_to_binary(RBuf1), Response = {ok, Data}, - State1 = State#framed_transport{read_buffer=RBuf2}, + State1 = State#framed_transport{wrapped = Wrapped1, read_buffer=RBuf2}, {reply, Response, State1}; @@ -193,15 +193,15 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- do_flush(State = #framed_transport{write_buffer = Buffer, - wrapped = Wrapped}) -> + wrapped = Wrapped0}) -> FrameLen = iolist_size(Buffer), Data = [<>, Buffer], - Response = thrift_transport:write(Wrapped, Data), + {Wrapped1, Response} = thrift_transport:write(Wrapped0, Data), - thrift_transport:flush(Wrapped), + {Wrapped2, _} = thrift_transport:flush(Wrapped1), - State1 = State#framed_transport{write_buffer = []}, + State1 = State#framed_transport{wrapped = Wrapped2, write_buffer = []}, {Response, State1}. min(A,B) when A A; diff --git a/lib/erl/src/thrift_http_transport.erl b/lib/erl/src/thrift_http_transport.erl index f0a45fe7..44f23138 100644 --- a/lib/erl/src/thrift_http_transport.erl +++ b/lib/erl/src/thrift_http_transport.erl @@ -77,7 +77,7 @@ new(Host, Path, Options) -> %% Description: Writes data into the buffer %%-------------------------------------------------------------------- write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). + {Transport, gen_server:call(Transport, {write, Data})}. %%-------------------------------------------------------------------- %% Function: flush(Transport) -> ok @@ -85,7 +85,7 @@ write(Transport, Data) -> %% Description: Flushes the buffer, making a request %%-------------------------------------------------------------------- flush(Transport) -> - gen_server:call(Transport, flush). + {Transport, gen_server:call(Transport, flush)}. %%-------------------------------------------------------------------- %% Function: close(Transport) -> ok @@ -93,7 +93,7 @@ flush(Transport) -> %% Description: Closes the transport %%-------------------------------------------------------------------- close(Transport) -> - gen_server:cast(Transport, close). + {Transport, gen_server:cast(Transport, close)}. %%-------------------------------------------------------------------- %% Function: Read(Transport, Len) -> {ok, Data} @@ -103,7 +103,7 @@ close(Transport) -> %% Description: Reads data through from the wrapped transoprt %%-------------------------------------------------------------------- read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). + {Transport, gen_server:call(Transport, {read, Len})}. %%==================================================================== %% gen_server callbacks diff --git a/lib/erl/src/thrift_memory_buffer.erl b/lib/erl/src/thrift_memory_buffer.erl index 34a36689..ae8b6d2b 100644 --- a/lib/erl/src/thrift_memory_buffer.erl +++ b/lib/erl/src/thrift_memory_buffer.erl @@ -58,7 +58,7 @@ new_transport_factory() -> %% Description: Writes data into the buffer %%-------------------------------------------------------------------- write(Transport, Data) -> - gen_server:call(Transport, {write, Data}). + {Transport, gen_server:call(Transport, {write, Data})}. %%-------------------------------------------------------------------- %% Function: flush(Transport) -> ok @@ -66,7 +66,7 @@ write(Transport, Data) -> %% Description: Flushes the buffer through to the wrapped transport %%-------------------------------------------------------------------- flush(Transport) -> - gen_server:call(Transport, flush). + {Transport, gen_server:call(Transport, flush)}. %%-------------------------------------------------------------------- %% Function: close(Transport) -> ok @@ -74,7 +74,7 @@ flush(Transport) -> %% Description: Closes the transport and the wrapped transport %%-------------------------------------------------------------------- close(Transport) -> - gen_server:cast(Transport, close). + {Transport, gen_server:cast(Transport, close)}. %%-------------------------------------------------------------------- %% Function: Read(Transport, Len) -> {ok, Data} @@ -84,7 +84,7 @@ close(Transport) -> %% Description: Reads data through from the wrapped transoprt %%-------------------------------------------------------------------- read(Transport, Len) when is_integer(Len) -> - gen_server:call(Transport, {read, Len}). + {Transport, gen_server:call(Transport, {read, Len})}. %%==================================================================== %% gen_server callbacks diff --git a/lib/erl/src/thrift_socket_transport.erl b/lib/erl/src/thrift_socket_transport.erl index 1a8ba817..4c552ae3 100644 --- a/lib/erl/src/thrift_socket_transport.erl +++ b/lib/erl/src/thrift_socket_transport.erl @@ -47,25 +47,26 @@ new(Socket, Opts) when is_list(Opts) -> thrift_transport:new(?MODULE, State). %% Data :: iolist() -write(#data{socket = Socket}, Data) -> - gen_tcp:send(Socket, Data). +write(This = #data{socket = Socket}, Data) -> + {This, gen_tcp:send(Socket, Data)}. -read(#data{socket=Socket, recv_timeout=Timeout}, Len) +read(This = #data{socket=Socket, recv_timeout=Timeout}, Len) when is_integer(Len), Len >= 0 -> case gen_tcp:recv(Socket, Len, Timeout) of Err = {error, timeout} -> error_logger:info_msg("read timeout: peer conn ~p", [inet:peername(Socket)]), gen_tcp:close(Socket), - Err; - Data -> Data + {This, Err}; + Data -> + {This, Data} end. %% We can't really flush - everything is flushed when we write -flush(_) -> - ok. +flush(This) -> + {This, ok}. -close(#data{socket = Socket}) -> - gen_tcp:close(Socket). +close(This = #data{socket = Socket}) -> + {This, gen_tcp:close(Socket)}. %%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/lib/erl/src/thrift_transport.erl b/lib/erl/src/thrift_transport.erl index 420281c7..0de7a107 100644 --- a/lib/erl/src/thrift_transport.erl +++ b/lib/erl/src/thrift_transport.erl @@ -41,20 +41,24 @@ new(Module, Data) when is_atom(Module) -> {ok, #transport{module = Module, data = Data}}. --spec write(#transport{}, iolist() | binary()) -> ok | {error, _Reason}. +-spec write(#transport{}, iolist() | binary()) -> {#transport{}, ok | {error, _Reason}}. write(Transport, Data) -> Module = Transport#transport.module, - Module:write(Transport#transport.data, Data). + {NewTransData, Result} = Module:write(Transport#transport.data, Data), + {Transport#transport{data = NewTransData}, Result}. --spec read(#transport{}, non_neg_integer()) -> {ok, binary()} | {error, _Reason}. +-spec read(#transport{}, non_neg_integer()) -> {#transport{}, {ok, binary()} | {error, _Reason}}. read(Transport, Len) when is_integer(Len) -> Module = Transport#transport.module, - Module:read(Transport#transport.data, Len). + {NewTransData, Result} = Module:read(Transport#transport.data, Len), + {Transport#transport{data = NewTransData}, Result}. --spec flush(#transport{}) -> ok | {error, _Reason}. -flush(#transport{module = Module, data = Data}) -> - Module:flush(Data). +-spec flush(#transport{}) -> {#transport{}, ok | {error, _Reason}}. +flush(Transport = #transport{module = Module, data = Data}) -> + {NewTransData, Result} = Module:flush(Data), + {Transport#transport{data = NewTransData}, Result}. --spec close(#transport{}) -> ok | {error, _Reason}. -close(#transport{module = Module, data = Data}) -> - Module:close(Data). +-spec close(#transport{}) -> {#transport{}, ok | {error, _Reason}}. +close(Transport = #transport{module = Module, data = Data}) -> + {NewTransData, Result} = Module:close(Data), + {Transport#transport{data = NewTransData}, Result}.