server_->returnConnection(this);
}
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+ if (readBufferSize_ > limit) {
+ readBufferSize_ = limit;
+ readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
+ if (readBuffer_ == NULL) {
+ GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
+ close();
+ }
+ }
+}
+
/**
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
* Returns a connection to the stack
*/
void TNonblockingServer::returnConnection(TConnection* connection) {
- connectionStack_.push(connection);
+ if (connectionStackLimit_ &&
+ (connectionStack_.size() >= connectionStackLimit_)) {
+ delete connection;
+ } else {
+ connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+ connectionStack_.push(connection);
+ }
}
/**
// Listen backlog
static const int LISTEN_BACKLOG = 1024;
+ // Default limit on size of idle connection pool
+ static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+ // Maximum size of buffer allocated to idle connection
+ static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
// Server socket file descriptor
int serverSocket_;
// Number of TConnection object we've created
size_t numTConnections_;
+ // Limit for how many TConnection objects to cache
+ size_t connectionStackLimit_;
+
+ /**
+ * Max read buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_, we insure that its read buffer is
+ * reduced to this size to insure that idle connections don't hog memory.
+ */
+ uint32_t idleBufferMemLimit_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
port_(port),
threadPoolProcessing_(false),
eventBase_(NULL),
- numTConnections_(0) {}
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
- numTConnections_(0) {
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
- numTConnections_(0) {
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
return threadManager_;
}
+ /**
+ * Get the maximum number of unused TConnection we will hold in reserve.
+ *
+ * @return the current limit on TConnection pool size.
+ */
+ int getConnectionStackLimit() const {
+ return connectionStackLimit_;
+ }
+
+ /**
+ * Set the maximum number of unused TConnection we will hold in reserve.
+ *
+ * @param sz the new limit for TConnection pool size.
+ */
+ void setConnectionStackLimit(int sz) {
+ connectionStackLimit_ = sz;
+ }
+
bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
return connectionStack_.size();
}
+ /**
+ * Get the maximum limit of memory allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will shrink buffers when idle.
+ */
+ size_t getIdleBufferMemLimit() const {
+ return idleBufferMemLimit_;
+ }
+
+ /**
+ * Set the maximum limit of memory allocated to idle TConnection objects.
+ * If a TConnection object goes idle with more than this much memory
+ * allocated to its buffer, we shrink it to this value.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when idle.
+ */
+ void setIdleBufferMemLimit(size_t limit) {
+ idleBufferMemLimit_ = limit;
+ }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
server_->decrementNumConnections();
}
+ /**
+ * Check read buffer against a given limit and shrink it if exceeded.
+ *
+ * @param limit we limit buffer size to.
+ */
+ void checkIdleBufferMemLimit(uint32_t limit);
+
// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s);