| %% | 
 | %% Licensed to the Apache Software Foundation (ASF) under one | 
 | %% or more contributor license agreements. See the NOTICE file | 
 | %% distributed with this work for additional information | 
 | %% regarding copyright ownership. The ASF licenses this file | 
 | %% to you under the Apache License, Version 2.0 (the | 
 | %% "License"); you may not use this file except in compliance | 
 | %% with the License. You may obtain a copy of the License at | 
 | %% | 
 | %%   http://www.apache.org/licenses/LICENSE-2.0 | 
 | %% | 
 | %% Unless required by applicable law or agreed to in writing, | 
 | %% software distributed under the License is distributed on an | 
 | %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | %% KIND, either express or implied. See the License for the | 
 | %% specific language governing permissions and limitations | 
 | %% under the License. | 
 | %% | 
 |  | 
 | -module(thrift_server). | 
 |  | 
 | -behaviour(gen_server). | 
 |  | 
 | %% API | 
 | -export([start_link/3, stop/1, take_socket/2]). | 
 |  | 
 | %% gen_server callbacks | 
 | -export([init/1, handle_call/3, handle_cast/2, handle_info/2, | 
 |          terminate/2, code_change/3]). | 
 |  | 
 | -define(SERVER, ?MODULE). | 
 |  | 
 | -record(state, {listen_socket, acceptor_ref, service, handler}). | 
 |  | 
 | %%==================================================================== | 
 | %% API | 
 | %%==================================================================== | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} | 
 | %% Description: Starts the server | 
 | %%-------------------------------------------------------------------- | 
 | start_link(Port, Service, HandlerModule) when is_integer(Port), is_atom(HandlerModule) -> | 
 |     gen_server:start_link({local, ?SERVER}, ?MODULE, {Port, Service, HandlerModule}, []). | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: stop(Pid) -> ok, {error, Reason} | 
 | %% Description: Stops the server. | 
 | %%-------------------------------------------------------------------- | 
 | stop(Pid) when is_pid(Pid) -> | 
 |     gen_server:call(Pid, stop). | 
 |  | 
 |  | 
 | take_socket(Server, Socket) -> | 
 |     gen_server:call(Server, {take_socket, Socket}). | 
 |  | 
 |  | 
 | %%==================================================================== | 
 | %% gen_server callbacks | 
 | %%==================================================================== | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: init(Args) -> {ok, State} | | 
 | %%                         {ok, State, Timeout} | | 
 | %%                         ignore               | | 
 | %%                         {stop, Reason} | 
 | %% Description: Initiates the server | 
 | %%-------------------------------------------------------------------- | 
 | init({Port, Service, Handler}) -> | 
 |     {ok, Socket} = gen_tcp:listen(Port, | 
 |                                   [binary, | 
 |                                    {packet, 0}, | 
 |                                    {active, false}, | 
 |                                    {nodelay, true}, | 
 |                                    {reuseaddr, true}]), | 
 |     {ok, Ref} = prim_inet:async_accept(Socket, -1), | 
 |     {ok, #state{listen_socket = Socket, | 
 |                 acceptor_ref = Ref, | 
 |                 service = Service, | 
 |                 handler = Handler}}. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | | 
 | %%                                      {reply, Reply, State, Timeout} | | 
 | %%                                      {noreply, State} | | 
 | %%                                      {noreply, State, Timeout} | | 
 | %%                                      {stop, Reason, Reply, State} | | 
 | %%                                      {stop, Reason, State} | 
 | %% Description: Handling call messages | 
 | %%-------------------------------------------------------------------- | 
 | handle_call(stop, _From, State) -> | 
 |     {stop, stopped, ok, State}; | 
 |  | 
 | handle_call({take_socket, Socket}, {FromPid, _Tag}, State) -> | 
 |     Result = gen_tcp:controlling_process(Socket, FromPid), | 
 |     {reply, Result, State}. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: handle_cast(Msg, State) -> {noreply, State} | | 
 | %%                                      {noreply, State, Timeout} | | 
 | %%                                      {stop, Reason, State} | 
 | %% Description: Handling cast messages | 
 | %%-------------------------------------------------------------------- | 
 | handle_cast(_Msg, State) -> | 
 |     {noreply, State}. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: handle_info(Info, State) -> {noreply, State} | | 
 | %%                                       {noreply, State, Timeout} | | 
 | %%                                       {stop, Reason, State} | 
 | %% Description: Handling all non call/cast messages | 
 | %%-------------------------------------------------------------------- | 
 | handle_info({inet_async, ListenSocket, Ref, {ok, ClientSocket}}, | 
 |             State = #state{listen_socket = ListenSocket, | 
 |                            acceptor_ref = Ref, | 
 |                            service = Service, | 
 |                            handler = Handler}) -> | 
 |     case set_sockopt(ListenSocket, ClientSocket) of | 
 |         ok -> | 
 |             %% New client connected - start processor | 
 |             start_processor(ClientSocket, Service, Handler), | 
 |             {ok, NewRef} = prim_inet:async_accept(ListenSocket, -1), | 
 |             {noreply, State#state{acceptor_ref = NewRef}}; | 
 |         {error, Reason} -> | 
 |             error_logger:error_msg("Couldn't set socket opts: ~p~n", | 
 |                                    [Reason]), | 
 |             {stop, Reason, State} | 
 |     end; | 
 |  | 
 | handle_info({inet_async, ListenSocket, Ref, Error}, State) -> | 
 |     error_logger:error_msg("Error in acceptor: ~p~n", [Error]), | 
 |     {stop, Error, State}; | 
 |  | 
 | handle_info(_Info, State) -> | 
 |     {noreply, State}. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Function: terminate(Reason, State) -> void() | 
 | %% Description: This function is called by a gen_server when it is about to | 
 | %% terminate. It should be the opposite of Module:init/1 and do any necessary | 
 | %% cleaning up. When it returns, the gen_server terminates with Reason. | 
 | %% The return value is ignored. | 
 | %%-------------------------------------------------------------------- | 
 | terminate(_Reason, _State) -> | 
 |     ok. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} | 
 | %% Description: Convert process state when code is changed | 
 | %%-------------------------------------------------------------------- | 
 | code_change(_OldVsn, State, _Extra) -> | 
 |     {ok, State}. | 
 |  | 
 | %%-------------------------------------------------------------------- | 
 | %%% Internal functions | 
 | %%-------------------------------------------------------------------- | 
 | set_sockopt(ListenSocket, ClientSocket) -> | 
 |     true = inet_db:register_socket(ClientSocket, inet_tcp), | 
 |     case prim_inet:getopts(ListenSocket, | 
 |                            [active, nodelay, keepalive, delay_send, priority, tos]) of | 
 |         {ok, Opts} -> | 
 |             case prim_inet:setopts(ClientSocket, Opts) of | 
 |                 ok    -> ok; | 
 |                 Error -> gen_tcp:close(ClientSocket), | 
 |                          Error | 
 |             end; | 
 |         Error -> | 
 |             gen_tcp:close(ClientSocket), | 
 |             Error | 
 |     end. | 
 |  | 
 | start_processor(Socket, Service, Handler) -> | 
 |     Server = self(), | 
 |  | 
 |     ProtoGen = fun() -> | 
 |                        % Become the controlling process | 
 |                        ok = take_socket(Server, Socket), | 
 |                        {ok, SocketTransport} = thrift_socket_transport:new(Socket), | 
 |                        {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport), | 
 |                        {ok, Protocol} = thrift_binary_protocol:new(BufferedTransport), | 
 |                        {ok, Protocol, Protocol} | 
 |                end, | 
 |  | 
 |     spawn(thrift_processor, init, [{Server, ProtoGen, Service, Handler}]). |