writeBuffer_ = NULL;
writeBufferSize_ = 0;
writeBufferPos_ = 0;
+ largestWriteBufferSize_ = 0;
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
+ callsForResize_ = 0;
// Set flags, which also registers the event
setFlags(eventFlags);
goto LABEL_APP_INIT;
case APP_SEND_RESULT:
+ // it's now safe to perform buffer size housekeeping.
+ if (writeBufferSize_ > largestWriteBufferSize_) {
+ largestWriteBufferSize_ = writeBufferSize_;
+ }
+ if (server_->getResizeBufferEveryN() > 0
+ && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+ checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+ server_->getIdleWriteBufferLimit());
+ callsForResize_ = 0;
+ }
// N.B.: We also intentionally fall through here into the INIT state!
server_->returnConnection(this);
}
-void TConnection::checkIdleBufferMemLimit(size_t limit) {
- if (readBufferSize_ > limit) {
- readBufferSize_ = limit;
+void TConnection::checkIdleBufferMemLimit(size_t readLimit,
+ size_t writeLimit) {
+ if (readLimit > 0 && readBufferSize_ > readLimit) {
+ readBufferSize_ = readLimit;
readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
if (readBuffer_ == NULL) {
GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
close();
}
}
+
+ if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+ // just start over
+ outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+ largestWriteBufferSize_ = 0;
+ }
}
TNonblockingServer::~TNonblockingServer() {
(connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
} else {
- connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+ connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
connectionStack_.push(connection);
}
}
/// Default limit on size of idle connection pool
static const size_t CONNECTION_STACK_LIMIT = 1024;
- /// Maximum size of buffer allocated to idle connection
- static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
-
/// Default limit on total number of connected sockets
static const int MAX_CONNECTIONS = INT_MAX;
/// Default limit on connections in handler/task processing
static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+ /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_READ_BUFFER_LIMIT = 1024;
+
+ /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+ /// # of calls before resizing oversized buffers (0 = check only on close)
+ static const int RESIZE_BUFFER_EVERY_N = 512;
+
/// Server socket file descriptor
int serverSocket_;
TOverloadAction overloadAction_;
/**
- * Max read buffer size for an idle connection. When we place an idle
- * TConnection into connectionStack_, we insure that its read buffer is
- * reduced to this size to insure that idle connections don't hog memory.
+ * Max read buffer size for an idle TConnection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we insure that its read buffer is reduced to this size to insure that
+ * idle connections don't hog memory. 0 disables this check.
+ */
+ size_t idleReadBufferLimit_;
+
+ /**
+ * Max write buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we insure that its write buffer is <= to this size; otherwise we
+ * replace it with a new one to insure that idle connections don't hog
+ * memory. 0 disables this check.
*/
- size_t idleBufferMemLimit_;
+ size_t idleWriteBufferLimit_;
+
+ /**
+ * Every N calls we check the buffer size limits on a connected TConnection.
+ * 0 disables (i.e. the checks are only done when a connection closes).
+ */
+ int32_t resizeBufferEveryN_;
/// Set if we are currently in an overloaded state.
bool overloaded_;
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {}
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
bool drainPendingTask();
/**
- * Get the maximum limit of memory allocated to idle TConnection objects.
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will shrink buffers when idle.
+ */
+ size_t getIdleReadBufferLimit() const {
+ return idleReadBufferLimit_;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will shrink buffers when idle.
*/
size_t getIdleBufferMemLimit() const {
- return idleBufferMemLimit_;
+ return idleReadBufferLimit_;
}
/**
- * Set the maximum limit of memory allocated to idle TConnection objects.
- * If a TConnection object goes idle with more than this much memory
- * allocated to its buffer, we shrink it to this value.
+ * Set the maximum size read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we shrink it to this value.
*
- * @param limit of bytes beyond which we will shrink buffers when idle.
+ * @param limit of bytes beyond which we will shrink buffers when checked.
+ */
+ void setIdleReadBufferLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
+ * Set the maximum size read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we shrink it to this value.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when checked.
*/
void setIdleBufferMemLimit(size_t limit) {
- idleBufferMemLimit_ = limit;
+ idleReadBufferLimit_ = limit;
}
+
+
+ /**
+ * Get the maximum size of write buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will reallocate buffers when checked.
+ */
+ size_t getIdleWriteBufferLimit() const {
+ return idleWriteBufferLimit_;
+ }
+
+ /**
+ * Set the maximum size write buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its write buffer, we destroy and construct that buffer.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when idle.
+ */
+ void setIdleWriteBufferLimit(size_t limit) {
+ idleWriteBufferLimit_ = limit;
+ }
+
+ /**
+ * Get # of calls made between buffer size checks. 0 means disabled.
+ *
+ * @return # of calls between buffer size checks.
+ */
+ int32_t getResizeBufferEveryN() const {
+ return resizeBufferEveryN_;
+ }
+
+ /**
+ * Check buffer sizes every "count" calls. This allows buffer limits
+ * to be enforced for persistant connections with a controllable degree
+ * of overhead. 0 disables checks except at connection close.
+ *
+ * @param count the number of calls between checks, or 0 to disable
+ */
+ void setResizeBufferEveryN(int32_t count) {
+ resizeBufferEveryN_ = count;
+ }
+
+
+
/**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
* Represents a connection that is handled via libevent. This connection
* essentially encapsulates a socket that has some associated libevent state.
*/
- class TConnection {
+class TConnection {
private:
/// Starting size for new connection buffer
/// How far through writing are we?
uint32_t writeBufferPos_;
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Count of the number of calls for use with getResizeBufferEveryN().
+ int32_t callsForResize_;
+
/// Task handle
int taskHandle_;
server_->decrementNumConnections();
}
- /**
- * Check read buffer against a given limit and shrink it if exceeded.
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
*
- * @param limit we limit buffer size to.
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
*/
- void checkIdleBufferMemLimit(size_t limit);
+ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s,