From: Roger Meier Date: Mon, 8 Jul 2013 21:35:25 +0000 (+0200) Subject: THRIFT-1442 TNonblockingServer: Refactor to allow multiple IO Threads X-Git-Tag: 0.9.1~43 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=6f2a5037105ccad05eb84ec0a60da3389c85eb3f;p=common%2Fthrift.git THRIFT-1442 TNonblockingServer: Refactor to allow multiple IO Threads Patch: Pavlin Radoslavov --- diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index 398eadec..dcc90e25 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -1191,13 +1191,12 @@ void TNonblockingServer::stop() { } } -/** - * Main workhorse function, starts up the server listening on a port and - * loops over the libevent handler. - */ -void TNonblockingServer::serve() { +void TNonblockingServer::registerEvents(event_base* user_event_base) { + userEventBase_ = user_event_base; + // init listen socket - createAndListenOnSocket(); + if (serverSocket_ < 0) + createAndListenOnSocket(); // set up the IO threads assert(ioThreads_.empty()); @@ -1248,6 +1247,18 @@ void TNonblockingServer::serve() { } } + // Register the events for the primary (listener) IO thread + ioThreads_[0]->registerEvents(); +} + +/** + * Main workhorse function, starts up the server listening on a port and + * loops over the libevent handler. + */ +void TNonblockingServer::serve() { + + registerEvents(NULL); + // Run the primary (listener) IO thread loop in our main thread; this will // only return when the server is shutting down. ioThreads_[0]->run(); @@ -1267,7 +1278,8 @@ TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server, , number_(number) , listenSocket_(listenSocket) , useHighPriority_(useHighPriority) - , eventBase_(NULL) { + , eventBase_(NULL) + , ownEventBase_(false) { notificationPipeFDs_[0] = -1; notificationPipeFDs_[1] = -1; } @@ -1276,8 +1288,9 @@ TNonblockingIOThread::~TNonblockingIOThread() { // make sure our associated thread is fully finished join(); - if (eventBase_) { + if (eventBase_ && ownEventBase_) { event_base_free(eventBase_); + ownEventBase_ = false; } if (listenSocket_ >= 0) { @@ -1330,6 +1343,22 @@ void TNonblockingIOThread::createNotificationPipe() { * Register the core libevent events onto the proper base. */ void TNonblockingIOThread::registerEvents() { + threadId_ = Thread::get_current(); + + assert(eventBase_ == 0); + eventBase_ = getServer()->getUserEventBase(); + if (eventBase_ == NULL) { + eventBase_ = event_base_new(); + ownEventBase_ = true; + } + + // Print some libevent stats + if (number_ == 0) { + GlobalOutput.printf("TNonblockingServer: using libevent %s method %s", + event_get_version(), + event_base_get_method(eventBase_)); + } + if (listenSocket_ >= 0) { // Register the server event event_set(&serverEvent_, @@ -1478,20 +1507,8 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { } void TNonblockingIOThread::run() { - threadId_ = Thread::get_current(); - - assert(eventBase_ == 0); - eventBase_ = event_base_new(); - - // Print some libevent stats - if (number_ == 0) { - GlobalOutput.printf("TNonblockingServer: using libevent %s method %s", - event_get_version(), - event_base_get_method(eventBase_)); - } - - - registerEvents(); + if (eventBase_ == NULL) + registerEvents(); GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_); diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h index 9e6ba170..585aa79b 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.h +++ b/lib/cpp/src/thrift/server/TNonblockingServer.h @@ -160,6 +160,9 @@ class TNonblockingServer : public TServer { /// Port server runs on int port_; + /// The optional user-provided event-base (for single-thread servers) + event_base* userEventBase_; + /// For processing via thread pool, may be NULL boost::shared_ptr threadManager_; @@ -279,6 +282,7 @@ class TNonblockingServer : public TServer { nextIOThread_ = 0; useHighPriorityIOThreads_ = false; port_ = port; + userEventBase_ = NULL; threadPoolProcessing_ = false; numTConnections_ = 0; numActiveProcessors_ = 0; @@ -756,15 +760,6 @@ class TNonblockingServer : public TServer { */ void stop(); - private: - /** - * Callback function that the threadmanager calls when a task reaches - * its expiration time. It is needed to clean up the expired connection. - * - * @param task the runnable associated with the expired task. - */ - void expireClose(boost::shared_ptr task); - /// Creates a socket to listen on and binds it to the local port. void createAndListenOnSocket(); @@ -775,6 +770,32 @@ class TNonblockingServer : public TServer { * @param fd descriptor of socket to be initialized/ */ void listenSocket(THRIFT_SOCKET fd); + + /** + * Register the optional user-provided event-base (for single-thread servers) + * + * This method should be used when the server is running in a single-thread + * mode, and the event base is provided by the user (i.e., the caller). + * + * @param user_event_base the user-provided event-base. The user is + * responsible for freeing the event base memory. + */ + void registerEvents(event_base* user_event_base); + + /** + * Returns the optional user-provided event-base (for single-thread servers). + */ + event_base* getUserEventBase() const { return userEventBase_; } + + private: + /** + * Callback function that the threadmanager calls when a task reaches + * its expiration time. It is needed to clean up the expired connection. + * + * @param task the runnable associated with the expired task. + */ + void expireClose(boost::shared_ptr task); + /** * Return an initialized connection object. Creates or recovers from * pool a TConnection and initializes it with the provided socket FD @@ -847,6 +868,9 @@ class TNonblockingIOThread : public Runnable { // Ensures that the event-loop thread is fully finished and shut down. void join(); + /// Registers the events for the notification & listen sockets + void registerEvents(); + private: /** * C-callable event handler for signaling task completion. Provides a @@ -873,9 +897,6 @@ class TNonblockingIOThread : public Runnable { /// Exits the loop ASAP in case of shutdown or error. void breakLoop(bool error); - /// Registers the events for the notification & listen sockets - void registerEvents(); - /// Create the pipe used to notify I/O process of task completion. void createNotificationPipe(); @@ -904,6 +925,10 @@ class TNonblockingIOThread : public Runnable { /// pointer to eventbase to be used for looping event_base* eventBase_; + /// Set to true if this class is responsible for freeing the event base + /// memory. + bool ownEventBase_; + /// Used with eventBase_ for connection events (only in listener thread) struct event serverEvent_;