Refactor TNonblockingServer to use event_base construct
Summary: This allows the event loop to be shared across different components of a program of for a separate thread in a TNonblockingServer to safely use its own libevent code without conflicts.
Reviewed By: mcslee
Test Plan: Updated test/ committed here
Other Notes: submitted by Ben Maurer, patched in by mcslee with slight modifications
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665364 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 5470ad4..2cdd897 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -14,7 +14,7 @@
#include <stack>
#include <event.h>
-namespace facebook { namespace thrift { namespace server {
+namespace facebook { namespace thrift { namespace server {
using facebook::thrift::transport::TMemoryBuffer;
using facebook::thrift::protocol::TProtocol;
@@ -55,6 +55,12 @@
// Is thread pool processing?
bool threadPoolProcessing_;
+ // The event base for libevent
+ event_base* eventBase_;
+
+ // Event struct, for use with eventBase_
+ struct event serverEvent_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -64,7 +70,7 @@
std::stack<TConnection*> connectionStack_;
// Pointer to optional function called after opening the listen socket and
- // before running the event loop, along with its argument data
+ // before running the event loop, along with its argument data.
void (*preServeCallback_)(void*);
void* preServeCallbackArg_;
@@ -74,22 +80,24 @@
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
int port) :
TServer(processor),
- serverSocket_(0),
+ serverSocket_(-1),
port_(port),
frameResponses_(true),
threadPoolProcessing_(false),
+ eventBase_(NULL),
preServeCallback_(NULL),
preServeCallbackArg_(NULL) {}
- TNonblockingServer(boost::shared_ptr<TProcessor> processor,
+ TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
- serverSocket_(0),
+ serverSocket_(-1),
port_(port),
frameResponses_(true),
- threadManager_(threadManager),
+ threadManager_(threadManager),
+ eventBase_(NULL),
preServeCallback_(NULL),
preServeCallbackArg_(NULL) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
@@ -119,7 +127,7 @@
setOutputProtocolFactory(outputProtocolFactory);
setThreadManager(threadManager);
}
-
+
~TNonblockingServer() {}
void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
@@ -127,7 +135,7 @@
threadPoolProcessing_ = (threadManager != NULL);
}
- bool isThreadPoolProcessing() {
+ bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
@@ -139,10 +147,14 @@
frameResponses_ = frameResponses;
}
- bool getFrameResponses() {
+ bool getFrameResponses() const {
return frameResponses_;
}
+ event_base* getEventBase() const {
+ return eventBase_;
+ }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
@@ -151,6 +163,12 @@
((TNonblockingServer*)v)->handleEvent(fd, which);
}
+ void listenSocket();
+
+ void listenSocket(int fd);
+
+ void registerEvents(event_base* base);
+
void serve();
void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) {
@@ -225,7 +243,7 @@
// Write buffer
uint8_t* writeBuffer_;
-
+
// Write buffer size
uint32_t writeBufferSize_;
@@ -256,7 +274,7 @@
// Protocol encoder
boost::shared_ptr<TProtocol> outputProtocol_;
-
+
// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
@@ -290,13 +308,13 @@
throw new facebook::thrift::TException("Out of memory.");
}
readBufferSize_ = 1024;
-
+
// Allocate input and output tranpsorts
- // these only need to be allocated once per TConnection (they don't need to be
+ // these only need to be allocated once per TConnection (they don't need to be
// reallocated on init() call)
inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
-
+
init(socket, eventFlags, s);
}
@@ -311,7 +329,7 @@
assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
-
+
// Handler wrapper for task block
static void taskHandler(int fd, short which, void* v) {
assert(fd == ((TConnection*)v)->taskHandle_);