From: Mark Slee Date: Mon, 26 Nov 2007 19:05:29 +0000 (+0000) Subject: Refactor TNonblockingServer to use event_base construct X-Git-Tag: 0.2.0~1108 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=79b1694043e07a673c343c42ad75d72f3b560539;p=common%2Fthrift.git Refactor TNonblockingServer to use event_base construct Summary: This allows the event loop to be shared across different components of a program of for a separate thread in a TNonblockingServer to safely use its own libevent code without conflicts. Reviewed By: mcslee Test Plan: Updated test/ committed here Other Notes: submitted by Ben Maurer, patched in by mcslee with slight modifications git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665364 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac index d9a66daf..25b6152c 100644 --- a/lib/cpp/configure.ac +++ b/lib/cpp/configure.ac @@ -14,6 +14,8 @@ AM_INIT_AUTOMAKE AC_FUNC_MALLOC +AC_FUNC_MEMCMP + AC_FUNC_REALLOC AC_FUNC_SELECT_ARGTYPES diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index b7ed7618..ad1fb654 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -15,7 +15,7 @@ #include #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using namespace facebook::thrift::protocol; using namespace facebook::thrift::transport; @@ -46,7 +46,7 @@ class TConnection::Task: public Runnable { } catch (...) { cerr << "TThreadedServer uncaught exception." << endl; } - + // Signal completion back to the libevent thread via a socketpair int8_t b = 0; if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) { @@ -79,7 +79,7 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { socketState_ = SOCKET_RECV; appState_ = APP_INIT; - + taskHandle_ = -1; // Set flags, which also registers the event @@ -119,14 +119,14 @@ void TConnection::workSocket() { // Read from the socket fetch = readWant_ - readBufferPos_; got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0); - + if (got > 0) { // Move along in the buffer readBufferPos_ += got; // Check that we did not overdo it assert(readBufferPos_ <= readWant_); - + // We are done reading, move onto the next state if (readBufferPos_ == readWant_) { transition(); @@ -145,7 +145,7 @@ void TConnection::workSocket() { // Whenever we get down here it means a remote disconnect close(); - + return; case SOCKET_SEND: @@ -154,7 +154,7 @@ void TConnection::workSocket() { // If there is no data to send, then let us move on if (writeBufferPos_ == writeBufferSize_) { - fprintf(stderr, "WARNING: Send state with no data to send\n"); + GlobalOutput("WARNING: Send state with no data to send\n"); transition(); return; } @@ -186,7 +186,7 @@ void TConnection::workSocket() { // Did we overdo it? assert(writeBufferPos_ <= writeBufferSize_); - // We are done! + // We are done! if (writeBufferPos_ == writeBufferSize_) { transition(); } @@ -216,7 +216,7 @@ void TConnection::transition() { // and get back some data from the dispatch function inputTransport_->resetBuffer(readBuffer_, readBufferPos_); outputTransport_->resetBuffer(); - + if (server_->isThreadPoolProcessing()) { // We are setting up a Task to do this work and we will wait on it int sv[2]; @@ -230,13 +230,19 @@ void TConnection::transition() { inputProtocol_, outputProtocol_, sv[1])); + // The application is now waiting on the task to finish appState_ = APP_WAIT_TASK; + + // Create an event to be notified when the task finishes event_set(&taskEvent_, taskHandle_ = sv[0], EV_READ, TConnection::taskHandler, this); + // Attach to the base + event_base_set(server_->getEventBase(), &taskEvent_); + // Add the event and start up the server if (-1 == event_add(&taskEvent_, 0)) { GlobalOutput("TNonblockingServer::serve(): coult not event_add"); @@ -260,7 +266,7 @@ void TConnection::transition() { return; } catch (TException &x) { fprintf(stderr, "TException: Server::process() %s\n", x.what()); - close(); + close(); return; } catch (...) { fprintf(stderr, "Server::process() unknown exception\n"); @@ -434,6 +440,7 @@ void TConnection::setFlags(short eventFlags) { * its own ev. */ event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this); + event_base_set(server_->getEventBase(), &event_); // Add the event if (event_add(&event_, 0) == -1) { @@ -493,15 +500,15 @@ void TNonblockingServer::returnConnection(TConnection* connection) { void TNonblockingServer::handleEvent(int fd, short which) { // Make sure that libevent didn't fuck up the socket handles assert(fd == serverSocket_); - + // Server socket accepted a new connection socklen_t addrLen; struct sockaddr addr; - addrLen = sizeof(addr); - + addrLen = sizeof(addr); + // Going to accept a new client socket int clientSocket; - + // Accept as many new clients as possible, even though libevent signaled only // one, this helps us to avoid having to go back into the libevent engine so // many times @@ -530,7 +537,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Put this client connection into the proper state clientConnection->transition(); } - + // Done looping accept, now we have to make sure the error is due to // blocking. Any other error is a problem if (errno != EAGAIN && errno != EWOULDBLOCK) { @@ -539,21 +546,13 @@ void TNonblockingServer::handleEvent(int fd, short which) { } /** - * Main workhorse function, starts up the server listening on a port and - * loops over the libevent handler. + * Creates a socket to listen on and binds it to the local port. */ -void TNonblockingServer::serve() { - // Initialize libevent - event_init(); - - // Print some libevent stats - fprintf(stderr, - "libevent %s method %s\n", - event_get_version(), - event_get_method()); - +void TNonblockingServer::listenSocket() { + int s; struct addrinfo hints, *res, *res0; int error; + char port[sizeof("65536") + 1]; memset(&hints, 0, sizeof(hints)); hints.ai_family = PF_UNSPEC; @@ -576,64 +575,105 @@ void TNonblockingServer::serve() { } // Create the server socket - serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (serverSocket_ == -1) { - GlobalOutput("TNonblockingServer::serve() socket() -1"); - return; + s = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (s == -1) { + freeaddrinfo(res0); + throw TException("TNonblockingServer::serve() socket() -1"); + } + + int one = 1; + + // Set reuseaddr to avoid 2MSL delay on server restart + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + + if (bind(s, res->ai_addr, res->ai_addrlen) == -1) { + close(s); + freeaddrinfo(res0); + throw TException("TNonblockingServer::serve() bind"); } + // Done with the addr info + freeaddrinfo(res0); + + // Set up this file descriptor for listening + listenSocket(s); +} + +/** + * Takes a socket created by listenSocket() and sets various options on it + * to prepare for use in the server. + */ +void TNonblockingServer::listenSocket(int s) { // Set socket to nonblocking mode int flags; - if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 || - fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) { - GlobalOutput("TNonblockingServer::serve() O_NONBLOCK"); - ::close(serverSocket_); - return; + if ((flags = fcntl(s, F_GETFL, 0)) < 0 || + fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) { + close(s); + throw TException("TNonblockingServer::serve() O_NONBLOCK"); } int one = 1; struct linger ling = {0, 0}; - - // Set reuseaddr to avoid 2MSL delay on server restart - setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); // Keepalive to ensure full result flushing - setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)); + setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)); // Turn linger off to avoid hung sockets - setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); // Set TCP nodelay if available, MAC OS X Hack // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html #ifndef TCP_NOPUSH - setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); #endif - if (bind(serverSocket_, res->ai_addr, res->ai_addrlen) == -1) { - GlobalOutput("TNonblockingServer::serve() bind"); - close(serverSocket_); - return; + if (listen(s, LISTEN_BACKLOG) == -1) { + close(s); + throw TException("TNonblockingServer::serve() listen"); } - if (listen(serverSocket_, LISTEN_BACKLOG) == -1) { - GlobalOutput("TNonblockingServer::serve() listen"); - close(serverSocket_); - return; - } + // Cool, this socket is good to go, set it as the serverSocket_ + serverSocket_ = s; +} + +/** + * Register the core libevent events onto the proper base. + */ +void TNonblockingServer::registerEvents(event_base* base) { + assert(serverSocket_ != -1); + assert(!eventBase_); + eventBase_ = base; + + // Print some libevent stats + fprintf(stderr, + "libevent %s method %s\n", + event_get_version(), + event_get_method()); // Register the server event - struct event serverEvent; - event_set(&serverEvent, + event_set(&serverEvent_, serverSocket_, EV_READ | EV_PERSIST, TNonblockingServer::eventHandler, this); + event_base_set(eventBase_, &serverEvent_); // Add the event and start up the server - if (-1 == event_add(&serverEvent, 0)) { - GlobalOutput("TNonblockingServer::serve(): coult not event_add"); - return; + if (-1 == event_add(&serverEvent_, 0)) { + throw TException("TNonblockingServer::serve(): coult not event_add"); } +} + +/** + * Main workhorse function, starts up the server listening on a port and + * loops over the libevent handler. + */ +void TNonblockingServer::serve() { + // Init socket + listenSocket(); + + // Initialize libevent core + registerEvents(static_cast(event_init())); // Run pre-serve callback function if we have one if (preServeCallback_) { @@ -641,7 +681,7 @@ void TNonblockingServer::serve() { } // Run libevent engine, never returns, invokes calls to eventHandler - event_loop(0); + event_base_loop(eventBase_, 0); } }}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 5470ad4e..2cdd897d 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -14,7 +14,7 @@ #include #include -namespace facebook { namespace thrift { namespace server { +namespace facebook { namespace thrift { namespace server { using facebook::thrift::transport::TMemoryBuffer; using facebook::thrift::protocol::TProtocol; @@ -55,6 +55,12 @@ class TNonblockingServer : public TServer { // Is thread pool processing? bool threadPoolProcessing_; + // The event base for libevent + event_base* eventBase_; + + // Event struct, for use with eventBase_ + struct event serverEvent_; + /** * This is a stack of all the objects that have been created but that * are NOT currently in use. When we close a connection, we place it on this @@ -64,7 +70,7 @@ class TNonblockingServer : public TServer { std::stack connectionStack_; // Pointer to optional function called after opening the listen socket and - // before running the event loop, along with its argument data + // before running the event loop, along with its argument data. void (*preServeCallback_)(void*); void* preServeCallbackArg_; @@ -74,22 +80,24 @@ class TNonblockingServer : public TServer { TNonblockingServer(boost::shared_ptr processor, int port) : TServer(processor), - serverSocket_(0), + serverSocket_(-1), port_(port), frameResponses_(true), threadPoolProcessing_(false), + eventBase_(NULL), preServeCallback_(NULL), preServeCallbackArg_(NULL) {} - TNonblockingServer(boost::shared_ptr processor, + TNonblockingServer(boost::shared_ptr processor, boost::shared_ptr protocolFactory, int port, boost::shared_ptr threadManager = boost::shared_ptr()) : TServer(processor), - serverSocket_(0), + serverSocket_(-1), port_(port), frameResponses_(true), - threadManager_(threadManager), + threadManager_(threadManager), + eventBase_(NULL), preServeCallback_(NULL), preServeCallbackArg_(NULL) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); @@ -119,7 +127,7 @@ class TNonblockingServer : public TServer { setOutputProtocolFactory(outputProtocolFactory); setThreadManager(threadManager); } - + ~TNonblockingServer() {} void setThreadManager(boost::shared_ptr threadManager) { @@ -127,7 +135,7 @@ class TNonblockingServer : public TServer { threadPoolProcessing_ = (threadManager != NULL); } - bool isThreadPoolProcessing() { + bool isThreadPoolProcessing() const { return threadPoolProcessing_; } @@ -139,10 +147,14 @@ class TNonblockingServer : public TServer { frameResponses_ = frameResponses; } - bool getFrameResponses() { + bool getFrameResponses() const { return frameResponses_; } + event_base* getEventBase() const { + return eventBase_; + } + TConnection* createConnection(int socket, short flags); void returnConnection(TConnection* connection); @@ -151,6 +163,12 @@ class TNonblockingServer : public TServer { ((TNonblockingServer*)v)->handleEvent(fd, which); } + void listenSocket(); + + void listenSocket(int fd); + + void registerEvents(event_base* base); + void serve(); void setPreServeCallback(void(*fn_ptr)(void*), void* arg = NULL) { @@ -225,7 +243,7 @@ class TConnection { // Write buffer uint8_t* writeBuffer_; - + // Write buffer size uint32_t writeBufferSize_; @@ -256,7 +274,7 @@ class TConnection { // Protocol encoder boost::shared_ptr outputProtocol_; - + // Go into read mode void setRead() { setFlags(EV_READ | EV_PERSIST); @@ -290,13 +308,13 @@ class TConnection { throw new facebook::thrift::TException("Out of memory."); } readBufferSize_ = 1024; - + // Allocate input and output tranpsorts - // these only need to be allocated once per TConnection (they don't need to be + // these only need to be allocated once per TConnection (they don't need to be // reallocated on init() call) inputTransport_ = boost::shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); outputTransport_ = boost::shared_ptr(new TMemoryBuffer()); - + init(socket, eventFlags, s); } @@ -311,7 +329,7 @@ class TConnection { assert(fd == ((TConnection*)v)->socket_); ((TConnection*)v)->workSocket(); } - + // Handler wrapper for task block static void taskHandler(int fd, short which, void* v) { assert(fd == ((TConnection*)v)->taskHandle_); diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h index 3a974480..1ebbfbda 100644 --- a/lib/cpp/src/transport/TTransportUtils.h +++ b/lib/cpp/src/transport/TTransportUtils.h @@ -12,14 +12,14 @@ #include #include -namespace facebook { namespace thrift { namespace transport { +namespace facebook { namespace thrift { namespace transport { /** * The null transport is a dummy transport that doesn't actually do anything. * It's sort of an analogy to /dev/null, you can never read anything from it * and it will let you write anything you want to it, though it won't actually * go anywhere. - * + * * @author Mark Slee */ class TNullTransport : public TTransport { @@ -82,8 +82,8 @@ class TBufferedTransport : public TTransport { bool isOpen() { return transport_->isOpen(); } - - bool peek() { + + bool peek() { if (rPos_ >= rLen_) { rLen_ = transport_->read(rBuf_, rBufSize_); rPos_ = 0; @@ -101,7 +101,7 @@ class TBufferedTransport : public TTransport { } uint32_t read(uint8_t* buf, uint32_t len); - + void write(const uint8_t* buf, uint32_t len); void flush(); @@ -192,7 +192,7 @@ class TFramedTransport : public TTransport { void setWrite(bool write) { write_ = write; } - + void open() { transport_->open(); } @@ -209,10 +209,12 @@ class TFramedTransport : public TTransport { } void close() { - flush(); + if (wLen_ > 0) { + flush(); + } transport_->close(); } - + uint32_t read(uint8_t* buf, uint32_t len); void write(const uint8_t* buf, uint32_t len); @@ -397,16 +399,16 @@ class TMemoryBuffer : public TTransport { void consume(uint32_t len); - private: + private: // Data buffer uint8_t* buffer_; - + // Allocated buffer size uint32_t bufferSize_; // Where the write is at uint32_t wPos_; - + // Where the reader is at uint32_t rPos_; @@ -416,17 +418,17 @@ class TMemoryBuffer : public TTransport { }; /** - * TPipedTransport. This transport allows piping of a request from one + * TPipedTransport. This transport allows piping of a request from one * transport to another either when readEnd() or writeEnd(). The typical * use case for this is to log a request or a reply to disk. - * The underlying buffer expands to a keep a copy of the entire + * The underlying buffer expands to a keep a copy of the entire * request/response. * * @author Aditya Agarwal */ class TPipedTransport : virtual public TTransport { public: - TPipedTransport(boost::shared_ptr srcTrans, + TPipedTransport(boost::shared_ptr srcTrans, boost::shared_ptr dstTrans) : srcTrans_(srcTrans), dstTrans_(dstTrans), @@ -435,14 +437,14 @@ class TPipedTransport : virtual public TTransport { // default is to to pipe the request when readEnd() is called pipeOnRead_ = true; - pipeOnWrite_ = false; + pipeOnWrite_ = false; rBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * rBufSize_); wBuf_ = (uint8_t*) malloc(sizeof(uint8_t) * wBufSize_); } - - TPipedTransport(boost::shared_ptr srcTrans, - boost::shared_ptr dstTrans, + + TPipedTransport(boost::shared_ptr srcTrans, + boost::shared_ptr dstTrans, uint32_t sz) : srcTrans_(srcTrans), dstTrans_(dstTrans), @@ -461,15 +463,15 @@ class TPipedTransport : virtual public TTransport { bool isOpen() { return srcTrans_->isOpen(); } - - bool peek() { + + bool peek() { if (rPos_ >= rLen_) { // Double the size of the underlying buffer if it is full if (rLen_ == rBufSize_) { rBufSize_ *=2; rBuf_ = (uint8_t *)realloc(rBuf_, sizeof(uint8_t) * rBufSize_); } - + // try to fill up the buffer rLen_ += srcTrans_->read(rBuf_+rPos_, rBufSize_ - rPos_); } @@ -492,7 +494,7 @@ class TPipedTransport : virtual public TTransport { void setPipeOnWrite(bool pipeVal) { pipeOnWrite_ = pipeVal; } - + uint32_t read(uint8_t* buf, uint32_t len); void readEnd() { @@ -522,7 +524,7 @@ class TPipedTransport : virtual public TTransport { boost::shared_ptr getTargetTransport() { return dstTrans_; - } + } protected: boost::shared_ptr srcTrans_; @@ -576,7 +578,7 @@ class TPipedTransportFactory : public TTransportFactory { /** * TPipedFileTransport. This is just like a TTransport, except that - * it is a templatized class, so that clients who rely on a specific + * it is a templatized class, so that clients who rely on a specific * TTransport can still access the original transport. * * @author James Wang @@ -622,7 +624,7 @@ class TPipedFileReaderTransport : public TPipedTransport, class TPipedFileReaderTransportFactory : public TPipedTransportFactory { public: TPipedFileReaderTransportFactory() {} - TPipedFileReaderTransportFactory(boost::shared_ptr dstTrans) + TPipedFileReaderTransportFactory(boost::shared_ptr dstTrans) : TPipedTransportFactory(dstTrans) {} virtual ~TPipedFileReaderTransportFactory() {} diff --git a/test/cpp/src/nb-main.cpp b/test/cpp/src/nb-main.cpp index 9fc1beb9..164febb3 100644 --- a/test/cpp/src/nb-main.cpp +++ b/test/cpp/src/nb-main.cpp @@ -61,7 +61,7 @@ class Server : public ServiceIf { Server() {} void count(const char* method) { - MutexMonitor m(lock_); + Guard m(lock_); int ct = counts_[method]; counts_[method] = ++ct; } @@ -74,7 +74,7 @@ class Server : public ServiceIf { } count_map getCount() { - MutexMonitor m(lock_); + Guard m(lock_); return counts_; } @@ -354,10 +354,12 @@ int main(int argc, char **argv) { } shared_ptr serverThread; + shared_ptr serverThread2; if (serverType == "simple") { serverThread = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory, port))); + serverThread2 = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory, port+1))); } else if (serverType == "thread-pool") { @@ -366,17 +368,21 @@ int main(int argc, char **argv) { threadManager->threadFactory(threadFactory); threadManager->start(); serverThread = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager))); + serverThread2 = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory, port+1, threadManager))); } - cerr << "Starting the server on port " << port << endl; + cerr << "Starting the server on port " << port << " and " << (port + 1) << endl; serverThread->start(); + serverThread2->start(); // If we aren't running clients, just wait forever for external clients if (clientCount == 0) { serverThread->join(); + serverThread2->join(); } } + sleep(1); if (clientCount > 0) { @@ -395,7 +401,7 @@ int main(int argc, char **argv) { for (size_t ix = 0; ix < clientCount; ix++) { - shared_ptr socket(new TSocket("127.0.0.1", port)); + shared_ptr socket(new TSocket("127.0.0.1", port + (ix % 2))); shared_ptr framedSocket(new TFramedTransport(socket)); shared_ptr protocol(new TBinaryProtocol(framedSocket)); shared_ptr serviceClient(new ServiceClient(protocol));