From: Mark Slee Date: Thu, 23 Aug 2007 01:43:20 +0000 (+0000) Subject: Fix TNonBlockingServer libevent issue in ThreadPool mode X-Git-Tag: 0.2.0~1255 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=402ee28f87dd855d8825182b79617891a7ac4b93;p=common%2Fthrift.git Fix TNonBlockingServer libevent issue in ThreadPool mode Summary: If using TNonBlockingServer with a ThreadManager, when you send a task off to the threadmanager you need to cancel the event that you have set on that client socket. Otherwise, when you give control back to libevent, it might trigger more read events if there are more requests coming down the pipe. This is an issue, because the server will be in the wrong state at that point and will have no way of handling reading more data if it is still in the WAIT_TASK state trying to see if it should write something back to the client. So, when we hit that control flow, we must setIdle() on the TConnection so that libevent doesn't trigger it anymore. Later, after the result is written, we'll setRead() and go back to the init state. Reviewed By: akhil Test Plan: Akhil's async + TNonBlocking karma server git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665217 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index ceba960a..fcfe797a 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -118,7 +118,7 @@ void TConnection::workSocket() { // Read from the socket fetch = readWant_ - readBufferPos_; got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0); - + if (got > 0) { // Move along in the buffer readBufferPos_ += got; @@ -215,7 +215,7 @@ void TConnection::transition() { // and get back some data from the dispatch function inputTransport_->resetBuffer(readBuffer_, readBufferPos_); outputTransport_->resetBuffer(); - + if (server_->isThreadPoolProcessing()) { // We are setting up a Task to do this work and we will wait on it int sv[2]; @@ -242,6 +242,11 @@ void TConnection::transition() { return; } server_->addTask(task); + + // Set this connection idle so that libevent doesn't process more + // data on it while we're still waiting for the threadmanager to + // finish this task + setIdle(); return; } } else { @@ -263,6 +268,9 @@ void TConnection::transition() { } } + // Intentionally fall through here, the call to process has written into + // the writeBuffer_ + case APP_WAIT_TASK: // We have now finished processing a task and the result has been written // into the outputTransport_, so we grab its contents and place them into @@ -392,6 +400,11 @@ void TConnection::setFlags(short eventFlags) { // Update in memory structure eventFlags_ = eventFlags; + // Do not call event_set if there are no flags + if (!eventFlags_) { + return; + } + /** * event_set: * diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 6997c45d..5470ad4e 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -267,6 +267,11 @@ class TConnection { setFlags(EV_WRITE | EV_PERSIST); } + // Set socket idle + void setIdle() { + setFlags(0); + } + // Set event flags void setFlags(short eventFlags);