From b7c8802d36209e0ffb8af0cdb0a2c5422d8e8ad2 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 11 Jun 2008 01:00:20 +0000 Subject: [PATCH] allow configurable recv_timeouts git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666435 13f79535-47bb-0310-9956-ffa450edef68 --- lib/alterl/src/thrift_socket_server.erl | 21 +++++++++++------ lib/alterl/src/thrift_socket_transport.erl | 27 +++++++++++++++++----- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/lib/alterl/src/thrift_socket_server.erl b/lib/alterl/src/thrift_socket_server.erl index 299cc91e..bf767781 100644 --- a/lib/alterl/src/thrift_socket_server.erl +++ b/lib/alterl/src/thrift_socket_server.erl @@ -27,7 +27,9 @@ max=2048, ip=any, listen=null, - acceptor=null}). + acceptor=null, + socket_opts=[{recv_timeout, 500}] + }). start(State=#thrift_socket_server{}) -> io:format("~p~n", [State]), @@ -78,6 +80,8 @@ parse_options([{ip, Ip} | Rest], State) -> IpTuple end, parse_options(Rest, State#thrift_socket_server{ip=ParsedIp}); +parse_options([{socket_opts, L} | Rest], State) when is_list(L), length(L) > 0 -> + parse_options(Rest, State#thrift_socket_server{socket_opts=L}); parse_options([{handler, Handler} | Rest], State) -> parse_options(Rest, State#thrift_socket_server{handler=Handler}); parse_options([{service, Service} | Rest], State) -> @@ -152,19 +156,22 @@ gen_tcp_listen(Port, Opts, State) -> new_acceptor(State=#thrift_socket_server{max=0}) -> error_logger:error_msg("Not accepting new connections"), State#thrift_socket_server{acceptor=null}; -new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,service=Service, handler=Handler}) -> +new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen, + service=Service, handler=Handler, + socket_opts=Opts + }) -> Pid = proc_lib:spawn_link(?MODULE, acceptor_loop, - [{self(), Listen, Service, Handler}]), + [{self(), Listen, Service, Handler, Opts}]), %% error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]), State#thrift_socket_server{acceptor=Pid}. -acceptor_loop({Server, Listen, Service, Handler}) - when is_pid(Server) -> - case catch gen_tcp:accept(Listen) of +acceptor_loop({Server, Listen, Service, Handler, SocketOpts}) + when is_pid(Server), is_list(SocketOpts) -> + case catch gen_tcp:accept(Listen) of % infiinite timeout {ok, Socket} -> gen_server:cast(Server, {accepted, self()}), ProtoGen = fun() -> - {ok, SocketTransport} = thrift_socket_transport:new(Socket), + {ok, SocketTransport} = thrift_socket_transport:new(Socket, SocketOpts), {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), {ok, IProt=Protocol, OProt=Protocol} diff --git a/lib/alterl/src/thrift_socket_transport.erl b/lib/alterl/src/thrift_socket_transport.erl index d1e56135..9a27de04 100644 --- a/lib/alterl/src/thrift_socket_transport.erl +++ b/lib/alterl/src/thrift_socket_transport.erl @@ -3,18 +3,33 @@ -behaviour(thrift_transport). -export([new/1, + new/2, write/2, read/2, flush/1, close/1]). --record(data, {socket}). +-record(data, {socket, + recv_timeout=infinity}). new(Socket) -> - thrift_transport:new(?MODULE, #data{socket = Socket}). - -write(#data{socket = Socket}, Data) when is_binary(Data) -> + new(Socket, []). + +new(Socket, Opts) when is_list(Opts) -> + State = + case lists:keysearch(recv_timeout, 1, Opts) of + {value, {recv_timeout, Timeout}} + when is_integer(Timeout), Timeout > 0 -> + #data{socket=Socket, recv_timeout=Timeout}; + _ -> + #data{socket=Socket} + end, + thrift_transport:new(?MODULE, State). + +write(#data{socket = Socket}, Data) + when is_binary(Data) -> gen_tcp:send(Socket, Data). -read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 -> - gen_tcp:recv(Socket, Len). +read(D = #data{socket=Socket, recv_timeout=Timeout}, Len) + when is_integer(Len), Len >= 0 -> + gen_tcp:recv(Socket, Len, Timeout). %% We can't really flush - everything is flushed when we write flush(_) -> -- 2.17.1