Refactor TNonblockingServer to use event_base construct

Summary: This allows the event loop to be shared across different components of a program of for a separate thread in a TNonblockingServer to safely use its own libevent code without conflicts.

Reviewed By: mcslee

Test Plan: Updated test/ committed here

Other Notes: submitted by Ben Maurer, patched in by mcslee with slight modifications


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665364 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 5470ad4..2cdd897 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -14,7 +14,7 @@
 #include <stack>
 #include <event.h>
 
-namespace facebook { namespace thrift { namespace server { 
+namespace facebook { namespace thrift { namespace server {
 
 using facebook::thrift::transport::TMemoryBuffer;
 using facebook::thrift::protocol::TProtocol;
@@ -55,6 +55,12 @@
   // Is thread pool processing?
   bool threadPoolProcessing_;
 
+  // The event base for libevent
+  event_base* eventBase_;
+
+  // Event struct, for use with eventBase_
+  struct event serverEvent_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -64,7 +70,7 @@
   std::stack<TConnection*> connectionStack_;
 
   // Pointer to optional function called after opening the listen socket and
-  // before running the event loop, along with its argument data
+  // before running the event loop, along with its argument data.
   void (*preServeCallback_)(void*);
   void* preServeCallbackArg_;
 
@@ -74,22 +80,24 @@
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      int port) :
     TServer(processor),
-    serverSocket_(0),
+    serverSocket_(-1),
     port_(port),
     frameResponses_(true),
     threadPoolProcessing_(false),
+    eventBase_(NULL),
     preServeCallback_(NULL),
     preServeCallbackArg_(NULL) {}
 
-  TNonblockingServer(boost::shared_ptr<TProcessor> processor, 
+  TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
                      int port,
                      boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
-    serverSocket_(0),
+    serverSocket_(-1),
     port_(port),
     frameResponses_(true),
-    threadManager_(threadManager), 
+    threadManager_(threadManager),
+    eventBase_(NULL),
     preServeCallback_(NULL),
     preServeCallbackArg_(NULL) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
@@ -119,7 +127,7 @@
     setOutputProtocolFactory(outputProtocolFactory);
     setThreadManager(threadManager);
   }
-        
+
   ~TNonblockingServer() {}
 
   void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
@@ -127,7 +135,7 @@
     threadPoolProcessing_ = (threadManager != NULL);
   }
 
-  bool isThreadPoolProcessing() {
+  bool isThreadPoolProcessing() const {
     return threadPoolProcessing_;
   }
 
@@ -139,10 +147,14 @@
     frameResponses_ = frameResponses;
   }
 
-  bool getFrameResponses() {
+  bool getFrameResponses() const {
     return frameResponses_;
   }
 
+  event_base* getEventBase() const {
+    return eventBase_;
+  }
+
   TConnection* createConnection(int socket, short flags);
 
   void returnConnection(TConnection* connection);
@@ -151,6 +163,12 @@
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
+  void listenSocket();
+
+  void listenSocket(int fd);
+
+  void registerEvents(event_base* base);
+
   void serve();
 
   void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) {
@@ -225,7 +243,7 @@
 
   // Write buffer
   uint8_t* writeBuffer_;
-  
+
   // Write buffer size
   uint32_t writeBufferSize_;
 
@@ -256,7 +274,7 @@
 
   // Protocol encoder
   boost::shared_ptr<TProtocol> outputProtocol_;
-  
+
   // Go into read mode
   void setRead() {
     setFlags(EV_READ | EV_PERSIST);
@@ -290,13 +308,13 @@
       throw new facebook::thrift::TException("Out of memory.");
     }
     readBufferSize_ = 1024;
-    
+
     // Allocate input and output tranpsorts
-    // these only need to be allocated once per TConnection (they don't need to be 
+    // these only need to be allocated once per TConnection (they don't need to be
     // reallocated on init() call)
     inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
     outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
-        
+
     init(socket, eventFlags, s);
   }
 
@@ -311,7 +329,7 @@
     assert(fd == ((TConnection*)v)->socket_);
     ((TConnection*)v)->workSocket();
   }
-  
+
   // Handler wrapper for task block
   static void taskHandler(int fd, short which, void* v) {
     assert(fd == ((TConnection*)v)->taskHandle_);