From 90b4083d4cd468aa33bd441268c5069e5674da81 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 10 Jun 2008 22:58:52 +0000 Subject: [PATCH] Implement buffered transport git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666386 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_binary_protocol.erl | 5 +- lib/alterl/src/thrift_buffered_transport.erl | 154 +++++++++++++++++++ lib/alterl/src/thrift_processor.erl | 1 + lib/alterl/src/thrift_protocol.erl | 8 +- lib/alterl/src/thrift_server.erl | 5 +- lib/alterl/src/thrift_socket_transport.erl | 6 +- lib/alterl/src/thrift_transport.erl | 9 +- 7 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 lib/alterl/src/thrift_buffered_transport.erl diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl index 4e8cfb3f..0605dc1e 100644 --- a/lib/alterl/src/thrift_binary_protocol.erl +++ b/lib/alterl/src/thrift_binary_protocol.erl @@ -13,7 +13,8 @@ -export([new/1, read/2, - write/2 + write/2, + flush_transport/1 ]). -record(binary_protocol, {transport}). @@ -26,6 +27,8 @@ new(Transport) -> thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}). +flush_transport(#binary_protocol{transport = Transport}) -> + thrift_transport:flush(Transport). %%% %%% instance methods diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl new file mode 100644 index 00000000..c16f26ac --- /dev/null +++ b/lib/alterl/src/thrift_buffered_transport.erl @@ -0,0 +1,154 @@ +%%%------------------------------------------------------------------- +%%% File : thrift_buffered_transport.erl +%%% Author : +%%% Description : Buffered transport for thrift +%%% +%%% Created : 30 Jan 2008 by +%%%------------------------------------------------------------------- +-module(thrift_buffered_transport). + +-behaviour(gen_server). +-behaviour(thrift_transport). + +%% API +-export([new/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% thrift_transport callbacks +-export([write/2, read/2, flush/1]). + +-record(state, { + % The wrapped transport + wrapped, + + % a list of binaries which will be concatenated and sent during + % a flush. + % + % *** THIS LIST IS STORED IN REVERSE ORDER!!! *** + % + buffer}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +new(WrappedTransport) -> + case gen_server:start_link(?MODULE, [WrappedTransport], []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. + + + +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = binary() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) when is_binary(Data) -> + gen_server:call(Transport, {write, Data}). + +%%-------------------------------------------------------------------- +%% Function: flush(Transpor) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, {flush}). + +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Wrapped]) -> + {ok, #state{wrapped = Wrapped, + buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({write, Data}, _From, State = #state{buffer = Buffer}) -> + {reply, ok, State#state{buffer = [Data | Buffer]}}; + +handle_call({read, Len}, _From, State = #state{wrapped = Wrapped}) -> + Response = thrift_transport:read(Wrapped, Len), + {reply, Response, State}; + +handle_call({flush}, _From, State = #state{buffer = Buffer, + wrapped = Wrapped}) -> + Concat = concat_binary(lists:reverse(Buffer)), + Response = thrift_transport:write(Wrapped, Concat), + % todo(todd) - flush wrapped transport here? + {reply, Response, State#state{buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- diff --git a/lib/alterl/src/thrift_processor.erl b/lib/alterl/src/thrift_processor.erl index 217c2161..fa33d3bc 100644 --- a/lib/alterl/src/thrift_processor.erl +++ b/lib/alterl/src/thrift_processor.erl @@ -123,6 +123,7 @@ send_reply(OProto, Function, ReplyMessageType, Reply) -> seqid = 0}), ok = thrift_protocol:write(OProto, Reply), ok = thrift_protocol:write(OProto, message_end), + ok = thrift_protocol:flush_transport(OProto), ok. diff --git a/lib/alterl/src/thrift_protocol.erl b/lib/alterl/src/thrift_protocol.erl index f50c612f..7db79291 100644 --- a/lib/alterl/src/thrift_protocol.erl +++ b/lib/alterl/src/thrift_protocol.erl @@ -4,6 +4,7 @@ write/2, read/2, skip/2, + flush_transport/1, typeid_to_atom/1, @@ -17,7 +18,8 @@ behaviour_info(callbacks) -> [ {read, 2}, - {write, 2} + {write, 2}, + {flush_transport, 1} ]; behaviour_info(_Else) -> undefined. @@ -27,6 +29,10 @@ new(Module, Data) when is_atom(Module) -> data = Data}}. +flush_transport(#protocol{module = Module, + data = Data}) -> + Module:flush_transport(Data). + typeid_to_atom(?tType_STOP) -> field_stop; typeid_to_atom(?tType_VOID) -> void; typeid_to_atom(?tType_BOOL) -> bool; diff --git a/lib/alterl/src/thrift_server.erl b/lib/alterl/src/thrift_server.erl index 5760e1cf..3f111641 100644 --- a/lib/alterl/src/thrift_server.erl +++ b/lib/alterl/src/thrift_server.erl @@ -111,8 +111,9 @@ acceptor(ListenSocket, Service, Handler) {ok, Socket} = gen_tcp:accept(ListenSocket), error_logger:info_msg("Accepted client"), - {ok, Transport} = thrift_socket_transport:new(Socket), - {ok, Protocol} = thrift_binary_protocol:new(Transport), + {ok, SocketTransport} = thrift_socket_transport:new(Socket), + {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), + {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), thrift_processor:start(Protocol, Protocol, Service, Handler), receive diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl index 2921c20d..a1b2e05a 100644 --- a/lib/alterl/src/thrift_socket_transport.erl +++ b/lib/alterl/src/thrift_socket_transport.erl @@ -4,7 +4,7 @@ -export([new/1, - write/2, read/2]). + write/2, read/2, flush/1]). -record(data, {socket}). @@ -16,3 +16,7 @@ write(#data{socket = Socket}, Data) when is_binary(Data) -> read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 -> gen_tcp:recv(Socket, Len). + +% We can't really flush - everything is flushed when we write +flush(_) -> + ok. diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl index bf9dc53b..c47d90f8 100644 --- a/lib/alterl/src/thrift_transport.erl +++ b/lib/alterl/src/thrift_transport.erl @@ -4,12 +4,14 @@ new/2, write/2, - read/2 + read/2, + flush/1 ]). behaviour_info(callbacks) -> [{write/2, - read/2}]; + read/2, + flush/1}]; behaviour_info(_Else) -> undefined. @@ -27,3 +29,6 @@ write(Transport, Data) when is_binary(Data) -> read(Transport, Len) when is_integer(Len) -> Module = Transport#transport.module, Module:read(Transport#transport.data, Len). + +flush(#transport{module = Module, data = Data}) -> + Module:flush(Data). -- 2.17.1