From: Jake Farrell Date: Tue, 6 Dec 2011 01:17:26 +0000 (+0000) Subject: Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads X-Git-Tag: 0.9.1~515 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=b0d9560c330fedb49be6b2272b95c83b000fd415;p=common%2Fthrift.git Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads Client: cpp Patch: Dave Watson Adds multiple IO threads to TNonblockingServer git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 70204f11..6924aa64 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -156,7 +156,13 @@ class PthreadThread: public Thread { cause the process to run out of thread resources. We're beyond the point of throwing an exception. Not clear how best to handle this. */ - detached_ = pthread_join(pthread_, &ignore) == 0; + int res = pthread_join(pthread_, &ignore); + detached_ = (res == 0); + if (res != 0) { + GlobalOutput.printf("PthreadThread::join(): fail with code %d", res); + } + } else { + GlobalOutput.printf("PthreadThread::join(): detached thread"); } } diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index c331edab..ba029a9c 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -24,6 +24,7 @@ #include "TNonblockingServer.h" #include #include +#include #include @@ -50,6 +51,7 @@ #include #include +#include #ifndef AF_LOCAL #define AF_LOCAL AF_UNIX @@ -63,6 +65,7 @@ using namespace apache::thrift::concurrency; using namespace std; using apache::thrift::transport::TSocket; using apache::thrift::transport::TTransportException; +using boost::shared_ptr; /// Three states for sockets: recv frame size, recv data, and send mode enum TSocketState { @@ -94,6 +97,8 @@ enum TAppState { */ class TNonblockingServer::TConnection { private: + /// Server IO Thread handling this connection + TNonblockingIOThread* ioThread_; /// Server handle TNonblockingServer* server_; @@ -209,25 +214,25 @@ class TNonblockingServer::TConnection { class Task; /// Constructor - TConnection(int socket, short eventFlags, TNonblockingServer *s, + TConnection(int socket, TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen) { readBuffer_ = NULL; readBufferSize_ = 0; - // Allocate input and output transports - // these only need to be allocated once per TConnection (they don't need to be - // reallocated on init() call) - inputTransport_ = boost::shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); - outputTransport_ = boost::shared_ptr(new TMemoryBuffer(s->getWriteBufferDefaultSize())); - tSocket_.reset(new TSocket()); + ioThread_ = ioThread; + server_ = ioThread->getServer(); - init(socket, eventFlags, s, addr, addrLen); - server_->incrementNumConnections(); + // Allocate input and output transports these only need to be allocated + // once per TConnection (they don't need to be reallocated on init() call) + inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_)); + outputTransport_.reset(new TMemoryBuffer( + server_->getWriteBufferDefaultSize())); + tSocket_.reset(new TSocket()); + init(socket, ioThread, addr, addrLen); } ~TConnection() { std::free(readBuffer_); - server_->decrementNumConnections(); } /** @@ -239,7 +244,7 @@ class TNonblockingServer::TConnection
{ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); /// Initialize - void init(int socket, short eventFlags, TNonblockingServer *s, + void init(int socket, TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen); /** @@ -262,61 +267,42 @@ class TNonblockingServer::TConnection { ((TConnection*)v)->workSocket(); } - /** - * C-callable event handler for signaling task completion. Provides a - * callback that libevent can understand that will read a connection - * object's address from a pipe and call connection->transition() for - * that object. - * - * @param fd the descriptor the event occurred on. - */ - static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) { - TConnection* connection; - ssize_t nBytes; - while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0)) - == sizeof(TConnection*)) { - connection->transition(); - } - if (nBytes > 0) { - throw TException("TConnection::taskHandler unexpected partial read"); - } - if (errno != EWOULDBLOCK && errno != EAGAIN) { - GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno); - } - } - /** * Notification to server that processing has ended on this request. * Can be called either when processing is completed or when a waiting * task has been preemptively terminated (on overload). * + * Don't call this from the IO thread itself. + * * @return true if successful, false if unable to notify (check errno). */ - bool notifyServer() { - TConnection* connection = this; - if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection), - sizeof(TConnection*), 0) != sizeof(TConnection*)) { - return false; - } + bool notifyIOThread() { + return ioThread_->notify(this); + } - return true; + /* + * Returns the number of this connection's currently assigned IO + * thread. + */ + int getIOThreadNumber() const { + return ioThread_->getThreadNumber(); } /// Force connection shutdown for this connection. void forceClose() { appState_ = APP_CLOSE_CONNECTION; - if (!notifyServer()) { + if (!notifyIOThread()) { throw TException("TConnection::forceClose: failed write on notify pipe"); } } /// return the server this connection was initialized for. - TNonblockingServer* getServer() { + TNonblockingServer* getServer() const { return server_; } /// get state of connection. - TAppState getState() { + TAppState getState() const { return appState_; } @@ -362,19 +348,20 @@ class TNonblockingServer::TConnection::Task: public Runnable { } } } catch (const TTransportException& ttx) { - GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what()); + GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what()); } catch (const bad_alloc&) { - GlobalOutput("TNonblockingServer caught bad_alloc exception."); + GlobalOutput("TNonblockingServer: caught bad_alloc exception."); exit(-1); } catch (const std::exception& x) { - GlobalOutput.printf("TNonblockingServer process() exception: %s: %s", + GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s", typeid(x).name(), x.what()); } catch (...) 
{ - GlobalOutput("TNonblockingServer uncaught exception."); + GlobalOutput.printf( + "TNonblockingServer: unknown exception while processing."); } // Signal completion back to the libevent thread via a pipe - if (!connection_->notifyServer()) { + if (!connection_->notifyIOThread()) { throw TException("TNonblockingServer::Task::run: failed write on notify pipe"); } } @@ -392,14 +379,15 @@ class TNonblockingServer::TConnection::Task: public Runnable { void* connectionContext_; }; -void TNonblockingServer::TConnection::init(int socket, short eventFlags, - TNonblockingServer* s, +void TNonblockingServer::TConnection::init(int socket, + TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen) { tSocket_->setSocketFD(socket); tSocket_->setCachedAddress(addr, addrLen); - server_ = s; + ioThread_ = ioThread; + server_ = ioThread->getServer(); appState_ = APP_INIT; eventFlags_ = 0; @@ -412,30 +400,31 @@ void TNonblockingServer::TConnection::init(int socket, short eventFlags, largestWriteBufferSize_ = 0; socketState_ = SOCKET_RECV_FRAMING; - appState_ = APP_INIT; callsForResize_ = 0; - // Set flags, which also registers the event - setFlags(eventFlags); - // get input/transports - factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_); - factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_); + factoryInputTransport_ = server_->getInputTransportFactory()->getTransport( + inputTransport_); + factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport( + outputTransport_); // Create protocol - inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_); - outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); + inputProtocol_ = server_->getInputProtocolFactory()->getProtocol( + factoryInputTransport_); + outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol( + factoryOutputTransport_); // Set up for any server event handler serverEventHandler_ = server_->getEventHandler(); if (serverEventHandler_ != NULL) { - connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_); + connectionContext_ = serverEventHandler_->createContext(inputProtocol_, + outputProtocol_); } else { connectionContext_ = NULL; } // Get the processor - processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_); + processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_); } void TNonblockingServer::TConnection::workSocket() { @@ -500,7 +489,7 @@ void TNonblockingServer::TConnection::workSocket() { return; } - + if (got > 0) { // Move along in the buffer readBufferPos_ += got; @@ -565,6 +554,9 @@ void TNonblockingServer::TConnection::workSocket() { * to, or finished receiving the data that it needed to. 
*/ void TNonblockingServer::TConnection::transition() { + // ensure this connection is active right now + assert(ioThread_); + assert(server_); // Switch upon the state that we are currently in and move to a new state switch (appState_) { @@ -800,7 +792,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { */ event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this); - event_base_set(server_->getEventBase(), &event_); + event_base_set(ioThread_->getEventBase(), &event_); // Add the event if (event_add(&event_, 0) == -1) { @@ -820,6 +812,7 @@ void TNonblockingServer::TConnection::close() { if (serverEventHandler_ != NULL) { serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_); } + ioThread_ = NULL; // Close the socket tSocket_->close(); @@ -861,14 +854,6 @@ TNonblockingServer::~TNonblockingServer() { connectionStack_.pop(); delete connection; } - - if (eventBase_ && ownEventBase_) { - event_base_free(eventBase_); - } - - if (serverSocket_ >= 0) { - close(serverSocket_); - } } /** @@ -876,27 +861,41 @@ TNonblockingServer::~TNonblockingServer() { * by allocating a new one entirely */ TNonblockingServer::TConnection* TNonblockingServer::createConnection( - int socket, short flags, - const sockaddr* addr, - socklen_t addrLen) { + int socket, const sockaddr* addr, socklen_t addrLen) { // Check the stack + Guard g(connMutex_); + + // pick an IO thread to handle this connection -- currently round robin + assert(nextIOThread_ >= 0); + assert(nextIOThread_ < ioThreads_.size()); + int selectedThreadIdx = nextIOThread_; + nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size(); + + TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get(); + + // Check the connection stack to see if we can re-use + TConnection* result = NULL; if (connectionStack_.empty()) { - return new TConnection(socket, flags, this, addr, addrLen); + result = new TConnection(socket, ioThread, addr, addrLen); + ++numTConnections_; } else { - TConnection* result = connectionStack_.top(); + result = connectionStack_.top(); connectionStack_.pop(); - result->init(socket, flags, this, addr, addrLen); - return result; + result->init(socket, ioThread, addr, addrLen); } + return result; } /** * Returns a connection to the stack */ void TNonblockingServer::returnConnection(TConnection* connection) { + Guard g(connMutex_); + if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) { delete connection; + --numTConnections_; } else { connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_); connectionStack_.push(connection); @@ -927,6 +926,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) { // If we're overloaded, take action here if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) { + Guard g(connMutex_); nConnectionsDropped_++; nTotalConnectionsDropped_++; if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) { @@ -940,6 +940,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { } } } + // Explicitly set this socket to NONBLOCK mode int flags; if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 || @@ -951,7 +952,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Create a new TConnection for this client socket. 
TConnection* clientConnection = - createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen); + createConnection(clientSocket, addrp, addrLen); // Fail fast if we could not create a TConnection object if (clientConnection == NULL) { @@ -960,13 +961,29 @@ void TNonblockingServer::handleEvent(int fd, short which) { return; } - // Put this client connection into the proper state - clientConnection->transition(); + /* + * Either notify the ioThread that is assigned this connection to + * start processing, or if it is us, we'll just ask this + * connection to do its initial state change here. + * + * (We need to avoid writing to our own notification pipe, to + * avoid possible deadlocks if the pipe is full.) + * + * The IO thread #0 is the only one that handles these listen + * events, so unless the connection has been assigned to thread #0 + * we know it's not on our thread. + */ + if (clientConnection->getIOThreadNumber() == 0) { + clientConnection->transition(); + } else { + clientConnection->notifyIOThread(); + } // addrLen is written by the accept() call, so needs to be set before the next call. addrLen = sizeof(addrStorage); } + // Done looping accept, now we have to make sure the error is due to // blocking. Any other error is a problem if (errno != EAGAIN && errno != EWOULDBLOCK) { @@ -977,8 +994,9 @@ void TNonblockingServer::handleEvent(int fd, short which) { /** * Creates a socket to listen on and binds it to the local port. */ -void TNonblockingServer::listenSocket() { +void TNonblockingServer::createAndListenOnSocket() { int s; + struct addrinfo hints, *res, *res0; int error; @@ -1082,63 +1100,6 @@ void TNonblockingServer::listenSocket(int s) { serverSocket_ = s; } -void TNonblockingServer::createNotificationPipe() { - if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { - GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR()); - throw TException("can't create notification pipe"); - } - if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 || - evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) { - close(notificationPipeFDs_[0]); - close(notificationPipeFDs_[1]); - throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK"); - } -} - -/** - * Register the core libevent events onto the proper base. 
- */ -void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) { - assert(serverSocket_ != -1); - assert(!eventBase_); - eventBase_ = base; - ownEventBase_ = ownEventBase; - - // Print some libevent stats - GlobalOutput.printf("libevent %s method %s", - event_get_version(), - event_base_get_method(eventBase_)); - - // Register the server event - event_set(&serverEvent_, - serverSocket_, - EV_READ | EV_PERSIST, - TNonblockingServer::eventHandler, - this); - event_base_set(eventBase_, &serverEvent_); - - // Add the event and start up the server - if (-1 == event_add(&serverEvent_, 0)) { - throw TException("TNonblockingServer::serve(): coult not event_add"); - } - if (threadPoolProcessing_) { - // Create an event to be notified when a task finishes - event_set(¬ificationEvent_, - getNotificationRecvFD(), - EV_READ | EV_PERSIST, - TConnection::taskHandler, - this); - - // Attach to the base - event_base_set(eventBase_, ¬ificationEvent_); - - // Add the event and start up the server - if (-1 == event_add(¬ificationEvent_, 0)) { - throw TException("TNonblockingServer::serve(): notification event_add fail"); - } - } -} - void TNonblockingServer::setThreadManager(boost::shared_ptr threadManager) { threadManager_ = threadManager; if (threadManager != NULL) { @@ -1154,14 +1115,15 @@ bool TNonblockingServer::serverOverloaded() { if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) { if (!overloaded_) { - GlobalOutput.printf("thrift non-blocking server overload condition"); + GlobalOutput.printf("TNonblockingServer: overload condition begun."); overloaded_ = true; } } else { if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) && (activeConnections <= overloadHysteresis_ * maxConnections_)) { - GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)", + GlobalOutput.printf("TNonblockingServer: overload ended; " + "%u dropped (%llu total)", nConnectionsDropped_, nTotalConnectionsDropped_); nConnectionsDropped_ = 0; overloaded_ = false; @@ -1189,73 +1151,361 @@ bool TNonblockingServer::drainPendingTask() { void TNonblockingServer::expireClose(boost::shared_ptr task) { TConnection* connection = static_cast(task.get())->getTConnection(); - assert(connection && connection->getServer() - && connection->getState() == APP_WAIT_TASK); + assert(connection && connection->getServer() && + connection->getState() == APP_WAIT_TASK); connection->forceClose(); } +void TNonblockingServer::stop() { + // Breaks the event loop in all threads so that they end ASAP. + for (int i = 0; i < ioThreads_.size(); ++i) { + ioThreads_[i]->stop(); + } +} + /** * Main workhorse function, starts up the server listening on a port and * loops over the libevent handler. */ void TNonblockingServer::serve() { - // Init socket - listenSocket(); + // init listen socket + createAndListenOnSocket(); - if (threadPoolProcessing_) { - // Init task completion notification pipe - createNotificationPipe(); + // set up the IO threads + assert(ioThreads_.empty()); + if (!numIOThreads_) { + numIOThreads_ = DEFAULT_IO_THREADS; } - // Initialize libevent core - registerEvents(static_cast(event_base_new()), true); + for (int id = 0; id < numIOThreads_; ++id) { + // the first IO thread also does the listening on server socket + int listenFd = (id == 0 ? 
serverSocket_ : -1); + + shared_ptr thread( + new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); + ioThreads_.push_back(thread); + } - // Run the preServe event + // Notify handler of the preServe event if (eventHandler_ != NULL) { eventHandler_->preServe(); } - // Run libevent engine, invokes calls to eventHandler - // Only returns if stop() is called. - event_base_loop(eventBase_, 0); + // Start all of our helper IO threads. Note that the threads run forever, + // only terminating if stop() is called. + assert(ioThreads_.size() == numIOThreads_); + assert(ioThreads_.size() > 0); + + GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.", + port_, ioThreads_.size()); + + // Launch all the secondary IO threads in separate threads + if (ioThreads_.size() > 1) { + ioThreadFactory_.reset(new PosixThreadFactory( + PosixThreadFactory::OTHER, // scheduler + PosixThreadFactory::NORMAL, // priority + 1, // stack size (MB) + false // detached + )); + + assert(ioThreadFactory_.get()); + + // intentionally starting at thread 1, not 0 + for (int i = 1; i < ioThreads_.size(); ++i) { + shared_ptr thread = ioThreadFactory_->newThread(ioThreads_[i]); + ioThreads_[i]->setThread(thread); + thread->start(); + } + } + + // Run the primary (listener) IO thread loop in our main thread; this will + // only return when the server is shutting down. + ioThreads_[0]->run(); + + // Ensure all threads are finished before exiting serve() + for (int i = 0; i < ioThreads_.size(); ++i) { + ioThreads_[i]->join(); + GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i); + } } -void TNonblockingServer::stop() { - if (!eventBase_) { - return; +TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server, + int number, + int listenSocket, + bool useHighPriority) + : server_(server) + , number_(number) + , listenSocket_(listenSocket) + , useHighPriority_(useHighPriority) + , eventBase_(NULL) { + notificationPipeFDs_[0] = -1; + notificationPipeFDs_[1] = -1; +} + +TNonblockingIOThread::~TNonblockingIOThread() { + // make sure our associated thread is fully finished + join(); + + if (eventBase_) { + event_base_free(eventBase_); } - // Call event_base_loopbreak() to tell libevent to exit the loop - // - // (The libevent documentation doesn't explicitly state that this function is - // safe to call from another thread. However, all it does is set a variable, - // in the event_base, so it should be fine.) 
+ if (listenSocket_ >= 0) { + if (0 != close(listenSocket_)) { + GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", + errno); + } + listenSocket_ = TNonblockingServer::INVALID_SOCKET; + } + + for (int i = 0; i < 2; ++i) { + if (notificationPipeFDs_[i] >= 0) { + if (0 != ::close(notificationPipeFDs_[i])) { + GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ", + errno); + } + notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET; + } + } +} + +void TNonblockingIOThread::createNotificationPipe() { + if (pipe(notificationPipeFDs_) != 0) { + GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno); + throw TException("can't create notification pipe"); + } + int flags; + if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 || + fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) { + close(notificationPipeFDs_[0]); + close(notificationPipeFDs_[1]); + throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK"); + } + for (int i = 0; i < 2; ++i) { + if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 || + fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { + close(notificationPipeFDs_[0]); + close(notificationPipeFDs_[1]); + throw TException("TNonblockingServer::createNotificationPipe() " + "FD_CLOEXEC"); + } + } +} + +/** + * Register the core libevent events onto the proper base. + */ +void TNonblockingIOThread::registerEvents() { + if (listenSocket_ >= 0) { + // Register the server event + event_set(&serverEvent_, + listenSocket_, + EV_READ | EV_PERSIST, + TNonblockingIOThread::listenHandler, + server_); + event_base_set(eventBase_, &serverEvent_); + + // Add the event and start up the server + if (-1 == event_add(&serverEvent_, 0)) { + throw TException("TNonblockingServer::serve(): " + "event_add() failed on server listen event"); + } + GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", + number_); + } + + createNotificationPipe(); + + // Create an event to be notified when a task finishes + event_set(¬ificationEvent_, + getNotificationRecvFD(), + EV_READ | EV_PERSIST, + TNonblockingIOThread::notifyHandler, + this); + + // Attach to the base + event_base_set(eventBase_, ¬ificationEvent_); + + // Add the event and start up the server + if (-1 == event_add(¬ificationEvent_, 0)) { + throw TException("TNonblockingServer::serve(): " + "event_add() failed on task-done notification event"); + } + GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", + number_); +} + +bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { + int fd = getNotificationSendFD(); + if (fd < 0) { + return false; + } + + const int kSize = sizeof(conn); + if (write(fd, &conn, kSize) != kSize) { + return false; + } + + return true; +} + +/* static */ +void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) { + TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v; + assert(ioThread); + + while (true) { + TNonblockingServer::TConnection* connection = 0; + const int kSize = sizeof(connection); + int nBytes = read(fd, &connection, kSize); + if (nBytes == kSize) { + if (connection == NULL) { + // this is the command to stop our thread, exit the handler! 
+ return; + } + connection->transition(); + } else if (nBytes > 0) { + // throw away these bytes and hope that next time we get a solid read + GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", + nBytes, kSize); + ioThread->breakLoop(true); + return; + } else if (nBytes == 0) { + GlobalOutput.printf("notifyHandler: Notify socket closed!"); + // exit the loop + break; + } else { // nBytes < 0 + if (errno != EWOULDBLOCK && errno != EAGAIN) { + GlobalOutput.perror( + "TNonblocking: notifyHandler read() failed: ", errno); + ioThread->breakLoop(true); + return; + } + // exit the loop + break; + } + } +} + +void TNonblockingIOThread::breakLoop(bool error) { + if (error) { + GlobalOutput.printf( + "TNonblockingServer: IO thread #%d exiting with error.", number_); + // TODO: figure out something better to do here, but for now kill the + // whole process. + GlobalOutput.printf("TNonblockingServer: aborting process."); + ::abort(); + } + + // sets a flag so that the loop exits on the next event event_base_loopbreak(eventBase_); - // event_base_loopbreak() only causes the loop to exit the next time it wakes - // up. We need to force it to wake up, in case there are no real events - // it needs to process. - // - // Attempt to connect to the server socket. If anything fails, - // we'll just have to wait until libevent wakes up on its own. + // event_base_loopbreak() only causes the loop to exit the next time + // it wakes up. We need to force it to wake up, in case there are + // no real events it needs to process. // - // First create a socket - int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - if (fd < 0) { - return; + // If we're running in the same thread, we can't use the notify(0) + // mechanism to stop the thread, but happily if we're running in the + // same thread, this means the thread can't be blocking in the event + // loop either. + if (!pthread_equal(pthread_self(), threadId_)) { + notify(NULL); + } +} + +void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { + // Start out with a standard, low-priority setup for the sched params. + struct sched_param sp; + bzero((void*) &sp, sizeof(sp)); + int policy = SCHED_OTHER; + + // If desired, set up high-priority sched params structure. + if (value) { + // FIFO scheduler, ranked above default SCHED_OTHER queue + policy = SCHED_FIFO; + // The priority only compares us to other SCHED_FIFO threads, so we + // just pick a random priority halfway between min & max. + const int priority = (sched_get_priority_max(policy) + + sched_get_priority_min(policy)) / 2; + + sp.sched_priority = priority; + } + + // Actually set the sched params for the current thread. + if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) { + GlobalOutput.printf( + "TNonblocking: IO Thread #%d using high-priority scheduler!", number_); + } else { + GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno); + } +} + +void TNonblockingIOThread::run() { + threadId_ = pthread_self(); + + assert(eventBase_ == 0); + eventBase_ = event_base_new(); + + // Print some libevent stats + if (number_ == 0) { + GlobalOutput.printf("TNonblockingServer: using libevent %s method %s", + event_get_version(), + event_base_get_method(eventBase_)); } - // Set up the address - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1 - addr.sin_port = htons(port_); - // Finally do the connect(). - // We don't care about the return value; - // we're just going to close the socket either way. 
- connect(fd, reinterpret_cast(&addr), sizeof(addr)); - close(fd); + registerEvents(); + + GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", + number_); + + if (useHighPriority_) { + setCurrentThreadHighPriority(true); + } + + // Run libevent engine, never returns, invokes calls to eventHandler + event_base_loop(eventBase_, 0); + + if (useHighPriority_) { + setCurrentThreadHighPriority(false); + } + + // cleans up our registered events + cleanupEvents(); + + GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", + number_); +} + +void TNonblockingIOThread::cleanupEvents() { + // stop the listen socket, if any + if (listenSocket_ >= 0) { + if (event_del(&serverEvent_) == -1) { + GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno); + } + } + + event_del(¬ificationEvent_); +} + + +void TNonblockingIOThread::stop() { + // This should cause the thread to fall out of its event loop ASAP. + breakLoop(false); +} + +void TNonblockingIOThread::join() { + // If this was a thread created by a factory (not the thread that called + // serve()), we join() it to make sure we shut down fully. + if (thread_) { + try { + // Note that it is safe to both join() ourselves twice, as well as join + // the current thread as the pthread implementation checks for deadlock. + thread_->join(); + } catch(...) { + // swallow everything + } + } } }}} // apache::thrift::server diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index da36045d..84e384c2 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -26,7 +26,11 @@ #include #include #include +#include +#include +#include #include +#include #include #include #include @@ -35,6 +39,8 @@ #endif #include + + namespace apache { namespace thrift { namespace server { using apache::thrift::transport::TMemoryBuffer; @@ -42,6 +48,11 @@ using apache::thrift::transport::TSocket; using apache::thrift::protocol::TProtocol; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; +using apache::thrift::concurrency::PosixThreadFactory; +using apache::thrift::concurrency::ThreadFactory; +using apache::thrift::concurrency::Thread; +using apache::thrift::concurrency::Mutex; +using apache::thrift::concurrency::Guard; #ifdef LIBEVENT_VERSION_NUMBER #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24) @@ -78,9 +89,10 @@ inline SOCKOPT_CAST_T* cast_sockopt(T* v) { } /** - * This is a non-blocking server in C++ for high performance that operates a - * single IO thread. It assumes that all incoming requests are framed with a - * 4 byte length indicator and writes out responses using the same framing. + * This is a non-blocking server in C++ for high performance that + * operates a set of IO threads (by default only one). It assumes that + * all incoming requests are framed with a 4 byte length indicator and + * writes out responses using the same framing. * * It does not use the TServerTransport framework, but rather has socket * operations hardcoded for use with select. 
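For illustration, a minimal usage sketch of the multi-IO-thread mode this patch adds. MyServiceHandler and MyServiceProcessor are hypothetical placeholders for a generated service, and the TNonblockingServer(processor, port) constructor and include path are assumed from the existing C++ library layout; only setNumIOThreads() and setUseHighPriorityIOThreads() come from this change.

// Sketch only: MyService* types are hypothetical; include paths may differ
// between the Thrift source tree and an installed copy.
#include <server/TNonblockingServer.h>
#include <boost/shared_ptr.hpp>
// #include "MyService.h"   // hypothetical generated service header

using apache::thrift::server::TNonblockingServer;

int main() {
  boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
  boost::shared_ptr<apache::thrift::TProcessor> processor(
      new MyServiceProcessor(handler));

  TNonblockingServer server(processor, 9090);

  // New with this patch: run four libevent loops instead of one. IO thread #0
  // runs inside serve() and also owns the listen socket; threads #1..#3 are
  // created through PosixThreadFactory in non-detached mode so that join()
  // works during shutdown.
  server.setNumIOThreads(4);

  // Optional: request SCHED_FIFO priority for the IO threads (see
  // setCurrentThreadHighPriority() in the patch).
  server.setUseHighPriorityIOThreads(true);

  server.serve();   // returns only after stop() is called from another thread
  return 0;
}

Connections are assigned to IO threads round-robin in createConnection(), so with N threads each libevent loop ends up owning roughly 1/N of the sockets.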
@@ -95,10 +107,14 @@ enum TOverloadAction { T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */ }; +class TNonblockingIOThread; + class TNonblockingServer : public TServer { private: class TConnection; + friend class TNonblockingIOThread; + private: /// Listen backlog static const int LISTEN_BACKLOG = 1024; @@ -123,6 +139,18 @@ class TNonblockingServer : public TServer { /// # of calls before resizing oversized buffers (0 = check only on close) static const int RESIZE_BUFFER_EVERY_N = 512; + /// # of IO threads to use by default + static const int DEFAULT_IO_THREADS = 1; + + /// File descriptor of an invalid socket + static const int INVALID_SOCKET = -1; + + /// # of IO threads this server will use + size_t numIOThreads_; + + /// Whether to set high scheduling priority for IO threads + bool useHighPriorityIOThreads_; + /// Server socket file descriptor int serverSocket_; @@ -135,15 +163,17 @@ class TNonblockingServer : public TServer { /// Is thread pool processing? bool threadPoolProcessing_; - /// The event base for libevent - event_base* eventBase_; - bool ownEventBase_; + // Factory to create the IO threads + boost::shared_ptr ioThreadFactory_; - /// Event struct, used with eventBase_ for connection events - struct event serverEvent_; + // Vector of IOThread objects that will handle our IO + std::vector > ioThreads_; - /// Event struct, used with eventBase_ for task completion notification - struct event notificationEvent_; + // Index of next IO Thread to be used (for round-robin) + int nextIOThread_; + + // Synchronizes access to connection stack and similar data + Mutex connMutex_; /// Number of TConnection object we've created size_t numTConnections_; @@ -211,9 +241,6 @@ class TNonblockingServer : public TServer { /// Count of connections dropped on overload since server started uint64_t nTotalConnectionsDropped_; - /// File descriptors for pipe used for task completion notification. - evutil_socket_t notificationPipeFDs_[2]; - /** * This is a stack of all the objects that have been created but that * are NOT currently in use. When we close a connection, we place it on this @@ -234,10 +261,11 @@ class TNonblockingServer : public TServer { void init(int port) { serverSocket_ = -1; + numIOThreads_ = DEFAULT_IO_THREADS; + nextIOThread_ = 0; + useHighPriorityIOThreads_ = false; port_ = port; threadPoolProcessing_ = false; - eventBase_ = NULL; - ownEventBase_ = false; numTConnections_ = 0; numActiveProcessors_ = 0; connectionStackLimit_ = CONNECTION_STACK_LIMIT; @@ -359,6 +387,31 @@ class TNonblockingServer : public TServer { return threadManager_; } + /** + * Sets the number of IO threads used by this server. Can only be used before + * the call to serve() and has no effect afterwards. We always use a + * PosixThreadFactory for the IO worker threads, because they must joinable + * for clean shutdown. + */ + void setNumIOThreads(size_t numThreads) { + numIOThreads_ = numThreads; + } + + /** Return whether the IO threads will get high scheduling priority */ + bool useHighPriorityIOThreads() const { + return useHighPriorityIOThreads_; + } + + /** Set whether the IO threads will get high scheduling priority. */ + void setUseHighPriorityIOThreads(bool val) { + useHighPriorityIOThreads_ = val; + } + + /** Return the number of IO threads used by this server. */ + size_t getNumIOThreads() const { + return numIOThreads_; + } + /** * Get the maximum number of unused TConnection we will hold in reserve. 
* @@ -385,20 +438,6 @@ class TNonblockingServer : public TServer { threadManager_->add(task, 0LL, taskExpireTime_); } - event_base* getEventBase() const { - return eventBase_; - } - - /// Increment our count of the number of connected sockets. - void incrementNumConnections() { - ++numTConnections_; - } - - /// Decrement our count of the number of connected sockets. - void decrementNumConnections() { - --numTConnections_; - } - /** * Return the count of sockets currently connected to. * @@ -431,11 +470,13 @@ class TNonblockingServer : public TServer { /// Increment the count of connections currently processing. void incrementActiveProcessors() { + Guard g(connMutex_); ++numActiveProcessors_; } /// Decrement the count of connections currently processing. void decrementActiveProcessors() { + Guard g(connMutex_); if (numActiveProcessors_ > 0) { --numActiveProcessors_; } @@ -615,7 +656,7 @@ class TNonblockingServer : public TServer { idleReadBufferLimit_ = limit; } - + /** * Get the maximum size of write buffer allocated to idle TConnection objects. @@ -659,21 +700,48 @@ class TNonblockingServer : public TServer { resizeBufferEveryN_ = count; } + /** + * Main workhorse function, starts up the server listening on a port and + * loops over the libevent handler. + */ + void serve(); + + /** + * Causes the server to terminate gracefully (can be called from any thread). + */ + void stop(); + + private: + /** + * Callback function that the threadmanager calls when a task reaches + * its expiration time. It is needed to clean up the expired connection. + * + * @param task the runnable associated with the expired task. + */ + void expireClose(boost::shared_ptr task); + /// Creates a socket to listen on and binds it to the local port. + void createAndListenOnSocket(); + /** + * Takes a socket created by createAndListenOnSocket() and sets various + * options on it to prepare for use in the server. + * + * @param fd descriptor of socket to be initialized/ + */ + void listenSocket(int fd); /** * Return an initialized connection object. Creates or recovers from * pool a TConnection and initializes it with the provided socket FD * and flags. * * @param socket FD of socket associated with this connection. - * @param flags initial lib_event flags for this connection. * @param addr the sockaddr of the client * @param addrLen the length of addr * @return pointer to initialized TConnection object. */ - TConnection* createConnection(int socket, short flags, - const sockaddr* addr, socklen_t addrLen); + TConnection* createConnection(int socket, const sockaddr* addr, + socklen_t addrLen); /** * Returns a connection to pool or deletion. If the connection pool @@ -683,14 +751,67 @@ class TNonblockingServer : public TServer { * @param connection the TConection being returned. */ void returnConnection(TConnection* connection); +}; + +class TNonblockingIOThread : public Runnable { + public: + // Creates an IO thread and sets up the event base. The listenSocket should + // be a valid FD on which listen() has already been called. If the + // listenSocket is < 0, accepting will not be done. + TNonblockingIOThread(TNonblockingServer* server, + int number, + int listenSocket, + bool useHighPriority); + + ~TNonblockingIOThread(); + + // Returns the event-base for this thread. + event_base* getEventBase() const { return eventBase_; } + + // Returns the server for this thread. + TNonblockingServer* getServer() const { return server_; } + + // Returns the number of this IO thread. 
+ int getThreadNumber() const { return number_; } + + // Returns the thread id associated with this object. This should + // only be called after the thread has been started. + pthread_t getThreadId() const { return threadId_; } + + // Returns the send-fd for task complete notifications. + int getNotificationSendFD() const { return notificationPipeFDs_[1]; } + + // Returns the read-fd for task complete notifications. + int getNotificationRecvFD() const { return notificationPipeFDs_[0]; } + + // Returns the actual thread object associated with this IO thread. + boost::shared_ptr getThread() const { return thread_; } + + // Sets the actual thread object associated with this IO thread. + void setThread(const boost::shared_ptr& t) { thread_ = t; } + + // Used by TConnection objects to indicate processing has finished. + bool notify(TNonblockingServer::TConnection* conn); + + // Enters the event loop and does not return until a call to stop(). + virtual void run(); + + // Exits the event loop as soon as possible. + void stop(); + // Ensures that the event-loop thread is fully finished and shut down. + void join(); + + private: /** - * Callback function that the threadmanager calls when a task reaches - * its expiration time. It is needed to clean up the expired connection. + * C-callable event handler for signaling task completion. Provides a + * callback that libevent can understand that will read a connection + * object's address from a pipe and call connection->transition() for + * that object. * - * @param task the runnable associated with the expired task. + * @param fd the descriptor the event occurred on. */ - void expireClose(boost::shared_ptr task); + static void notifyHandler(int fd, short which, void* v); /** * C-callable event handler for listener events. Provides a callback @@ -700,63 +821,57 @@ class TNonblockingServer : public TServer { * @param which the flags associated with the event. * @param v void* callback arg where we placed TNonblockingServer's "this". */ - static void eventHandler(evutil_socket_t fd, short which, void* v) { + static void listenHandler(evutil_socket_t fd, short which, void* v) { ((TNonblockingServer*)v)->handleEvent(fd, which); } - /// Creates a socket to listen on and binds it to the local port. - void listenSocket(); + /// Exits the loop ASAP in case of shutdown or error. + void breakLoop(bool error); - /** - * Takes a socket created by listenSocket() and sets various options on it - * to prepare for use in the server. - * - * @param fd descriptor of socket to be initialized/ - */ - void listenSocket(int fd); + /// Registers the events for the notification & listen sockets + void registerEvents(); /// Create the pipe used to notify I/O process of task completion. void createNotificationPipe(); - /** - * Get notification pipe send descriptor. - * - * @return write fd for pipe. - */ - evutil_socket_t getNotificationSendFD() const { - return notificationPipeFDs_[1]; - } + /// Unregisters our events for notification and listen sockets. + void cleanupEvents(); - /** - * Get notification pipe receive descriptor. - * - * @return read fd of pipe. - */ - evutil_socket_t getNotificationRecvFD() const { - return notificationPipeFDs_[0]; - } + /// Sets (or clears) high priority scheduling status for the current thread. + void setCurrentThreadHighPriority(bool value); - /** - * Register the core libevent events onto the proper base. - * - * @param base pointer to the event base to be initialized. 
- * @param ownEventBase if true, this server is responsible for - * freeing the event base memory. - */ - void registerEvents(event_base* base, bool ownEventBase = true); + private: + /// associated server + TNonblockingServer* server_; - /** - * Main workhorse function, starts up the server listening on a port and - * loops over the libevent handler. - */ - void serve(); + /// thread number (for debugging). + const int number_; - /** - * May be called from a separate thread to cause serve() to return. - */ - void stop(); + /// The actual physical thread id. + pthread_t threadId_; + + /// If listenSocket_ >= 0, adds an event on the event_base to accept conns + int listenSocket_; + + /// Sets a high scheduling priority when running + bool useHighPriority_; + + /// pointer to eventbase to be used for looping + event_base* eventBase_; + + /// Used with eventBase_ for connection events (only in listener thread) + struct event serverEvent_; + + /// Used with eventBase_ for task completion notification + struct event notificationEvent_; + + /// File descriptors for pipe used for task completion notification. + int notificationPipeFDs_[2]; + + /// Actual IO Thread + boost::shared_ptr thread_; }; }}} // apache::thrift::server -#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_ +#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
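The task-completion notification machinery now lives in TNonblockingIOThread: notify() writes the TConnection pointer into the thread's pipe, notifyHandler() reads pointers back out and calls transition() on each, and a NULL pointer is the sentinel breakLoop() uses to stop a loop from another thread. Below is a standalone sketch of that pointer-over-pipe pattern, outside Thrift, with hypothetical names; it illustrates the mechanism only, not the real server code.

#include <fcntl.h>
#include <unistd.h>
#include <cstdio>

struct Connection { int id; };   // stand-in for TConnection

// Sender side: write the raw pointer value into the pipe.
static bool notify(int sendFd, Connection* conn) {
  const int kSize = sizeof(conn);
  return write(sendFd, &conn, kSize) == kSize;
}

// Receiver side: drain pointers until the pipe is empty. A NULL pointer is
// the agreed-upon stop sentinel; anything else would get transition().
static void notifyHandler(int recvFd) {
  while (true) {
    Connection* conn = NULL;
    ssize_t n = read(recvFd, &conn, sizeof(conn));
    if (n != (ssize_t)sizeof(conn)) {
      break;   // 0 = pipe closed, -1/EAGAIN = nothing left to read right now
    }
    if (conn == NULL) {
      std::printf("stop requested\n");
      return;
    }
    std::printf("dispatching connection %d\n", conn->id);
  }
}

int main() {
  int fds[2];
  if (pipe(fds) != 0) return 1;
  // The patch makes the receive end non-blocking so the libevent callback can
  // drain until EAGAIN instead of blocking inside the event loop.
  fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL, 0) | O_NONBLOCK);

  Connection c1 = {1}, c2 = {2};
  notify(fds[1], &c1);
  notify(fds[1], &c2);
  notify(fds[1], NULL);   // analogous to breakLoop() asking the thread to stop

  notifyHandler(fds[0]);
  close(fds[0]);
  close(fds[1]);
  return 0;
}

Because a full pipe could block or fail the writer, the listener thread avoids notifying itself: handleEvent() calls transition() directly for connections assigned to IO thread #0 and only uses notify() for connections owned by other threads.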