writeBufferPos_ = 0;
largestWriteBufferSize_ = 0;
- socketState_ = SOCKET_RECV;
+ socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_INIT;
callsForResize_ = 0;
uint32_t fetch = 0;
switch (socketState_) {
- case SOCKET_RECV:
- // It is an error to be in this state if we already have all the data
- assert(readBufferPos_ < readWant_);
-
- // Double the buffer size until it is big enough
- if (readWant_ > readBufferSize_) {
- uint32_t newSize = readBufferSize_;
- while (readWant_ > newSize) {
- newSize *= 2;
- }
- uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
- if (newBuffer == NULL) {
- GlobalOutput("TConnection::workSocket() realloc");
+ case SOCKET_RECV_FRAMING:
+ union {
+ uint8_t buf[sizeof(uint32_t)];
+ int32_t size;
+ } framing;
+
+ // if we've already received some bytes we kept them here
+ framing.size = readWant_;
+ // determine size of this frame
+ try {
+ // Read from the socket
+ fetch = tSocket_->read(&framing.buf[readBufferPos_],
+ uint32_t(sizeof(framing.size) - readBufferPos_));
+ if (fetch == 0) {
+ // Whenever we get here it means a remote disconnect
close();
return;
}
- readBuffer_ = newBuffer;
- readBufferSize_ = newSize;
+ readBufferPos_ += fetch;
+ } catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+
+ return;
+ }
+
+ if (readBufferPos_ < sizeof(framing.size)) {
+ // more needed before frame size is known -- save what we have so far
+ readWant_ = framing.size;
+ return;
+ }
+
+ readWant_ = ntohl(framing.size);
+ if (static_cast<int>(readWant_) <= 0) {
+ GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
+ close();
+ return;
}
+ // size known; now get the rest of the frame
+ transition();
+ return;
+
+ case SOCKET_RECV:
+ // It is an error to be in this state if we already have all the data
+ assert(readBufferPos_ < readWant_);
try {
// Read from the socket
writeBufferPos_ = 0;
writeBufferSize_ = 0;
- // Set up read buffer for getting 4 bytes
- readBufferPos_ = 0;
- readWant_ = 4;
-
// Into read4 state we go
- socketState_ = SOCKET_RECV;
+ socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;
+ readBufferPos_ = 0;
+
// Register read event
setRead();
return;
case APP_READ_FRAME_SIZE:
- // We just read the request length, deserialize it
- sz = *(int32_t*)readBuffer_;
- sz = (int32_t)ntohl(sz);
+ // We just read the request length
+ // Double the buffer size until it is big enough
+ if (readWant_ > readBufferSize_) {
+ if (readBufferSize_ == 0) {
+ readBufferSize_ = 1;
+ }
+ uint32_t newSize = readBufferSize_;
+ while (readWant_ > newSize) {
+ newSize *= 2;
+ }
- if (sz <= 0) {
- GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
- close();
- return;
+ uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+ if (newBuffer == NULL) {
+ // nothing else to be done...
+ throw std::bad_alloc();
+ }
+ readBuffer_ = newBuffer;
+ readBufferSize_ = newSize;
}
- // Reset the read buffer
- readWant_ = (uint32_t)sz;
readBufferPos_= 0;
// Move into read request state
+ socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
// Work the socket right away
void TConnection::checkIdleBufferMemLimit(size_t readLimit,
size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
- readBufferSize_ = readLimit;
- readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
- if (readBuffer_ == NULL) {
- GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
- close();
- }
+ free(readBuffer_);
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
}
if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
// just start over
- outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+ outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
largestWriteBufferSize_ = 0;
}
}
/// Default limit on connections in handler/task processing
static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+ /// Default size of write buffer
+ static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
/// Maximum size of read buffer allocated to idle connection (0 = unlimited)
static const int IDLE_READ_BUFFER_LIMIT = 1024;
/// Action to take when we're overloaded.
TOverloadAction overloadAction_;
+ /**
+ * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+ * and found to be exceeded, reinitialized) to this size.
+ */
+ size_t writeBufferDefaultSize_;
+
/**
* Max read buffer size for an idle TConnection. When we place an idle
* TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
- * we insure that its read buffer is reduced to this size to insure that
- * idle connections don't hog memory. 0 disables this check.
+ * we will free the buffer (such that it will be reinitialized by the next
+ * received frame) if it has exceeded this limit. 0 disables this check.
*/
size_t idleReadBufferLimit_;
* Max write buffer size for an idle connection. When we place an idle
* TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
* we insure that its write buffer is <= to this size; otherwise we
- * replace it with a new one to insure that idle connections don't hog
- * memory. 0 disables this check.
+ * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
+ * idle connections don't hog memory. 0 disables this check.
*/
size_t idleWriteBufferLimit_;
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
+ writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
+ writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
+ writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
*/
bool drainPendingTask();
+ /**
+ * Get the starting size of a TConnection object's write buffer.
+ *
+ * @return # bytes we initialize a TConnection object's write buffer to.
+ */
+ size_t getWriteBufferDefaultSize() const {
+ return writeBufferDefaultSize_;
+ }
+
+ /**
+ * Set the starting size of a TConnection object's write buffer.
+ *
+ * @param size # bytes we initialize a TConnection object's write buffer to.
+ */
+ void setWriteBufferDefaultSize(size_t size) {
+ writeBufferDefaultSize_ = size;
+ }
+
/**
* Get the maximum size of read buffer allocated to idle TConnection objects.
*
- * @return # bytes beyond which we will shrink buffers when idle.
+ * @return # bytes beyond which we will dealloc idle buffer.
*/
size_t getIdleReadBufferLimit() const {
return idleReadBufferLimit_;
* [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
* Get the maximum size of read buffer allocated to idle TConnection objects.
*
- * @return # bytes beyond which we will shrink buffers when idle.
+ * @return # bytes beyond which we will dealloc idle buffer.
*/
size_t getIdleBufferMemLimit() const {
return idleReadBufferLimit_;
* Set the maximum size read buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
- * allocated to its read buffer, we shrink it to this value.
+ * allocated to its read buffer, we free it and allow it to be reinitialized
+ * on the next received frame.
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
* Set the maximum size read buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
- * allocated to its read buffer, we shrink it to this value.
+ * allocated to its read buffer, we free it and allow it to be reinitialized
+ * on the next received frame.
*
* @param limit of bytes beyond which we will shrink buffers when checked.
*/
* Set the maximum size write buffer allocated to idle TConnection objects.
* If a TConnection object is found (either on connection close or between
* calls when resizeBufferEveryN_ is set) with more than this much memory
- * allocated to its write buffer, we destroy and construct that buffer.
+ * allocated to its write buffer, we destroy and construct that buffer with
+ * writeBufferDefaultSize_ bytes.
*
* @param limit of bytes beyond which we will shrink buffers when idle.
*/
void serve();
};
-/// Two states for sockets, recv and send mode
+/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
+ SOCKET_RECV_FRAMING,
SOCKET_RECV,
SOCKET_SEND
};
class TConnection {
private:
- /// Starting size for new connection buffer
- static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
-
/// Server handle
TNonblockingServer* server_;
/// Constructor
TConnection(int socket, short eventFlags, TNonblockingServer *s,
const sockaddr* addr, socklen_t addrLen) {
- readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
- if (readBuffer_ == NULL) {
- throw new apache::thrift::TException("Out of memory.");
- }
- readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
// Allocate input and output tranpsorts
// these only need to be allocated once per TConnection (they don't need to be
// reallocated on init() call)
inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+ outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
tSocket_.reset(new TSocket());
init(socket, eventFlags, s, addr, addrLen);