From: David Reiss Date: Wed, 11 Jun 2008 01:12:31 +0000 (+0000) Subject: Add thrift_base64_transport which writes base64 encoded data X-Git-Tag: 0.2.0~696 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=844540669eca6b2c65e2d916d261a02602026d92;p=common%2Fthrift.git Add thrift_base64_transport which writes base64 encoded data Summary: This is to make it easy to run Hadoop mapreduces using Hadoop Streaming on thrift-serialized structs without implementing any special file splitter or anything Test plan: test_disklog:t_base64() git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666466 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/alterl/src/thrift_base64_transport.erl b/lib/alterl/src/thrift_base64_transport.erl new file mode 100644 index 00000000..e521d118 --- /dev/null +++ b/lib/alterl/src/thrift_base64_transport.erl @@ -0,0 +1,45 @@ +-module(thrift_base64_transport). + +-behaviour(thrift_transport). + +%% API +-export([new/1, new_transport_factory/1]). + +%% thrift_transport callbacks +-export([write/2, read/2, flush/1, close/1]). + +%% State +-record(b64_transport, {wrapped}). + +new(Wrapped) -> + State = #b64_transport{wrapped = Wrapped}, + thrift_transport:new(?MODULE, State). + + +write(#b64_transport{wrapped = Wrapped}, Data) -> + thrift_transport:write(Wrapped, base64:encode(iolist_to_binary(Data))). + + +%% base64 doesn't support reading quite yet since it would involve +%% nasty buffering and such +read(#b64_transport{wrapped = Wrapped}, Data) -> + {error, no_reads_allowed}. + + +flush(#b64_transport{wrapped = Wrapped}) -> + thrift_transport:write(Wrapped, <<"\n">>), + thrift_transport:flush(Wrapped). + + +close(Me = #b64_transport{wrapped = Wrapped}) -> + flush(Me), + thrift_transport:close(Wrapped). + + +%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +new_transport_factory(WrapFactory) -> + F = fun() -> + {ok, Wrapped} = WrapFactory(), + new(Wrapped) + end, + {ok, F}. diff --git a/lib/alterl/src/thrift_buffered_transport.erl b/lib/alterl/src/thrift_buffered_transport.erl index d71291d3..a298cb16 100644 --- a/lib/alterl/src/thrift_buffered_transport.erl +++ b/lib/alterl/src/thrift_buffered_transport.erl @@ -11,7 +11,7 @@ -behaviour(thrift_transport). %% API --export([new/1]). +-export([new/1, new_transport_factory/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -39,6 +39,8 @@ new(WrappedTransport) -> Else end. + + %%-------------------------------------------------------------------- %% Function: write(Transport, Data) -> ok %% @@ -157,3 +159,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- +%%%% FACTORY GENERATION %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +new_transport_factory(WrapFactory) -> + F = fun() -> + {ok, Wrapped} = WrapFactory(), + new(Wrapped) + end, + {ok, F}. diff --git a/test/erl/src/test_disklog.erl b/test/erl/src/test_disklog.erl index 78b792c4..81b7b50c 100644 --- a/test/erl/src/test_disklog.erl +++ b/test/erl/src/test_disklog.erl @@ -28,3 +28,35 @@ t() -> ok. + + +t_base64() -> + {ok, TransportFactory} = + thrift_disk_log_transport:new_transport_factory( + test_disklog, + [{file, "/tmp/test_b64_log"}, + {size, {1024*1024, 10}}]), + {ok, B64Factory} = + thrift_base64_transport:new_transport_factory(TransportFactory), + {ok, BufFactory} = + thrift_buffered_transport:new_transport_factory(B64Factory), + {ok, ProtocolFactory} = thrift_binary_protocol:new_protocol_factory( + BufFactory, []), + {ok, Client} = thrift_client:start_link(ProtocolFactory, thriftTest_thrift), + + io:format("Client started~n"), + + % We have to make async calls into this client only since otherwise it will try + % to read from the disklog and go boom. + {ok, ok} = thrift_client:call(Client, testAsync, [16#deadbeef]), + io:format("Call written~n"), + + % Use the send_call method to write a non-async call into the log + ok = thrift_client:send_call(Client, testString, [<<"hello world">>]), + io:format("Non-async call sent~n"), + + ok = thrift_client:close(Client), + io:format("Client closed~n"), + + ok. +