Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads
Client: cpp
Patch: Dave Watson
Adds multiple IO threads to TNonblockingServer
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index c331eda..ba029a9 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -24,6 +24,7 @@
#include "TNonblockingServer.h"
#include <concurrency/Exception.h>
#include <transport/TSocket.h>
+#include <concurrency/PosixThreadFactory.h>
#include <iostream>
@@ -50,6 +51,7 @@
#include <errno.h>
#include <assert.h>
+#include <sched.h>
#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
@@ -63,6 +65,7 @@
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
@@ -94,6 +97,8 @@
*/
class TNonblockingServer::TConnection {
private:
+ /// Server IO Thread handling this connection
+ TNonblockingIOThread* ioThread_;
/// Server handle
TNonblockingServer* server_;
@@ -209,25 +214,25 @@
class Task;
/// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
+ TConnection(int socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen) {
readBuffer_ = NULL;
readBufferSize_ = 0;
- // Allocate input and output transports
- // these only need to be allocated once per TConnection (they don't need to be
- // reallocated on init() call)
- inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
- tSocket_.reset(new TSocket());
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
- init(socket, eventFlags, s, addr, addrLen);
- server_->incrementNumConnections();
+ // Allocate input and output transports these only need to be allocated
+ // once per TConnection (they don't need to be reallocated on init() call)
+ inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+ outputTransport_.reset(new TMemoryBuffer(
+ server_->getWriteBufferDefaultSize()));
+ tSocket_.reset(new TSocket());
+ init(socket, ioThread, addr, addrLen);
}
~TConnection() {
std::free(readBuffer_);
- server_->decrementNumConnections();
}
/**
@@ -239,7 +244,7 @@
void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
- void init(int socket, short eventFlags, TNonblockingServer *s,
+ void init(int socket, TNonblockingIOThread* ioThread,
const sockaddr* addr, socklen_t addrLen);
/**
@@ -263,60 +268,41 @@
}
/**
- * C-callable event handler for signaling task completion. Provides a
- * callback that libevent can understand that will read a connection
- * object's address from a pipe and call connection->transition() for
- * that object.
- *
- * @param fd the descriptor the event occurred on.
- */
- static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
- TConnection* connection;
- ssize_t nBytes;
- while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
- == sizeof(TConnection*)) {
- connection->transition();
- }
- if (nBytes > 0) {
- throw TException("TConnection::taskHandler unexpected partial read");
- }
- if (errno != EWOULDBLOCK && errno != EAGAIN) {
- GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
- }
- }
-
- /**
* Notification to server that processing has ended on this request.
* Can be called either when processing is completed or when a waiting
* task has been preemptively terminated (on overload).
*
+ * Don't call this from the IO thread itself.
+ *
* @return true if successful, false if unable to notify (check errno).
*/
- bool notifyServer() {
- TConnection* connection = this;
- if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
- sizeof(TConnection*), 0) != sizeof(TConnection*)) {
- return false;
- }
+ bool notifyIOThread() {
+ return ioThread_->notify(this);
+ }
- return true;
+ /*
+ * Returns the number of this connection's currently assigned IO
+ * thread.
+ */
+ int getIOThreadNumber() const {
+ return ioThread_->getThreadNumber();
}
/// Force connection shutdown for this connection.
void forceClose() {
appState_ = APP_CLOSE_CONNECTION;
- if (!notifyServer()) {
+ if (!notifyIOThread()) {
throw TException("TConnection::forceClose: failed write on notify pipe");
}
}
/// return the server this connection was initialized for.
- TNonblockingServer* getServer() {
+ TNonblockingServer* getServer() const {
return server_;
}
/// get state of connection.
- TAppState getState() {
+ TAppState getState() const {
return appState_;
}
@@ -362,19 +348,20 @@
}
}
} catch (const TTransportException& ttx) {
- GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
+ GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
} catch (const bad_alloc&) {
- GlobalOutput("TNonblockingServer caught bad_alloc exception.");
+ GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
exit(-1);
} catch (const std::exception& x) {
- GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
+ GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
typeid(x).name(), x.what());
} catch (...) {
- GlobalOutput("TNonblockingServer uncaught exception.");
+ GlobalOutput.printf(
+ "TNonblockingServer: unknown exception while processing.");
}
// Signal completion back to the libevent thread via a pipe
- if (!connection_->notifyServer()) {
+ if (!connection_->notifyIOThread()) {
throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
@@ -392,14 +379,15 @@
void* connectionContext_;
};
-void TNonblockingServer::TConnection::init(int socket, short eventFlags,
- TNonblockingServer* s,
+void TNonblockingServer::TConnection::init(int socket,
+ TNonblockingIOThread* ioThread,
const sockaddr* addr,
socklen_t addrLen) {
tSocket_->setSocketFD(socket);
tSocket_->setCachedAddress(addr, addrLen);
- server_ = s;
+ ioThread_ = ioThread;
+ server_ = ioThread->getServer();
appState_ = APP_INIT;
eventFlags_ = 0;
@@ -412,30 +400,31 @@
largestWriteBufferSize_ = 0;
socketState_ = SOCKET_RECV_FRAMING;
- appState_ = APP_INIT;
callsForResize_ = 0;
- // Set flags, which also registers the event
- setFlags(eventFlags);
-
// get input/transports
- factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
- factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+ factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+ inputTransport_);
+ factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+ outputTransport_);
// Create protocol
- inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
- outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+ factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+ factoryOutputTransport_);
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
if (serverEventHandler_ != NULL) {
- connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
+ connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+ outputProtocol_);
} else {
connectionContext_ = NULL;
}
// Get the processor
- processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+ processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
void TNonblockingServer::TConnection::workSocket() {
@@ -500,7 +489,7 @@
return;
}
-
+
if (got > 0) {
// Move along in the buffer
readBufferPos_ += got;
@@ -565,6 +554,9 @@
* to, or finished receiving the data that it needed to.
*/
void TNonblockingServer::TConnection::transition() {
+ // ensure this connection is active right now
+ assert(ioThread_);
+ assert(server_);
// Switch upon the state that we are currently in and move to a new state
switch (appState_) {
@@ -800,7 +792,7 @@
*/
event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
TConnection::eventHandler, this);
- event_base_set(server_->getEventBase(), &event_);
+ event_base_set(ioThread_->getEventBase(), &event_);
// Add the event
if (event_add(&event_, 0) == -1) {
@@ -820,6 +812,7 @@
if (serverEventHandler_ != NULL) {
serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
}
+ ioThread_ = NULL;
// Close the socket
tSocket_->close();
@@ -861,14 +854,6 @@
connectionStack_.pop();
delete connection;
}
-
- if (eventBase_ && ownEventBase_) {
- event_base_free(eventBase_);
- }
-
- if (serverSocket_ >= 0) {
- close(serverSocket_);
- }
}
/**
@@ -876,27 +861,41 @@
* by allocating a new one entirely
*/
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
- int socket, short flags,
- const sockaddr* addr,
- socklen_t addrLen) {
+ int socket, const sockaddr* addr, socklen_t addrLen) {
// Check the stack
+ Guard g(connMutex_);
+
+ // pick an IO thread to handle this connection -- currently round robin
+ assert(nextIOThread_ >= 0);
+ assert(nextIOThread_ < ioThreads_.size());
+ int selectedThreadIdx = nextIOThread_;
+ nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+ TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+ // Check the connection stack to see if we can re-use
+ TConnection* result = NULL;
if (connectionStack_.empty()) {
- return new TConnection(socket, flags, this, addr, addrLen);
+ result = new TConnection(socket, ioThread, addr, addrLen);
+ ++numTConnections_;
} else {
- TConnection* result = connectionStack_.top();
+ result = connectionStack_.top();
connectionStack_.pop();
- result->init(socket, flags, this, addr, addrLen);
- return result;
+ result->init(socket, ioThread, addr, addrLen);
}
+ return result;
}
/**
* Returns a connection to the stack
*/
void TNonblockingServer::returnConnection(TConnection* connection) {
+ Guard g(connMutex_);
+
if (connectionStackLimit_ &&
(connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
+ --numTConnections_;
} else {
connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
connectionStack_.push(connection);
@@ -927,6 +926,7 @@
while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
// If we're overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+ Guard g(connMutex_);
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
@@ -940,6 +940,7 @@
}
}
}
+
// Explicitly set this socket to NONBLOCK mode
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@@ -951,7 +952,7 @@
// Create a new TConnection for this client socket.
TConnection* clientConnection =
- createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
+ createConnection(clientSocket, addrp, addrLen);
// Fail fast if we could not create a TConnection object
if (clientConnection == NULL) {
@@ -960,13 +961,29 @@
return;
}
- // Put this client connection into the proper state
- clientConnection->transition();
+ /*
+ * Either notify the ioThread that is assigned this connection to
+ * start processing, or if it is us, we'll just ask this
+ * connection to do its initial state change here.
+ *
+ * (We need to avoid writing to our own notification pipe, to
+ * avoid possible deadlocks if the pipe is full.)
+ *
+ * The IO thread #0 is the only one that handles these listen
+ * events, so unless the connection has been assigned to thread #0
+ * we know it's not on our thread.
+ */
+ if (clientConnection->getIOThreadNumber() == 0) {
+ clientConnection->transition();
+ } else {
+ clientConnection->notifyIOThread();
+ }
// addrLen is written by the accept() call, so needs to be set before the next call.
addrLen = sizeof(addrStorage);
}
+
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (errno != EAGAIN && errno != EWOULDBLOCK) {
@@ -977,8 +994,9 @@
/**
* Creates a socket to listen on and binds it to the local port.
*/
-void TNonblockingServer::listenSocket() {
+void TNonblockingServer::createAndListenOnSocket() {
int s;
+
struct addrinfo hints, *res, *res0;
int error;
@@ -1082,63 +1100,6 @@
serverSocket_ = s;
}
-void TNonblockingServer::createNotificationPipe() {
- if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
- GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
- throw TException("can't create notification pipe");
- }
- if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
- evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
- close(notificationPipeFDs_[0]);
- close(notificationPipeFDs_[1]);
- throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
- }
-}
-
-/**
- * Register the core libevent events onto the proper base.
- */
-void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
- assert(serverSocket_ != -1);
- assert(!eventBase_);
- eventBase_ = base;
- ownEventBase_ = ownEventBase;
-
- // Print some libevent stats
- GlobalOutput.printf("libevent %s method %s",
- event_get_version(),
- event_base_get_method(eventBase_));
-
- // Register the server event
- event_set(&serverEvent_,
- serverSocket_,
- EV_READ | EV_PERSIST,
- TNonblockingServer::eventHandler,
- this);
- event_base_set(eventBase_, &serverEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(&serverEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): coult not event_add");
- }
- if (threadPoolProcessing_) {
- // Create an event to be notified when a task finishes
- event_set(&notificationEvent_,
- getNotificationRecvFD(),
- EV_READ | EV_PERSIST,
- TConnection::taskHandler,
- this);
-
- // Attach to the base
- event_base_set(eventBase_, &notificationEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(&notificationEvent_, 0)) {
- throw TException("TNonblockingServer::serve(): notification event_add fail");
- }
- }
-}
-
void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
threadManager_ = threadManager;
if (threadManager != NULL) {
@@ -1154,14 +1115,15 @@
if (numActiveProcessors_ > maxActiveProcessors_ ||
activeConnections > maxConnections_) {
if (!overloaded_) {
- GlobalOutput.printf("thrift non-blocking server overload condition");
+ GlobalOutput.printf("TNonblockingServer: overload condition begun.");
overloaded_ = true;
}
} else {
if (overloaded_ &&
(numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
(activeConnections <= overloadHysteresis_ * maxConnections_)) {
- GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+ GlobalOutput.printf("TNonblockingServer: overload ended; "
+ "%u dropped (%llu total)",
nConnectionsDropped_, nTotalConnectionsDropped_);
nConnectionsDropped_ = 0;
overloaded_ = false;
@@ -1189,73 +1151,361 @@
void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
TConnection* connection =
static_cast<TConnection::Task*>(task.get())->getTConnection();
- assert(connection && connection->getServer()
- && connection->getState() == APP_WAIT_TASK);
+ assert(connection && connection->getServer() &&
+ connection->getState() == APP_WAIT_TASK);
connection->forceClose();
}
+/**
+ * Asks every IO thread to leave its event loop.
+ *
+ * Each thread is stopped through TNonblockingIOThread::stop(); this call
+ * itself does not wait for the threads to finish.
+ */
+void TNonblockingServer::stop() {
+  // Breaks the event loop in all threads so that they end ASAP.
+  // The index is unsigned to match the type returned by size().
+  for (size_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->stop();
+  }
+}
+
/**
* Main workhorse function, starts up the server listening on a port and
* loops over the libevent handler.
*/
void TNonblockingServer::serve() {
- // Init socket
- listenSocket();
+ // init listen socket
+ createAndListenOnSocket();
- if (threadPoolProcessing_) {
- // Init task completion notification pipe
- createNotificationPipe();
+ // set up the IO threads
+ assert(ioThreads_.empty());
+ if (!numIOThreads_) {
+ numIOThreads_ = DEFAULT_IO_THREADS;
}
- // Initialize libevent core
- registerEvents(static_cast<event_base*>(event_base_new()), true);
+ for (int id = 0; id < numIOThreads_; ++id) {
+ // the first IO thread also does the listening on server socket
+ int listenFd = (id == 0 ? serverSocket_ : -1);
- // Run the preServe event
+ shared_ptr<TNonblockingIOThread> thread(
+ new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+ ioThreads_.push_back(thread);
+ }
+
+ // Notify handler of the preServe event
if (eventHandler_ != NULL) {
eventHandler_->preServe();
}
- // Run libevent engine, invokes calls to eventHandler
- // Only returns if stop() is called.
- event_base_loop(eventBase_, 0);
+ // Start all of our helper IO threads. Note that the threads run forever,
+ // only terminating if stop() is called.
+ assert(ioThreads_.size() == numIOThreads_);
+ assert(ioThreads_.size() > 0);
+
+ // size() returns an unsigned size_t, which must not be passed to a "%d"
+ // conversion or compared against a signed loop index; take an int copy once.
+ const int ioThreadCount = static_cast<int>(ioThreads_.size());
+
+ GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+ port_, ioThreadCount);
+
+ // Launch all the secondary IO threads in separate threads
+ if (ioThreadCount > 1) {
+ ioThreadFactory_.reset(new PosixThreadFactory(
+ PosixThreadFactory::OTHER, // scheduler
+ PosixThreadFactory::NORMAL, // priority
+ 1, // stack size (MB)
+ false // detached
+ ));
+
+ assert(ioThreadFactory_.get());
+
+ // intentionally starting at thread 1, not 0
+ for (int i = 1; i < ioThreadCount; ++i) {
+ shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+ ioThreads_[i]->setThread(thread);
+ thread->start();
+ }
+ }
+
+ // Run the primary (listener) IO thread loop in our main thread; this will
+ // only return when the server is shutting down.
+ ioThreads_[0]->run();
+
+ // Ensure all threads are finished before exiting serve()
+ for (int i = 0; i < ioThreadCount; ++i) {
+ ioThreads_[i]->join();
+ GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+ }
}
-void TNonblockingServer::stop() {
- if (!eventBase_) {
- return;
+/**
+ * Stores the owning server, this thread's number, the listen socket and the
+ * scheduling preference. The libevent base starts out NULL and both
+ * notification pipe descriptors start out as -1.
+ *
+ * @param server          server this IO thread belongs to
+ * @param number          number of this IO thread
+ * @param listenSocket    listen socket descriptor; a negative value means
+ *                        no listen event is registered for this thread
+ * @param useHighPriority whether run() should switch to the high-priority
+ *                        scheduler
+ */
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+                                           int number,
+                                           int listenSocket,
+                                           bool useHighPriority) :
+    server_(server),
+    number_(number),
+    listenSocket_(listenSocket),
+    useHighPriority_(useHighPriority),
+    eventBase_(NULL) {
+  // Mark both pipe ends as "not open".
+  for (int end = 0; end < 2; ++end) {
+    notificationPipeFDs_[end] = -1;
+  }
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+ // make sure our associated thread is fully finished
+ join();
+
+ if (eventBase_) {
+ event_base_free(eventBase_);
}
- // Call event_base_loopbreak() to tell libevent to exit the loop
- //
- // (The libevent documentation doesn't explicitly state that this function is
- // safe to call from another thread. However, all it does is set a variable,
- // in the event_base, so it should be fine.)
+ if (listenSocket_ >= 0) {
+ if (0 != close(listenSocket_)) {
+ GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+ errno);
+ }
+ listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+ }
+
+ for (int i = 0; i < 2; ++i) {
+ if (notificationPipeFDs_[i] >= 0) {
+ if (0 != ::close(notificationPipeFDs_[i])) {
+ GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+ errno);
+ }
+ notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+ }
+ }
+}
+
+/**
+ * Creates the pipe through which TConnection pointers are handed to this
+ * IO thread.
+ *
+ * The read end is switched to non-blocking mode and both ends are marked
+ * close-on-exec.
+ *
+ * @throws TException if the pipe cannot be created or configured. When
+ *         configuration fails, both descriptors are closed and reset to -1
+ *         so that the destructor does not close them a second time.
+ */
+void TNonblockingIOThread::createNotificationPipe() {
+  if (pipe(notificationPipeFDs_) != 0) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+    throw TException("can't create notification pipe");
+  }
+
+  // Collect the first configuration failure, then clean up in one place.
+  const char* failure = NULL;
+  int flags;
+
+  // NOTE(review): only the read end is made non-blocking here, so a write in
+  // notify() can block while the pipe is full -- confirm this is intended.
+  if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
+      fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+    failure = "TNonblockingServer::createNotificationPipe() O_NONBLOCK";
+  }
+
+  for (int i = 0; failure == NULL && i < 2; ++i) {
+    if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+        fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+      failure = "TNonblockingServer::createNotificationPipe() "
+                "FD_CLOEXEC";
+    }
+  }
+
+  if (failure != NULL) {
+    for (int i = 0; i < 2; ++i) {
+      close(notificationPipeFDs_[i]);
+      // Forget the descriptor: the destructor closes every value >= 0, and
+      // the number may already have been reused by another open file.
+      notificationPipeFDs_[i] = -1;
+    }
+    throw TException(failure);
+  }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+  // A listen event is registered only when this thread was given a valid
+  // listen socket.
+  if (listenSocket_ >= 0) {
+    // Register the server event
+    event_set(&serverEvent_,
+              listenSocket_,
+              EV_READ | EV_PERSIST,
+              TNonblockingIOThread::listenHandler,
+              server_);
+    event_base_set(eventBase_, &serverEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&serverEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): "
+                       "event_add() failed on server listen event");
+    }
+    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+                        number_);
+  }
+
+  // Every IO thread gets its own notification pipe.
+  createNotificationPipe();
+
+  // Create an event to be notified when a task finishes
+  event_set(&notificationEvent_,
+            getNotificationRecvFD(),
+            EV_READ | EV_PERSIST,
+            TNonblockingIOThread::notifyHandler,
+            this);
+
+  // Attach to the base
+  event_base_set(eventBase_, &notificationEvent_);
+
+  // Add the event and start up the server
+  if (-1 == event_add(&notificationEvent_, 0)) {
+    throw TException("TNonblockingServer::serve(): "
+                     "event_add() failed on task-done notification event");
+  }
+  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+                      number_);
+}
+
+/**
+ * Writes the raw value of a connection pointer into this thread's
+ * notification pipe, where notifyHandler() reads it back.
+ *
+ * A NULL pointer is a valid argument: breakLoop() sends one to wake the
+ * event loop.
+ *
+ * @param conn connection to hand to this IO thread, or NULL
+ * @return true if the whole pointer was written; false if the send
+ *         descriptor is invalid or the write did not transfer
+ *         sizeof(conn) bytes
+ */
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+  const int sendFd = getNotificationSendFD();
+  if (sendFd < 0) {
+    return false;
+  }
+
+  const int wanted = sizeof(conn);
+  return write(sendFd, &conn, wanted) == wanted;
+}
+
+/**
+ * libevent callback for the read end of the notification pipe.
+ *
+ * Reads connection pointers from the pipe until it would block, calling
+ * transition() on each one. A NULL pointer makes the handler return
+ * immediately.
+ *
+ * @param fd read end of the notification pipe
+ * @param v  the TNonblockingIOThread that registered this callback
+ */
+/* static */
+void TNonblockingIOThread::notifyHandler(int fd, short /* which */, void* v) {
+  TNonblockingIOThread* ioThread = static_cast<TNonblockingIOThread*>(v);
+  assert(ioThread);
+
+  while (true) {
+    TNonblockingServer::TConnection* connection = 0;
+    const int kSize = sizeof(connection);
+    const ssize_t nBytes = read(fd, &connection, kSize);
+    if (nBytes == kSize) {
+      if (connection == NULL) {
+        // this is the command to stop our thread, exit the handler!
+        return;
+      }
+      connection->transition();
+    } else if (nBytes > 0) {
+      // Only part of a pointer arrived. This is treated as fatal:
+      // breakLoop(true) logs and aborts the process.
+      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+                          static_cast<int>(nBytes), kSize);
+      ioThread->breakLoop(true);
+      return;
+    } else if (nBytes == 0) {
+      GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      // exit the loop
+      break;
+    } else { // nBytes < 0
+      if (errno != EWOULDBLOCK && errno != EAGAIN) {
+        GlobalOutput.perror(
+          "TNonblocking: notifyHandler read() failed: ", errno);
+        ioThread->breakLoop(true);
+        return;
+      }
+      // The pipe is drained for now; exit the loop
+      break;
+    }
+  }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+ if (error) {
+ GlobalOutput.printf(
+ "TNonblockingServer: IO thread #%d exiting with error.", number_);
+ // TODO: figure out something better to do here, but for now kill the
+ // whole process.
+ GlobalOutput.printf("TNonblockingServer: aborting process.");
+ ::abort();
+ }
+
+ // sets a flag so that the loop exits on the next event
event_base_loopbreak(eventBase_);
- // event_base_loopbreak() only causes the loop to exit the next time it wakes
- // up. We need to force it to wake up, in case there are no real events
- // it needs to process.
+ // event_base_loopbreak() only causes the loop to exit the next time
+ // it wakes up. We need to force it to wake up, in case there are
+ // no real events it needs to process.
//
- // Attempt to connect to the server socket. If anything fails,
- // we'll just have to wait until libevent wakes up on its own.
- //
- // First create a socket
- int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (fd < 0) {
- return;
+ // If we're running in the same thread, we can't use the notify(0)
+ // mechanism to stop the thread, but happily if we're running in the
+ // same thread, this means the thread can't be blocking in the event
+ // loop either.
+ if (!pthread_equal(pthread_self(), threadId_)) {
+ notify(NULL);
+ }
+}
+
+/**
+ * Switches the calling thread between the SCHED_FIFO scheduler (value is
+ * true) and the default SCHED_OTHER scheduler (value is false).
+ *
+ * A failure is only logged; the thread then keeps its current scheduling.
+ */
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+ // Start out with a standard, low-priority setup for the sched params.
+ // Value-initialization zeroes the whole structure.
+ struct sched_param sp = sched_param();
+ int policy = SCHED_OTHER;
+
+ // If desired, set up high-priority sched params structure.
+ if (value) {
+ // FIFO scheduler, ranked above default SCHED_OTHER queue
+ policy = SCHED_FIFO;
+ // The priority only compares us to other SCHED_FIFO threads, so we
+ // just pick a priority halfway between min & max.
+ const int priority = (sched_get_priority_max(policy) +
+ sched_get_priority_min(policy)) / 2;
+
+ sp.sched_priority = priority;
}
- // Set up the address
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
- addr.sin_port = htons(port_);
+ // Actually set the sched params for the current thread.
+ // pthread_setschedparam() reports failure through its return value and
+ // does not set errno, so the returned code is what gets logged.
+ const int rc = pthread_setschedparam(pthread_self(), policy, &sp);
+ if (rc != 0) {
+ GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", rc);
+ } else if (value) {
+ GlobalOutput.printf(
+ "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+ } else {
+ GlobalOutput.printf(
+ "TNonblocking: IO Thread #%d using normal-priority scheduler.", number_);
+ }
+}
- // Finally do the connect().
- // We don't care about the return value;
- // we're just going to close the socket either way.
- connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
- close(fd);
+/**
+ * Body of the IO thread: creates the libevent base, registers this thread's
+ * events, runs the event loop and removes the events once the loop exits.
+ *
+ * @throws TException if the libevent base cannot be created, or if
+ *         registerEvents() fails.
+ */
+void TNonblockingIOThread::run() {
+  // Remember which thread runs this loop; breakLoop() compares against it.
+  threadId_ = pthread_self();
+
+  assert(eventBase_ == 0);
+  eventBase_ = event_base_new();
+  if (eventBase_ == NULL) {
+    // Every libevent call below needs a valid base.
+    throw TException("TNonblockingServer: event_base_new() failed");
+  }
+
+  // Print some libevent stats
+  if (number_ == 0) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+                        event_get_version(),
+                        event_base_get_method(eventBase_));
+  }
+
+  registerEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+                      number_);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(true);
+  }
+
+  // Run the libevent engine, which invokes the registered handlers. The
+  // shutdown steps below run after this call returns.
+  event_base_loop(eventBase_, 0);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(false);
+  }
+
+  // cleans up our registered events
+  cleanupEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+                      number_);
+}
+
+/**
+ * Removes the events added by registerEvents(): the listen event, if this
+ * thread has a listen socket, and the notification event.
+ */
+void TNonblockingIOThread::cleanupEvents() {
+  // stop the listen socket, if any
+  if (listenSocket_ >= 0) {
+    if (event_del(&serverEvent_) == -1) {
+      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+    }
+  }
+
+  event_del(&notificationEvent_);
+}
+
+
+/**
+ * Requests a normal (non-error) exit of this IO thread's event loop.
+ */
+void TNonblockingIOThread::stop() {
+  // Passing false selects the non-error path of breakLoop().
+  const bool dueToError = false;
+  breakLoop(dueToError);
+}
+
+/**
+ * Waits for the thread running this IO loop to finish, if there is one.
+ *
+ * Exceptions thrown by the underlying join are deliberately swallowed.
+ */
+void TNonblockingIOThread::join() {
+  // Only IO threads started through the thread factory carry a Thread
+  // object; the one run directly by serve() has nothing to join.
+  if (!thread_) {
+    return;
+  }
+
+  try {
+    // NOTE(review): the original comment says that joining twice, or joining
+    // the current thread, is safe because the pthread implementation checks
+    // for deadlock -- confirm against the Thread implementation in use.
+    thread_->join();
+  } catch (...) {
+    // swallow everything
+  }
}
}}} // apache::thrift::server