From b4d3e7b052c960a5ab96310f7cb16ba46e6d32b4 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Wed, 28 Nov 2007 01:51:43 +0000 Subject: [PATCH] Create a TServerEventHandler interface in TServer Summary: Such that users can supply an event handler to a server that will be used to signal various events that take place inside the server core. Reviewed By: dreiss Test Plan: Rebuilt all servers, work by default Other Notes: Partially submitted and also reviewed by Dave Simpson at Powerset git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665371 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 6 +-- lib/cpp/src/server/TNonblockingServer.h | 22 ++------ lib/cpp/src/server/TServer.h | 62 ++++++++++++++++++++--- lib/cpp/src/server/TSimpleServer.cpp | 15 +++++- lib/cpp/src/server/TSimpleServer.h | 6 +-- lib/cpp/src/server/TThreadPoolServer.cpp | 38 ++++++++++---- lib/cpp/src/server/TThreadPoolServer.h | 10 ++-- lib/cpp/src/server/TThreadedServer.cpp | 45 ++++++++++------ lib/cpp/src/server/TThreadedServer.h | 4 +- 9 files changed, 141 insertions(+), 67 deletions(-) diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 7735ec2d..de32db54 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -675,9 +675,9 @@ void TNonblockingServer::serve() { // Initialize libevent core registerEvents(static_cast(event_init())); - // Run pre-serve callback function if we have one - if (preServeCallback_) { - preServeCallback_(preServeCallbackArg_); + // Run the preServe event + if (eventHandler_ != NULL) { + eventHandler_->preServe(); } // Run libevent engine, never returns, invokes calls to eventHandler diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 2cdd897d..84833067 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -69,11 +69,6 @@ class TNonblockingServer : public TServer { */ std::stack connectionStack_; - // Pointer to optional function called after opening the listen socket and - // before running the event loop, along with its argument data. - void (*preServeCallback_)(void*); - void* preServeCallbackArg_; - void handleEvent(int fd, short which); public: @@ -84,9 +79,7 @@ class TNonblockingServer : public TServer { port_(port), frameResponses_(true), threadPoolProcessing_(false), - eventBase_(NULL), - preServeCallback_(NULL), - preServeCallbackArg_(NULL) {} + eventBase_(NULL) {} TNonblockingServer(boost::shared_ptr processor, boost::shared_ptr protocolFactory, @@ -97,9 +90,7 @@ class TNonblockingServer : public TServer { port_(port), frameResponses_(true), threadManager_(threadManager), - eventBase_(NULL), - preServeCallback_(NULL), - preServeCallbackArg_(NULL) { + eventBase_(NULL) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); setInputProtocolFactory(protocolFactory); @@ -118,9 +109,7 @@ class TNonblockingServer : public TServer { serverSocket_(0), port_(port), frameResponses_(true), - threadManager_(threadManager), - preServeCallback_(NULL), - preServeCallbackArg_(NULL) { + threadManager_(threadManager) { setInputTransportFactory(inputTransportFactory); setOutputTransportFactory(outputTransportFactory); setInputProtocolFactory(inputProtocolFactory); @@ -171,11 +160,6 @@ class TNonblockingServer : public TServer { void serve(); - void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) { - preServeCallback_ = fn_ptr; - preServeCallbackArg_ = arg; - } - }; /** diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 5f132553..1b6bd88e 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -14,22 +14,62 @@ #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using facebook::thrift::TProcessor; using facebook::thrift::protocol::TBinaryProtocolFactory; +using facebook::thrift::protocol::TProtocol; using facebook::thrift::protocol::TProtocolFactory; using facebook::thrift::transport::TServerTransport; using facebook::thrift::transport::TTransport; using facebook::thrift::transport::TTransportFactory; +/** + * Virtual interface class that can handle events from the server core. To + * use this you should subclass it and implement the methods that you care + * about. Your subclass can also store local data that you may care about, + * such as additional "arguments" to these methods (stored in the object + * instance's state). + */ +class TServerEventHandler { + public: + + virtual ~TServerEventHandler() {} + + /** + * Called before the server begins. + */ + virtual void preServe() {} + + /** + * Called when a new client has connected and is about to being processing. + */ + virtual void clientBegin(boost::shared_ptr input, + boost::shared_ptr output) {} + + /** + * Called when a client has finished making requests. + */ + virtual void clientEnd(boost::shared_ptr input, + boost::shared_ptr output) {} + + protected: + + /** + * Prevent direct instantiation. + */ + TServerEventHandler() {} + +}; + /** * Thrift server. * * @author Mark Slee */ class TServer : public concurrency::Runnable { -public: + public: + virtual ~TServer() {} virtual void serve() = 0; @@ -40,7 +80,7 @@ public: virtual void run() { serve(); } - + boost::shared_ptr getProcessor() { return processor_; } @@ -56,7 +96,7 @@ public: boost::shared_ptr getOutputTransportFactory() { return outputTransportFactory_; } - + boost::shared_ptr getInputProtocolFactory() { return inputProtocolFactory_; } @@ -65,6 +105,10 @@ public: return outputProtocolFactory_; } + boost::shared_ptr getEventHandler() { + return eventHandler_; + } + protected: TServer(boost::shared_ptr processor): processor_(processor) { @@ -108,7 +152,7 @@ protected: inputProtocolFactory_(inputProtocolFactory), outputProtocolFactory_(outputProtocolFactory) {} - + // Class variables boost::shared_ptr processor_; boost::shared_ptr serverTransport_; @@ -119,6 +163,8 @@ protected: boost::shared_ptr inputProtocolFactory_; boost::shared_ptr outputProtocolFactory_; + boost::shared_ptr eventHandler_; + void setInputTransportFactory(boost::shared_ptr inputTransportFactory) { inputTransportFactory_ = inputTransportFactory; } @@ -135,8 +181,12 @@ protected: outputProtocolFactory_ = outputProtocolFactory; } + void setServerEventHandler(boost::shared_ptr eventHandler) { + eventHandler_ = eventHandler; + } + }; - + }}} // facebook::thrift::server #endif // #ifndef _THRIFT_SERVER_TSERVER_H_ diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp index d5d3797b..f3011a20 100644 --- a/lib/cpp/src/server/TSimpleServer.cpp +++ b/lib/cpp/src/server/TSimpleServer.cpp @@ -9,7 +9,7 @@ #include #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using namespace std; using namespace facebook::thrift; @@ -38,6 +38,11 @@ void TSimpleServer::serve() { return; } + // Run the preServe event + if (eventHandler_ != NULL) { + eventHandler_->preServe(); + } + // Fetch client from server while (!stop_) { try { @@ -46,6 +51,9 @@ void TSimpleServer::serve() { outputTransport = outputTransportFactory_->getTransport(client); inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + if (eventHandler_ != NULL) { + eventHandler_->clientBegin(inputProtocol, outputProtocol); + } try { while (processor_->process(inputProtocol, outputProtocol)) { // Peek ahead, is the remote side closed? @@ -58,9 +66,12 @@ void TSimpleServer::serve() { } catch (TException& tx) { cerr << "TSimpleServer exception: " << tx.what() << endl; } + if (eventHandler_ != NULL) { + eventHandler_->clientEnd(inputProtocol, outputProtocol); + } inputTransport->close(); outputTransport->close(); - client->close(); + client->close(); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } if (outputTransport != NULL) { outputTransport->close(); } diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h index b02f1060..1ab6f073 100644 --- a/lib/cpp/src/server/TSimpleServer.h +++ b/lib/cpp/src/server/TSimpleServer.h @@ -10,7 +10,7 @@ #include "server/TServer.h" #include "transport/TServerTransport.h" -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { /** * This is the most basic simple server. It is single-threaded and runs a @@ -35,11 +35,11 @@ class TSimpleServer : public TServer { boost::shared_ptr outputTransportFactory, boost::shared_ptr inputProtocolFactory, boost::shared_ptr outputProtocolFactory): - TServer(processor, serverTransport, + TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory), stop_(false) {} - + ~TSimpleServer() {} void serve(); diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 26eb373d..355bf8e4 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -11,7 +11,7 @@ #include #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using boost::shared_ptr; using namespace std; @@ -20,21 +20,28 @@ using namespace facebook::thrift::concurrency; using namespace facebook::thrift::protocol;; using namespace facebook::thrift::transport; -class TThreadPoolServer::Task: public Runnable { - +class TThreadPoolServer::Task : public Runnable { + public: - - Task(shared_ptr processor, + + Task(TThreadPoolServer &server, + shared_ptr processor, shared_ptr input, shared_ptr output) : + server_(server), processor_(processor), input_(input), output_(output) { } ~Task() {} - + void run() { + boost::shared_ptr eventHandler = + server_.getEventHandler(); + if (eventHandler != NULL) { + eventHandler->clientBegin(input_, output_); + } try { while (processor_->process(input_, output_)) { if (!input_->getTransport()->peek()) { @@ -50,23 +57,27 @@ public: } catch (...) { cerr << "TThreadPoolServer uncaught exception." << endl; } + if (eventHandler != NULL) { + eventHandler->clientEnd(input_, output_); + } input_->getTransport()->close(); output_->getTransport()->close(); } private: + TServer& server_; shared_ptr processor_; shared_ptr input_; shared_ptr output_; }; - + TThreadPoolServer::TThreadPoolServer(shared_ptr processor, shared_ptr serverTransport, shared_ptr transportFactory, shared_ptr protocolFactory, shared_ptr threadManager) : - TServer(processor, serverTransport, transportFactory, protocolFactory), + TServer(processor, serverTransport, transportFactory, protocolFactory), threadManager_(threadManager), stop_(false), timeout_(0) {} @@ -75,7 +86,7 @@ TThreadPoolServer::TThreadPoolServer(shared_ptr processor, shared_ptr inputTransportFactory, shared_ptr outputTransportFactory, shared_ptr inputProtocolFactory, - shared_ptr outputProtocolFactory, + shared_ptr outputProtocolFactory, shared_ptr threadManager) : TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory), @@ -99,7 +110,12 @@ void TThreadPoolServer::serve() { cerr << "TThreadPoolServer::run() listen(): " << ttx.what() << endl; return; } - + + // Run the preServe event + if (eventHandler_ != NULL) { + eventHandler_->preServe(); + } + while (!stop_) { try { client.reset(); @@ -118,7 +134,7 @@ void TThreadPoolServer::serve() { outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); // Add to threadmanager pool - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_); + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol)), timeout_); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index 769db47b..f27e8f70 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -13,7 +13,7 @@ #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using facebook::thrift::concurrency::ThreadManager; using facebook::thrift::protocol::TProtocolFactory; @@ -23,7 +23,7 @@ using facebook::thrift::transport::TTransportFactory; class TThreadPoolServer : public TServer { public: class Task; - + TThreadPoolServer(boost::shared_ptr processor, boost::shared_ptr serverTransport, boost::shared_ptr transportFactory, @@ -35,7 +35,7 @@ class TThreadPoolServer : public TServer { boost::shared_ptr inputTransportFactory, boost::shared_ptr outputTransportFactory, boost::shared_ptr inputProtocolFactory, - boost::shared_ptr outputProtocolFactory, + boost::shared_ptr outputProtocolFactory, boost::shared_ptr threadManager); virtual ~TThreadPoolServer(); @@ -45,7 +45,7 @@ class TThreadPoolServer : public TServer { virtual int64_t getTimeout() const; virtual void setTimeout(int64_t value); - + virtual void stop() { stop_ = true; serverTransport_->interrupt(); @@ -58,7 +58,7 @@ class TThreadPoolServer : public TServer { volatile bool stop_; volatile int64_t timeout_; - + }; }}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp index 34584bd4..d07e9dab 100644 --- a/lib/cpp/src/server/TThreadedServer.cpp +++ b/lib/cpp/src/server/TThreadedServer.cpp @@ -13,7 +13,7 @@ #include #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using boost::shared_ptr; using namespace std; @@ -23,10 +23,10 @@ using namespace facebook::thrift::transport; using namespace facebook::thrift::concurrency; class TThreadedServer::Task: public Runnable { - + public: - - Task(TThreadedServer* server, + + Task(TThreadedServer& server, shared_ptr processor, shared_ptr input, shared_ptr output) : @@ -37,8 +37,13 @@ public: } ~Task() {} - + void run() { + boost::shared_ptr eventHandler = + server_.getEventHandler(); + if (eventHandler != NULL) { + eventHandler->clientBegin(input_, output_); + } try { while (processor_->process(input_, output_)) { if (!input_->getTransport()->peek()) { @@ -52,22 +57,25 @@ public: } catch (...) { cerr << "TThreadedServer uncaught exception." << endl; } + if (eventHandler != NULL) { + eventHandler->clientEnd(input_, output_); + } input_->getTransport()->close(); output_->getTransport()->close(); - + // Remove this task from parent bookkeeping { - Synchronized s(server_->tasksMonitor_); - server_->tasks_.erase(this); - if (server_->tasks_.empty()) { - server_->tasksMonitor_.notify(); + Synchronized s(server_.tasksMonitor_); + server_.tasks_.erase(this); + if (server_.tasks_.empty()) { + server_.tasksMonitor_.notify(); } } } private: - TThreadedServer* server_; + TThreadedServer& server_; friend class TThreadedServer; shared_ptr processor_; @@ -103,7 +111,12 @@ void TThreadedServer::serve() { return; } - while (!stop_) { + // Run the preServe event + if (eventHandler_ != NULL) { + eventHandler_->preServe(); + } + + while (!stop_) { try { client.reset(); inputTransport.reset(); @@ -120,11 +133,11 @@ void TThreadedServer::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - TThreadedServer::Task* task = new TThreadedServer::Task(this, - processor_, + TThreadedServer::Task* task = new TThreadedServer::Task(*this, + processor_, inputProtocol, outputProtocol); - + // Create a task shared_ptr runnable = shared_ptr(task); @@ -132,7 +145,7 @@ void TThreadedServer::serve() { // Create a thread for this task shared_ptr thread = shared_ptr(threadFactory_->newThread(runnable)); - + // Insert thread into the set of threads { Synchronized s(tasksMonitor_); diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h index 4fe775a8..28d35491 100644 --- a/lib/cpp/src/server/TThreadedServer.h +++ b/lib/cpp/src/server/TThreadedServer.h @@ -14,7 +14,7 @@ #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using facebook::thrift::TProcessor; using facebook::thrift::transport::TServerTransport; @@ -26,7 +26,7 @@ class TThreadedServer : public TServer { public: class Task; - + TThreadedServer(boost::shared_ptr processor, boost::shared_ptr serverTransport, boost::shared_ptr transportFactory, -- 2.17.1