Task(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocol> input,
boost::shared_ptr<TProtocol> output,
- int taskHandle) :
+ TConnection* connection) :
processor_(processor),
input_(input),
output_(output),
- taskHandle_(taskHandle) {}
+ connection_(connection) {}
void run() {
try {
cerr << "TNonblockingServer uncaught exception." << endl;
}
- // Signal completion back to the libevent thread via a socketpair
- int8_t b = 0;
- if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
- GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
- }
- if (-1 == ::close(taskHandle_)) {
- GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
+ // Signal completion back to the libevent thread via a pipe
+ if (!connection_->notifyServer()) {
+ throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
}
}
+ TConnection* getTConnection() {
+ return connection_;
+ }
+
private:
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<TProtocol> input_;
boost::shared_ptr<TProtocol> output_;
- int taskHandle_;
+ TConnection* connection_;
};
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
- taskHandle_ = -1;
-
// Set flags, which also registers the event
setFlags(eventFlags);
outputTransport_->getWritePtr(4);
outputTransport_->wroteBytes(4);
+ server_->incrementActiveProcessors();
+
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
- int sv[2];
- if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- GlobalOutput.perror("TConnection::socketpair() failed ", errno);
- // Now we will fall through to the APP_WAIT_TASK block with no response
- } else {
- // Create task and dispatch to the thread manager
- boost::shared_ptr<Runnable> task =
- boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
- inputProtocol_,
- outputProtocol_,
- sv[1]));
- // The application is now waiting on the task to finish
- appState_ = APP_WAIT_TASK;
-
- // Create an event to be notified when the task finishes
- event_set(&taskEvent_,
- taskHandle_ = sv[0],
- EV_READ,
- TConnection::taskHandler,
- this);
-
- // Attach to the base
- event_base_set(server_->getEventBase(), &taskEvent_);
-
- // Add the event and start up the server
- if (-1 == event_add(&taskEvent_, 0)) {
- GlobalOutput("TNonblockingServer::serve(): coult not event_add");
- return;
- }
+
+ // Create task and dispatch to the thread manager
+ boost::shared_ptr<Runnable> task =
+ boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+ inputProtocol_,
+ outputProtocol_,
+ this));
+ // The application is now waiting on the task to finish
+ appState_ = APP_WAIT_TASK;
+
try {
server_->addTask(task);
} catch (IllegalStateException & ise) {
close();
}
- // Set this connection idle so that libevent doesn't process more
- // data on it while we're still waiting for the threadmanager to
- // finish this task
- setIdle();
- return;
- }
+ // Set this connection idle so that libevent doesn't process more
+ // data on it while we're still waiting for the threadmanager to
+ // finish this task
+ setIdle();
+ return;
} else {
try {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &ttx) {
GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+ server_->decrementActiveProcessors();
close();
return;
} catch (TException &x) {
GlobalOutput.printf("TException: Server::process() %s", x.what());
+ server_->decrementActiveProcessors();
close();
return;
} catch (...) {
GlobalOutput.printf("Server::process() unknown exception");
+ server_->decrementActiveProcessors();
close();
return;
}
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
+ server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
return;
+ case APP_CLOSE_CONNECTION:
+ server_->decrementActiveProcessors();
+ close();
+ return;
+
default:
GlobalOutput.printf("Unexpected Application State %d", appState_);
assert(0);
return;
}
- /**
+ /*
* event_set:
*
* Prepares the event structure &event to be used in future calls to
}
// Close the socket
- if (socket_ > 0) {
+ if (socket_ >= 0) {
::close(socket_);
}
- socket_ = 0;
+ socket_ = -1;
// close any factory produced transports
factoryInputTransport_->close();
server_->returnConnection(this);
}
-void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+void TConnection::checkIdleBufferMemLimit(size_t limit) {
if (readBufferSize_ > limit) {
readBufferSize_ = limit;
readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
// one, this helps us to avoid having to go back into the libevent engine so
// many times
while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
-
+ // If we're overloaded, take action here
+ if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+ nConnectionsDropped_++;
+ nTotalConnectionsDropped_++;
+ if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+ close(clientSocket);
+ continue;
+ } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+ if (!drainPendingTask()) {
+ // Nothing left to discard, so we drop the connection instead.
+ close(clientSocket);
+ continue;
+ }
+ }
+ }
// Explicitly set this socket to NONBLOCK mode
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
serverSocket_ = s;
}
+void TNonblockingServer::createNotificationPipe() {
+ if (pipe(notificationPipeFDs_) != 0) {
+ GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+ throw TException("can't create notification pipe");
+ }
+}
+
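+// Note on the notification mechanism introduced by this change: when a worker
+// thread finishes a Task, it calls connection->notifyServer(), which writes the
+// raw TConnection* (sizeof(TConnection*) bytes) to this pipe's send descriptor.
+// The libevent thread's notificationEvent_ fires on the receive descriptor, and
+// TConnection::taskHandler reads the pointer back and calls transition() on
+// that connection.
+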
/**
* Register the core libevent events onto the proper base.
*/
if (-1 == event_add(&serverEvent_, 0)) {
throw TException("TNonblockingServer::serve(): coult not event_add");
}
+ if (threadPoolProcessing_) {
+ // Create an event to be notified when a task finishes
+ event_set(&notificationEvent_,
+ getNotificationRecvFD(),
+ EV_READ | EV_PERSIST,
+ TConnection::taskHandler,
+ this);
+
+ // Attach to the base
+ event_base_set(eventBase_, &notificationEvent_);
+
+ // Add the event and start up the server
+ if (-1 == event_add(&notificationEvent_, 0)) {
+ throw TException("TNonblockingServer::serve(): notification event_add fail");
+ }
+ }
+}
+
+bool TNonblockingServer::serverOverloaded() {
+ size_t activeConnections = numTConnections_ - connectionStack_.size();
+ if (numActiveProcessors_ > maxActiveProcessors_ ||
+ activeConnections > maxConnections_) {
+ if (!overloaded_) {
+ GlobalOutput.printf("thrift non-blocking server overload condition");
+ overloaded_ = true;
+ }
+ } else {
+ if (overloaded_ &&
+ (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+ (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+ GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+ nConnectionsDropped_, nTotalConnectionsDropped_);
+ nConnectionsDropped_ = 0;
+ overloaded_ = false;
+ }
+ }
+
+ return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+ if (threadManager_) {
+ boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+ if (task) {
+ TConnection* connection =
+ static_cast<TConnection::Task*>(task.get())->getTConnection();
+ assert(connection && connection->getServer()
+ && connection->getState() == APP_WAIT_TASK);
+ connection->forceClose();
+ return true;
+ }
+ }
+ return false;
}
/**
// Init socket
listenSocket();
+ if (threadPoolProcessing_) {
+ // Init task completion notification pipe
+ createNotificationPipe();
+ }
+
// Initialize libevent core
registerEvents(static_cast<event_base*>(event_init()));
#include <server/TServer.h>
#include <transport/TBufferTransports.h>
#include <concurrency/ThreadManager.h>
+#include <climits>
#include <stack>
#include <string>
#include <errno.h>
* operations hardcoded for use with select.
*
*/
+
+
+/// Overload condition actions.
+enum TOverloadAction {
+ T_OVERLOAD_NO_ACTION, ///< Don't handle overload
+ T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately
+ T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue
+};
+
class TNonblockingServer : public TServer {
private:
-
- // Listen backlog
+ /// Listen backlog
static const int LISTEN_BACKLOG = 1024;
- // Default limit on size of idle connection pool
+ /// Default limit on size of idle connection pool
static const size_t CONNECTION_STACK_LIMIT = 1024;
- // Maximum size of buffer allocated to idle connection
+ /// Maximum size of buffer allocated to idle connection
static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
- // Server socket file descriptor
+ /// Default limit on total number of connected sockets
+ static const int MAX_CONNECTIONS = INT_MAX;
+
+ /// Default limit on connections in handler/task processing
+ static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+
+ /// Server socket file descriptor
int serverSocket_;
- // Port server runs on
+ /// Port server runs on
int port_;
- // For processing via thread pool, may be NULL
+ /// For processing via thread pool, may be NULL
boost::shared_ptr<ThreadManager> threadManager_;
- // Is thread pool processing?
+ /// Is thread pool processing?
bool threadPoolProcessing_;
- // The event base for libevent
+ /// The event base for libevent
event_base* eventBase_;
- // Event struct, for use with eventBase_
+ /// Event struct, used with eventBase_ for connection events
struct event serverEvent_;
- // Number of TConnection object we've created
+ /// Event struct, used with eventBase_ for task completion notification
+ struct event notificationEvent_;
+
+ /// Number of TConnection objects we've created
size_t numTConnections_;
- // Limit for how many TConnection objects to cache
+ /// Number of connections currently processing or waiting to process
+ size_t numActiveProcessors_;
+
+ /// Limit for how many TConnection objects to cache
size_t connectionStackLimit_;
+ /// Limit for number of connections processing or waiting to process
+ size_t maxActiveProcessors_;
+
+ /// Limit for number of open connections
+ size_t maxConnections_;
+
+ /**
+ * Hysteresis for overload state. Load must drop to or below this fraction
+ * of the configured maximums before an overload condition is cleared;
+ * must be <= 1.0.
+ */
+ double overloadHysteresis_;
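+ // Illustrative example (hypothetical numbers): with maxActiveProcessors_ of
+ // 1000 and overloadHysteresis_ of 0.8, an overload declared when the 1001st
+ // processor becomes active is cleared only once active processors fall back
+ // to 800 or fewer (and active connections pass the same check against
+ // maxConnections_).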
+
+ /// Action to take when we're overloaded.
+ TOverloadAction overloadAction_;
+
/**
* Max read buffer size for an idle connection. When we place an idle
* TConnection into connectionStack_, we insure that its read buffer is
* reduced to this size to insure that idle connections don't hog memory.
*/
- uint32_t idleBufferMemLimit_;
+ size_t idleBufferMemLimit_;
+
+ /// Set if we are currently in an overloaded state.
+ bool overloaded_;
+
+ /// Count of connections dropped since overload started
+ uint32_t nConnectionsDropped_;
+
+ /// Count of connections dropped on overload since server started
+ uint64_t nTotalConnectionsDropped_;
+
+ /// File descriptors for pipe used for task completion notification.
+ int notificationPipeFDs_[2];
/**
* This is a stack of all the objects that have been created but that
*/
std::stack<TConnection*> connectionStack_;
+ /**
+ * Called when there is activity on the server's listen socket. We accept
+ * all waiting client connections on the listen socket fd and assign each
+ * a TConnection object to handle the request.
+ *
+ * @param fd the listen socket.
+ * @param which the event flag that triggered the handler.
+ */
void handleEvent(int fd, short which);
public:
threadPoolProcessing_(false),
eventBase_(NULL),
numTConnections_(0),
+ numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
+ maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+ maxConnections_(MAX_CONNECTIONS),
+ overloadHysteresis_(0.8),
+ overloadAction_(T_OVERLOAD_NO_ACTION),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ overloaded_(false),
+ nConnectionsDropped_(0),
+ nTotalConnectionsDropped_(0) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0),
+ numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+ maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+ maxConnections_(MAX_CONNECTIONS),
+ overloadHysteresis_(0.8),
+ overloadAction_(T_OVERLOAD_NO_ACTION),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ overloaded_(false),
+ nConnectionsDropped_(0),
+ nTotalConnectionsDropped_(0) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
int port,
boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
TServer(processor),
- serverSocket_(0),
+ serverSocket_(-1),
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
numTConnections_(0),
+ numActiveProcessors_(0),
connectionStackLimit_(CONNECTION_STACK_LIMIT),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+ maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+ maxConnections_(MAX_CONNECTIONS),
+ overloadHysteresis_(0.8),
+ overloadAction_(T_OVERLOAD_NO_ACTION),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ overloaded_(false),
+ nConnectionsDropped_(0),
+ nTotalConnectionsDropped_(0) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
return eventBase_;
}
+ /// Increment our count of the number of connected sockets.
void incrementNumConnections() {
++numTConnections_;
}
+ /// Decrement our count of the number of connected sockets.
void decrementNumConnections() {
--numTConnections_;
}
- size_t getNumConnections() {
+ /**
+ * Return the count of currently connected sockets.
+ *
+ * @return count of connected sockets.
+ */
+ size_t getNumConnections() const {
return numTConnections_;
}
- size_t getNumIdleConnections() {
+ /**
+ * Return the count of connection objects allocated but not in use.
+ *
+ * @return count of idle connection objects.
+ */
+ size_t getNumIdleConnections() const {
return connectionStack_.size();
}
+ /**
+ * Return the number of connections currently processing. A connection is
+ * processing once all of its data has been received and it has either been
+ * assigned a task (when threading) or passed to a handler (when not
+ * threading), and the handler has not yet returned.
+ *
+ * @return # of connections currently processing.
+ */
+ size_t getNumActiveProcessors() const {
+ return numActiveProcessors_;
+ }
+
+ /// Increment the count of connections currently processing.
+ void incrementActiveProcessors() {
+ ++numActiveProcessors_;
+ }
+
+ /// Decrement the count of connections currently processing.
+ void decrementActiveProcessors() {
+ if (numActiveProcessors_ > 0) {
+ --numActiveProcessors_;
+ }
+ }
+
+ /**
+ * Get the maximum # of connections allowed before overload.
+ *
+ * @return current setting.
+ */
+ size_t getMaxConnections() const {
+ return maxConnections_;
+ }
+
+ /**
+ * Set the maximum # of connections allowed before overload.
+ *
+ * @param maxConnections new setting for maximum # of connections.
+ */
+ void setMaxConnections(size_t maxConnections) {
+ maxConnections_ = maxConnections;
+ }
+
+ /**
+ * Get the maximum # of connections waiting in handler/task before overload.
+ *
+ * @return current setting.
+ */
+ size_t getMaxActiveProcessors() const {
+ return maxActiveProcessors_;
+ }
+
+ /**
+ * Set the maximum # of connections waiting in handler/task before overload.
+ *
+ * @param maxActiveProcessors new setting for maximum # of active processors.
+ */
+ void setMaxActiveProcessors(size_t maxActiveProcessors) {
+ maxActiveProcessors_ = maxActiveProcessors;
+ }
+
+ /**
+ * Get the fraction of the maximum limits to which load must fall before an
+ * overload condition is cleared.
+ *
+ * @return hysteresis fraction.
+ */
+ double getOverloadHysteresis() const {
+ return overloadHysteresis_;
+ }
+
+ /**
+ * Set the fraction of the maximum limits to which load must fall before an
+ * overload condition is cleared.
+ * A good value would probably be between 0.5 and 0.9.
+ *
+ * @param hysteresisFraction fraction <= 1.0.
+ */
+ void setOverloadHysteresis(double hysteresisFraction) {
+ if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
+ overloadHysteresis_ = hysteresisFraction;
+ }
+ }
+
+ /**
+ * Get the action the server will take on overload.
+ *
+ * @return a TOverloadAction enum value for the currently set action.
+ */
+ TOverloadAction getOverloadAction() const {
+ return overloadAction_;
+ }
+
+ /**
+ * Set the action the server is to take on overload.
+ *
+ * @param overloadAction a TOverloadAction enum value for the action.
+ */
+ void setOverloadAction(TOverloadAction overloadAction) {
+ overloadAction_ = overloadAction;
+ }
+
+ /**
+ * Determine if the server is currently overloaded.
+ * This function checks the maximums for open connections and connections
+ * currently in processing, and sets an overload condition if they are
+ * exceeded. The overload will persist until both values are below the
+ * current hysteresis fraction of their maximums.
+ *
+ * @return true if an overload condition exists, false if not.
+ */
+ bool serverOverloaded();
+
+ /**
+ * Pop and discard the next task on the thread pool's wait queue.
+ *
+ * @return true if a task was discarded, false if the wait queue was empty.
+ */
+ bool drainPendingTask();
+
/**
* Get the maximum limit of memory allocated to idle TConnection objects.
*
idleBufferMemLimit_ = limit;
}
+ /**
+ * Return an initialized connection object. Creates a new TConnection or
+ * recovers one from the pool, and initializes it with the provided socket
+ * FD and flags.
+ *
+ * @param socket FD of the socket associated with this connection.
+ * @param flags initial libevent flags for this connection.
+ * @return pointer to initialized TConnection object.
+ */
TConnection* createConnection(int socket, short flags);
+ /**
+ * Returns a connection to the pool or deletes it. If the connection pool
+ * (a stack) isn't full, the connection object is placed on it; otherwise
+ * it is deleted.
+ *
+ * @param connection the TConnection being returned.
+ */
void returnConnection(TConnection* connection);
+ /**
+ * C-callable event handler for listener events. Provides a callback
+ * that libevent can understand which invokes server->handleEvent().
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event.
+ * @param v void* callback arg where we placed TNonblockingServer's "this".
+ */
static void eventHandler(int fd, short which, void* v) {
((TNonblockingServer*)v)->handleEvent(fd, which);
}
+ /// Creates a socket to listen on and binds it to the local port.
void listenSocket();
+ /**
+ * Takes a socket created by listenSocket() and sets various options on it
+ * to prepare for use in the server.
+ *
+ * @param fd descriptor of the socket to be initialized.
+ */
void listenSocket(int fd);
+ /// Create the pipe used to notify the I/O (libevent) thread of task completion.
+ void createNotificationPipe();
+
+ /**
+ * Get notification pipe send descriptor.
+ *
+ * @return write fd for pipe.
+ */
+ int getNotificationSendFD() const {
+ return notificationPipeFDs_[1];
+ }
+
+ /**
+ * Get notification pipe receive descriptor.
+ *
+ * @return read fd of pipe.
+ */
+ int getNotificationRecvFD() const {
+ return notificationPipeFDs_[0];
+ }
+
+ /**
+ * Register the core libevent events onto the proper base.
+ *
+ * @param base pointer to the event base to register events on.
+ */
void registerEvents(event_base* base);
+ /**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
void serve();
};
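+
+// Illustrative usage sketch (not part of this change): how a caller might
+// enable the overload handling added above. The processor, protocolFactory,
+// port, and threadManager objects are assumed to be set up elsewhere, and the
+// numeric limits are hypothetical.
+//
+//   TNonblockingServer server(processor, protocolFactory, port, threadManager);
+//   server.setMaxConnections(8192);          // overloaded above this
+//   server.setMaxActiveProcessors(512);      // overloaded above this
+//   server.setOverloadHysteresis(0.8);       // clear at 80% of the limits
+//   server.setOverloadAction(T_OVERLOAD_DRAIN_TASK_QUEUE);
+//   server.serve();
+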
-/**
- * Two states for sockets, recv and send mode
- */
+/// Two states for sockets, recv and send mode
enum TSocketState {
SOCKET_RECV,
SOCKET_SEND
};
/**
- * Four states for the nonblocking servr:
+ * Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
+ * 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
- APP_SEND_RESULT
+ APP_SEND_RESULT,
+ APP_CLOSE_CONNECTION
};
/**
class TConnection {
private:
- class Task;
+ /// Starting size for new connection buffer
+ static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
- // Server handle
+ /// Server handle
TNonblockingServer* server_;
- // Socket handle
+ /// Socket handle
int socket_;
- // Libevent object
+ /// Libevent object
struct event event_;
- // Libevent flags
+ /// Libevent flags
short eventFlags_;
- // Socket mode
+ /// Socket mode
TSocketState socketState_;
- // Application state
+ /// Application state
TAppState appState_;
- // How much data needed to read
+ /// How much data needed to read
uint32_t readWant_;
- // Where in the read buffer are we
+ /// Where in the read buffer are we
uint32_t readBufferPos_;
- // Read buffer
+ /// Read buffer
uint8_t* readBuffer_;
- // Read buffer size
+ /// Read buffer size
uint32_t readBufferSize_;
- // Write buffer
+ /// Write buffer
uint8_t* writeBuffer_;
- // Write buffer size
+ /// Write buffer size
uint32_t writeBufferSize_;
- // How far through writing are we?
+ /// How far through writing are we?
uint32_t writeBufferPos_;
- // How many times have we read since our last buffer reset?
+ /// How many times have we read since our last buffer reset?
uint32_t numReadsSinceReset_;
- // How many times have we written since our last buffer reset?
+ /// How many times have we written since our last buffer reset?
uint32_t numWritesSinceReset_;
- // Task handle
+ /// Task handle
int taskHandle_;
- // Task event
+ /// Task event
struct event taskEvent_;
- // Transport to read from
+ /// Transport to read from
boost::shared_ptr<TMemoryBuffer> inputTransport_;
- // Transport that processor writes to
+ /// Transport that processor writes to
boost::shared_ptr<TMemoryBuffer> outputTransport_;
- // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
boost::shared_ptr<TTransport> factoryInputTransport_;
boost::shared_ptr<TTransport> factoryOutputTransport_;
- // Protocol decoder
+ /// Protocol decoder
boost::shared_ptr<TProtocol> inputProtocol_;
- // Protocol encoder
+ /// Protocol encoder
boost::shared_ptr<TProtocol> outputProtocol_;
- // Go into read mode
+ /// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
}
- // Go into write mode
+ /// Go into write mode
void setWrite() {
setFlags(EV_WRITE | EV_PERSIST);
}
- // Set socket idle
+ /// Set socket idle
void setIdle() {
setFlags(0);
}
- // Set event flags
+ /**
+ * Set event flags for this connection.
+ *
+ * @param eventFlags flags we pass to libevent for the connection.
+ */
void setFlags(short eventFlags);
- // Libevent handlers
+ /**
+ * Libevent handler called (via our static wrapper) when there is activity
+ * on the connection's socket. Rather than use the flags libevent passed,
+ * we use the connection state to determine whether we need to read or
+ * write the socket.
+ */
void workSocket();
- // Close this client and reset
+ /// Close this connection and free or reset its resources.
void close();
public:
- // Constructor
+ class Task;
+
+ /// Constructor
TConnection(int socket, short eventFlags, TNonblockingServer *s) {
- readBuffer_ = (uint8_t*)std::malloc(1024);
+ readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
if (readBuffer_ == NULL) {
throw new apache::thrift::TException("Out of memory.");
}
- readBufferSize_ = 1024;
+ readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
numReadsSinceReset_ = 0;
numWritesSinceReset_ = 0;
*
* @param limit we limit buffer size to.
*/
- void checkIdleBufferMemLimit(uint32_t limit);
+ void checkIdleBufferMemLimit(size_t limit);
- // Initialize
+ /// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s);
- // Transition into a new state
+ /**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
void transition();
- // Handler wrapper
+ /**
+ * C-callable event handler for connection events. Provides a callback
+ * that libevent can understand and that invokes the connection's
+ * workSocket() method.
+ *
+ * @param fd the descriptor the event occurred on.
+ * @param which the flags associated with the event.
+ * @param v void* callback arg where we placed TConnection's "this".
+ */
static void eventHandler(int fd, short /* which */, void* v) {
assert(fd == ((TConnection*)v)->socket_);
((TConnection*)v)->workSocket();
}
- // Handler wrapper for task block
- static void taskHandler(int fd, short /* which */, void* v) {
- assert(fd == ((TConnection*)v)->taskHandle_);
- if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
- GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
+ /**
+ * C-callable event handler for signaling task completion. Provides a
+ * callback that libevent can understand, which reads a connection
+ * object's address from the notification pipe and calls
+ * connection->transition() for that object.
+ *
+ * @param fd the descriptor the event occurred on.
+ */
+ static void taskHandler(int fd, short /* which */, void* /* v */) {
+ TConnection* connection;
+ if (read(fd, (void*)&connection, sizeof(TConnection*))
+ != sizeof(TConnection*)) {
+ GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
+ return;
+ }
+
+ connection->transition();
+ }
+
+ /**
+ * Notification to server that processing has ended on this request.
+ * Can be called either when processing is completed or when a waiting
+ * task has been preemptively terminated (on overload).
+ *
+ * @return true if successful, false if unable to notify (check errno).
+ */
+ bool notifyServer() {
+ TConnection* connection = this;
+ if (write(server_->getNotificationSendFD(), (const void*)&connection,
+ sizeof(TConnection*)) != sizeof(TConnection*)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /// Force connection shutdown for this connection.
+ void forceClose() {
+ appState_ = APP_CLOSE_CONNECTION;
+ if (!notifyServer()) {
+ throw TException("TConnection::forceClose: failed write on notify pipe");
}
- ((TConnection*)v)->transition();
}
+ /// Return the server this connection was initialized for.
+ TNonblockingServer* getServer() {
+ return server_;
+ }
+
+ /// Get the current state of this connection.
+ TAppState getState() {
+ return appState_;
+ }
};
}}} // apache::thrift::server