From: Kevin Clark Date: Thu, 19 Mar 2009 03:50:05 +0000 (+0000) Subject: Thrift-357. cpp: Fix buffer and connection bloat in TNonBlockingServer X-Git-Tag: 0.2.0~257 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=cbcd63ab4984336ecebae485bd8b3186a0444d39;p=common%2Fthrift.git Thrift-357. cpp: Fix buffer and connection bloat in TNonBlockingServer Author: Anthony Giardullo git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@755824 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index cfaf3be9..583877b7 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -495,6 +495,17 @@ void TConnection::close() { server_->returnConnection(this); } +void TConnection::checkIdleBufferMemLimit(uint32_t limit) { + if (readBufferSize_ > limit) { + readBufferSize_ = limit; + readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_); + if (readBuffer_ == NULL) { + GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc"); + close(); + } + } +} + /** * Creates a new connection either by reusing an object off the stack or * by allocating a new one entirely @@ -515,7 +526,13 @@ TConnection* TNonblockingServer::createConnection(int socket, short flags) { * Returns a connection to the stack */ void TNonblockingServer::returnConnection(TConnection* connection) { - connectionStack_.push(connection); + if (connectionStackLimit_ && + (connectionStack_.size() >= connectionStackLimit_)) { + delete connection; + } else { + connection->checkIdleBufferMemLimit(idleBufferMemLimit_); + connectionStack_.push(connection); + } } /** diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 40ec5741..a47a2c00 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -43,6 +43,12 @@ class TNonblockingServer : public TServer { // Listen backlog static const int LISTEN_BACKLOG = 1024; + // Default limit on size of idle connection pool + static const size_t CONNECTION_STACK_LIMIT = 1024; + + // Maximum size of buffer allocated to idle connection + static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192; + // Server socket file descriptor int serverSocket_; @@ -64,6 +70,16 @@ class TNonblockingServer : public TServer { // Number of TConnection object we've created size_t numTConnections_; + // Limit for how many TConnection objects to cache + size_t connectionStackLimit_; + + /** + * Max read buffer size for an idle connection. When we place an idle + * TConnection into connectionStack_, we insure that its read buffer is + * reduced to this size to insure that idle connections don't hog memory. + */ + uint32_t idleBufferMemLimit_; + /** * This is a stack of all the objects that have been created but that * are NOT currently in use. When we close a connection, we place it on this @@ -82,7 +98,9 @@ class TNonblockingServer : public TServer { port_(port), threadPoolProcessing_(false), eventBase_(NULL), - numTConnections_(0) {} + numTConnections_(0), + connectionStackLimit_(CONNECTION_STACK_LIMIT), + idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {} TNonblockingServer(boost::shared_ptr processor, boost::shared_ptr protocolFactory, @@ -93,7 +111,9 @@ class TNonblockingServer : public TServer { port_(port), threadManager_(threadManager), eventBase_(NULL), - numTConnections_(0) { + numTConnections_(0), + connectionStackLimit_(CONNECTION_STACK_LIMIT), + idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); setInputProtocolFactory(protocolFactory); @@ -113,7 +133,9 @@ class TNonblockingServer : public TServer { port_(port), threadManager_(threadManager), eventBase_(NULL), - numTConnections_(0) { + numTConnections_(0), + connectionStackLimit_(CONNECTION_STACK_LIMIT), + idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) { setInputTransportFactory(inputTransportFactory); setOutputTransportFactory(outputTransportFactory); setInputProtocolFactory(inputProtocolFactory); @@ -132,6 +154,24 @@ class TNonblockingServer : public TServer { return threadManager_; } + /** + * Get the maximum number of unused TConnection we will hold in reserve. + * + * @return the current limit on TConnection pool size. + */ + int getConnectionStackLimit() const { + return connectionStackLimit_; + } + + /** + * Set the maximum number of unused TConnection we will hold in reserve. + * + * @param sz the new limit for TConnection pool size. + */ + void setConnectionStackLimit(int sz) { + connectionStackLimit_ = sz; + } + bool isThreadPoolProcessing() const { return threadPoolProcessing_; } @@ -160,6 +200,26 @@ class TNonblockingServer : public TServer { return connectionStack_.size(); } + /** + * Get the maximum limit of memory allocated to idle TConnection objects. + * + * @return # bytes beyond which we will shrink buffers when idle. + */ + size_t getIdleBufferMemLimit() const { + return idleBufferMemLimit_; + } + + /** + * Set the maximum limit of memory allocated to idle TConnection objects. + * If a TConnection object goes idle with more than this much memory + * allocated to its buffer, we shrink it to this value. + * + * @param limit of bytes beyond which we will shrink buffers when idle. + */ + void setIdleBufferMemLimit(size_t limit) { + idleBufferMemLimit_ = limit; + } + TConnection* createConnection(int socket, short flags); void returnConnection(TConnection* connection); @@ -327,6 +387,13 @@ class TConnection { server_->decrementNumConnections(); } + /** + * Check read buffer against a given limit and shrink it if exceeded. + * + * @param limit we limit buffer size to. + */ + void checkIdleBufferMemLimit(uint32_t limit); + // Initialize void init(int socket, short eventFlags, TNonblockingServer *s);