From 914ebb4b69dc8fe351efce4f029e68cde3c3ba3d Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 11 Jun 2008 01:01:48 +0000 Subject: [PATCH] Erlang: add framed_transport and non-strict binary_protocol - thrift_client now takes as its fourth parameter Options: framed, strict_{read,write}, connect_timeout (P.S. fourth param used to be Timeout) - binary protocol now takes options: strict_{read,write} - buffers in framed and buffered transport are now iolists and not reversed lists of binaries - rename buffer in buffered transport "write_buffer" to match framed transport git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666447 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_binary_protocol.erl | 68 +++++-- lib/alterl/src/thrift_buffered_transport.erl | 27 ++- lib/alterl/src/thrift_client.erl | 52 +++-- lib/alterl/src/thrift_framed_transport.erl | 198 +++++++++++++++++++ lib/alterl/src/thrift_socket_transport.erl | 4 +- lib/alterl/src/thrift_transport.erl | 3 +- 6 files changed, 305 insertions(+), 47 deletions(-) create mode 100644 lib/alterl/src/thrift_framed_transport.erl diff --git a/lib/alterl/src/thrift_binary_protocol.erl b/lib/alterl/src/thrift_binary_protocol.erl index f115b2f4..da43d148 100644 --- a/lib/alterl/src/thrift_binary_protocol.erl +++ b/lib/alterl/src/thrift_binary_protocol.erl @@ -11,22 +11,37 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --export([new/1, +-export([new/1, new/2, read/2, write/2, flush_transport/1, close_transport/1 -]). - --record(binary_protocol, {transport}). + ]). +-record(binary_protocol, {transport, + strict_read=true, + strict_write=true + }). -define(VERSION_MASK, 16#FFFF0000). -define(VERSION_1, 16#80010000). - +-define(TYPE_MASK, 16#000000ff). new(Transport) -> - thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}). + new(Transport, _Options = []). + +new(Transport, Options) -> + State = #binary_protocol{transport = Transport}, + State1 = parse_options(Options, State), + thrift_protocol:new(?MODULE, State1). + +parse_options([], State) -> + State; +parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) -> + parse_options(Rest, State#binary_protocol{strict_read=Bool}); +parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> + parse_options(Rest, State#binary_protocol{strict_write=Bool}). + flush_transport(#binary_protocol{transport = Transport}) -> thrift_transport:flush(Transport). @@ -42,9 +57,16 @@ write(This, #protocol_message_begin{ name = Name, type = Type, seqid = Seqid}) -> - write(This, {i32, ?VERSION_1 bor Type}), - write(This, {string, Name}), - write(This, {i32, Seqid}), + case This#binary_protocol.strict_write of + true -> + write(This, {i32, ?VERSION_1 bor Type}), + write(This, {string, Name}), + write(This, {i32, Seqid}); + false -> + write(This, {string, Name}), + write(This, {byte, Type}), + write(This, {i32, Seqid}) + end, ok; write(This, message_end) -> ok; @@ -121,20 +143,40 @@ write(This, {string, Bin}) when is_binary(Bin) -> write(This, {i32, size(Bin)}), write(This, Bin); -write(This, Binary) when is_binary(Binary) -> - thrift_transport:write(This#binary_protocol.transport, Binary). +%% Data :: iolist() +write(This, Data) -> + thrift_transport:write(This#binary_protocol.transport, Data). %% read(This, message_begin) -> case read(This, i32) of - {ok, Version} when Version band ?VERSION_MASK == ?VERSION_1 -> - Type = Version band 16#000000ff, + {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 -> + %% we're at version 1 {ok, Name} = read(This, string), + Type = Sz band ?TYPE_MASK, {ok, SeqId} = read(This, i32), #protocol_message_begin{name = binary_to_list(Name), type = Type, seqid = SeqId}; + + {ok, Sz} when Sz < 0 -> + %% there's a version number but it's unexpected + {error, {bad_binary_protocol_version, Sz}}; + + {ok, Sz} when This#binary_protocol.strict_read =:= true -> + %% strict_read is true and there's no version header; that's an error + {error, no_binary_protocol_version}; + + {ok, Sz} when This#binary_protocol.strict_read =:= false -> + %% strict_read is false, so just read the old way + {ok, Name} = read(This, Sz), + {ok, Type} = read(This, byte), + {ok, SeqId} = read(This, i32), + #protocol_message_begin{name = binary_to_list(Name), + type = Type, + seqid = SeqId}; + Err = {error, closed} -> Err; Err = {error, ebadf} -> Err end; diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl index d79e9873..575245db 100644 --- a/lib/alterl/src/thrift_buffered_transport.erl +++ b/lib/alterl/src/thrift_buffered_transport.erl @@ -21,11 +21,7 @@ -export([write/2, read/2, flush/1, close/1]). -record(buffered_transport, {wrapped, % a thrift_transport - buffer - %% a list of binaries which will be concatenated and sent during - %% a flush. - %% - %% *** THIS LIST IS STORED IN REVERSE ORDER!!! *** + write_buffer % iolist() }). %%==================================================================== @@ -46,11 +42,11 @@ new(WrappedTransport) -> %%-------------------------------------------------------------------- %% Function: write(Transport, Data) -> ok %% -%% Data = binary() +%% Data = iolist() %% %% Description: Writes data into the buffer %%-------------------------------------------------------------------- -write(Transport, Data) when is_binary(Data) -> +write(Transport, Data) -> gen_server:call(Transport, {write, Data}). %%-------------------------------------------------------------------- @@ -94,7 +90,7 @@ init([Wrapped]) -> %% TODO(cpiro): need to trap exits here so when transport exits %% normally from under our feet we exit normally {ok, #buffered_transport{wrapped = Wrapped, - buffer = []}}. + write_buffer = []}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -105,19 +101,18 @@ init([Wrapped]) -> %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call({write, Data}, _From, State = #buffered_transport{buffer = Buffer}) -> - {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}}; +handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = WBuf}) -> + {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}}; handle_call({read, Len}, _From, State = #buffered_transport{wrapped = Wrapped}) -> Response = thrift_transport:read(Wrapped, Len), {reply, Response, State}; -handle_call(flush, _From, State = #buffered_transport{buffer = Buffer, +handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf, wrapped = Wrapped}) -> - Concat = concat_binary(lists:reverse(Buffer)), - Response = thrift_transport:write(Wrapped, Concat), + Response = thrift_transport:write(Wrapped, WBuf), thrift_transport:flush(Wrapped), - {reply, Response, State#buffered_transport{buffer = []}}. + {reply, Response, State#buffered_transport{write_buffer = []}}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | @@ -125,9 +120,9 @@ handle_call(flush, _From, State = #buffered_transport{buffer = Buffer, %% {stop, Reason, State} %% Description: Handling cast messages %%-------------------------------------------------------------------- -handle_cast(close, State = #buffered_transport{buffer = Buffer, +handle_cast(close, State = #buffered_transport{write_buffer = WBuf, wrapped = Wrapped}) -> - thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))), + thrift_transport:write(Wrapped, WBuf), %% Wrapped is closed by terminate/2 %% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]), {stop, normal, State}; diff --git a/lib/alterl/src/thrift_client.erl b/lib/alterl/src/thrift_client.erl index af7aea9f..779595cb 100644 --- a/lib/alterl/src/thrift_client.erl +++ b/lib/alterl/src/thrift_client.erl @@ -20,7 +20,12 @@ -include("thrift_constants.hrl"). -include("thrift_protocol.hrl"). --record(state, {service, protocol, seqid}). +-record(state, {service, protocol, seqid, + strict_read = true, + strict_write = true, + framed = false, + connect_timeout = infinity + }). %%==================================================================== %% API @@ -30,10 +35,10 @@ %% Description: Starts the server %%-------------------------------------------------------------------- start_link(Host, Port, Service) -> - start_link(Host, Port, Service, _Timeout = infinity). + start_link(Host, Port, Service, []). -start_link(Host, Port, Service, Timeout) when is_integer(Port), is_atom(Service) -> - gen_server:start_link(?MODULE, [Host, Port, Service, Timeout], []). +start_link(Host, Port, Service, Options) when is_integer(Port), is_atom(Service), is_list(Options) -> + gen_server:start_link(?MODULE, [Host, Port, Service, Options], []). call(Client, Function, Args) when is_pid(Client), is_atom(Function), is_list(Args) -> @@ -57,24 +62,41 @@ close(Client) when is_pid(Client) -> %% {stop, Reason} %% Description: Initiates the server %%-------------------------------------------------------------------- -init([Host, Port, Service]) -> - init([Host, Port, Service, infinity]); +init([Host, Port, Service, Options]) -> + State = parse_options(Options, #state{}), -init([Host, Port, Service, Timeout]) -> {ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}, {active, false}, {nodelay, true} ], - Timeout), - - {ok, Transport} = thrift_socket_transport:new(Sock), - {ok, BufTransport} = thrift_buffered_transport:new(Transport), - {ok, Protocol} = thrift_binary_protocol:new(BufTransport), - {ok, #state{service = Service, - protocol = Protocol, - seqid = 0}}. + State#state.connect_timeout), + + {ok, Transport} = thrift_socket_transport:new(Sock), + {ok, BufTransport} = + case State#state.framed of + true -> thrift_framed_transport:new(Transport); + false -> thrift_buffered_transport:new(Transport) + end, + {ok, Protocol} = thrift_binary_protocol:new(BufTransport, + [{strict_read, State#state.strict_read}, + {strict_write, State#state.strict_write}]), + + {ok, State#state{service = Service, + protocol = Protocol, + seqid = 0}}. + +parse_options([], State) -> + State; +parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) -> + parse_options(Rest, State#state{strict_read=Bool}); +parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) -> + parse_options(Rest, State#state{strict_write=Bool}); +parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) -> + parse_options(Rest, State#state{framed=Bool}); +parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; is_integer(TO) -> + parse_options(Rest, State#state{connect_timeout=TO}). %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | diff --git a/lib/alterl/src/thrift_framed_transport.erl b/lib/alterl/src/thrift_framed_transport.erl new file mode 100644 index 00000000..814e0d94 --- /dev/null +++ b/lib/alterl/src/thrift_framed_transport.erl @@ -0,0 +1,198 @@ +%%%------------------------------------------------------------------- +%%% File : thrift_framed_transport.erl +%%% Author : +%%% Description : Framed transport for thrift +%%% +%%% Created : 12 Mar 2008 by +%%%------------------------------------------------------------------- +-module(thrift_framed_transport). + +-behaviour(gen_server). +-behaviour(thrift_transport). + +%% API +-export([new/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% thrift_transport callbacks +-export([write/2, read/2, flush/1, close/1]). + +-record(framed_transport, {wrapped, % a thrift_transport + read_buffer, % iolist() + write_buffer % iolist() + }). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +new(WrappedTransport) -> + case gen_server:start_link(?MODULE, [WrappedTransport], []) of + {ok, Pid} -> + thrift_transport:new(?MODULE, Pid); + Else -> + Else + end. + +%%-------------------------------------------------------------------- +%% Function: write(Transport, Data) -> ok +%% +%% Data = iolist() +%% +%% Description: Writes data into the buffer +%%-------------------------------------------------------------------- +write(Transport, Data) -> + gen_server:call(Transport, {write, Data}). + +%%-------------------------------------------------------------------- +%% Function: flush(Transport) -> ok +%% +%% Description: Flushes the buffer through to the wrapped transport +%%-------------------------------------------------------------------- +flush(Transport) -> + gen_server:call(Transport, flush). + +%%-------------------------------------------------------------------- +%% Function: close(Transport) -> ok +%% +%% Description: Closes the transport and the wrapped transport +%%-------------------------------------------------------------------- +close(Transport) -> + gen_server:cast(Transport, close). + +%%-------------------------------------------------------------------- +%% Function: Read(Transport, Len) -> {ok, Data} +%% +%% Data = binary() +%% +%% Description: Reads data through from the wrapped transoprt +%%-------------------------------------------------------------------- +read(Transport, Len) when is_integer(Len) -> + gen_server:call(Transport, {read, Len}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Wrapped]) -> + %% TODO(cpiro): need to trap exits here so when transport exits + %% normally from under our feet we exit normally + {ok, #framed_transport{wrapped = Wrapped, + read_buffer = [], + write_buffer = []}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({write, Data}, _From, State = #framed_transport{write_buffer = WBuf}) -> + {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}}; + +handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped, + read_buffer = RBuf}) -> + {RBuf1, RBuf1Size} = + %% if the read buffer is empty, read another frame + %% otherwise, just read from what's left in the buffer + case iolist_size(RBuf) of + 0 -> + %% read the frame length + {ok, <>} = + thrift_transport:read(Wrapped, 4), + %% then read the data + {ok, Bin} = + thrift_transport:read(Wrapped, FrameLen), + {Bin, size(Bin)}; + Sz -> + {RBuf, Sz} + end, + + %% pull off Give bytes, return them to the user, leave the rest in the buffer + Give = min(RBuf1Size, Len), + <> = iolist_to_binary(RBuf1), + + Response = {ok, Data}, + State1 = State#framed_transport{read_buffer=RBuf2}, + + {reply, Response, State1}; + +handle_call(flush, _From, State) -> + {Response, State1} = do_flush(State), + {reply, Response, State1}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(close, State) -> + {_, State1} = do_flush(State), + %% Wrapped is closed by terminate/2 + %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]), + {stop, normal, State}; +handle_cast(Msg, State=#framed_transport{}) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) -> + thrift_transport:close(Wrapped), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +do_flush(State = #framed_transport{write_buffer = Buffer, + wrapped = Wrapped}) -> + FrameLen = iolist_size(Buffer), + Data = [<>, Buffer], + + Response = thrift_transport:write(Wrapped, Data), + + thrift_transport:flush(Wrapped), + + State1 = State#framed_transport{write_buffer = []}, + {Response, State1}. + +min(A,B) when A A; +min(_,B) -> B. + diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl index 70367d61..0b463cc9 100644 --- a/lib/alterl/src/thrift_socket_transport.erl +++ b/lib/alterl/src/thrift_socket_transport.erl @@ -23,8 +23,8 @@ new(Socket, Opts) when is_list(Opts) -> end, thrift_transport:new(?MODULE, State). -write(#data{socket = Socket}, Data) - when is_binary(Data) -> +%% Data :: iolist() +write(#data{socket = Socket}, Data) -> gen_tcp:send(Socket, Data). read(#data{socket=Socket, recv_timeout=Timeout}, Len) diff --git a/lib/alterl/src/thrift_transport.erl b/lib/alterl/src/thrift_transport.erl index 919927d8..4bbb0a2c 100644 --- a/lib/alterl/src/thrift_transport.erl +++ b/lib/alterl/src/thrift_transport.erl @@ -22,7 +22,8 @@ new(Module, Data) when is_atom(Module) -> {ok, #transport{module = Module, data = Data}}. -write(Transport, Data) when is_binary(Data) -> +%% Data :: iolist() +write(Transport, Data) -> Module = Transport#transport.module, Module:write(Transport#transport.data, Data). -- 2.17.1