From 5ace1780abe8f62e6e6be789ae53637f3a35b195 Mon Sep 17 00:00:00 2001 From: Kevin Clark Date: Wed, 4 Mar 2009 21:10:58 +0000 Subject: [PATCH] THRIFT-265. cpp: Reset buffers every 512 calls in TNonblockingServer Author: Erik Frey git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750153 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 34 ++++++++++++++++++++++- lib/cpp/src/server/TNonblockingServer.h | 9 ++++++ lib/cpp/src/transport/TBufferTransports.h | 18 +++++++++++- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 86445266..cfaf3be9 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -216,8 +216,24 @@ void TConnection::transition() { case APP_READ_REQUEST: // We are done reading the request, package the read buffer into transport // and get back some data from the dispatch function + // If we've used these transport buffers enough times, reset them to avoid bloating + inputTransport_->resetBuffer(readBuffer_, readBufferPos_); - outputTransport_->resetBuffer(); + ++numReadsSinceReset_; + if (numWritesSinceReset_ < 512) { + outputTransport_->resetBuffer(); + } else { + // reset the capacity of the output transport if we used it enough times that it might be bloated + try { + outputTransport_->resetBuffer(true); + numWritesSinceReset_ = 0; + } catch (TTransportException &ttx) { + GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what()); + close(); + return; + } + } + // Prepend four bytes of blank space to the buffer so we can // write the frame size there later. outputTransport_->getWritePtr(4); @@ -327,11 +343,27 @@ void TConnection::transition() { case APP_SEND_RESULT: + ++numWritesSinceReset_; + // N.B.: We also intentionally fall through here into the INIT state! LABEL_APP_INIT: case APP_INIT: + // reset the input buffer if we used it enough times that it might be bloated + if (numReadsSinceReset_ > 512) + { + void * new_buffer = std::realloc(readBuffer_, 1024); + if (new_buffer == NULL) { + GlobalOutput("TConnection::transition() realloc"); + close(); + return; + } + readBuffer_ = (uint8_t*) new_buffer; + readBufferSize_ = 1024; + numReadsSinceReset_ = 0; + } + // Clear write buffer variables writeBuffer_ = NULL; writeBufferPos_ = 0; diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index cf024ed5..40ec5741 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -248,6 +248,12 @@ class TConnection { // How far through writing are we? uint32_t writeBufferPos_; + // How many times have we read since our last buffer reset? + uint32_t numReadsSinceReset_; + + // How many times have we written since our last buffer reset? + uint32_t numWritesSinceReset_; + // Task handle int taskHandle_; @@ -304,6 +310,9 @@ class TConnection { } readBufferSize_ = 1024; + numReadsSinceReset_ = 0; + numWritesSinceReset_ = 0; + // Allocate input and output tranpsorts // these only need to be allocated once per TConnection (they don't need to be // reallocated on init() call) diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h index f0298df2..9d9510d6 100644 --- a/lib/cpp/src/transport/TBufferTransports.h +++ b/lib/cpp/src/transport/TBufferTransports.h @@ -552,7 +552,23 @@ class TMemoryBuffer : public TBufferBase { str.append((char*)buf, sz); } - void resetBuffer() { + void resetBuffer(bool reset_capacity = false) { + if (reset_capacity) + { + assert(owner_); + + void* new_buffer = std::realloc(buffer_, defaultSize); + + if (new_buffer == NULL) { + throw TTransportException("Out of memory."); + } + + buffer_ = (uint8_t*) new_buffer; + bufferSize_ = defaultSize; + + wBound_ = buffer_ + bufferSize_; + } + rBase_ = buffer_; rBound_ = buffer_; wBase_ = buffer_; -- 2.17.1