cpp: TNonBlockingServer: Use separate monitor for max queue
authorDavid Reiss <dreiss@apache.org>
Tue, 9 Mar 2010 05:19:32 +0000 (05:19 +0000)
committerDavid Reiss <dreiss@apache.org>
Tue, 9 Mar 2010 05:19:32 +0000 (05:19 +0000)
We were using the same monitor for max queue size and empty queue, this
meant the notifies might be going to the wrong place.

This change significantly reduces the time spent in futex calls in
loaded servers.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920667 13f79535-47bb-0310-9956-ffa450edef68

lib/cpp/src/concurrency/ThreadManager.cpp

index 7bba0e6..52473c7 100644 (file)
@@ -54,7 +54,9 @@ class ThreadManager::Impl : public ThreadManager  {
     workerMaxCount_(0),
     idleCount_(0),
     pendingTaskCountMax_(0),
-    state_(ThreadManager::UNINITIALIZED) {}
+    state_(ThreadManager::UNINITIALIZED),
+    monitor_(&mutex_),
+    maxMonitor_(&mutex_) {}
 
   ~Impl() { stop(); }
 
@@ -133,7 +135,9 @@ private:
 
   friend class ThreadManager::Task;
   std::queue<shared_ptr<Task> > tasks_;
+  Mutex mutex_;
   Monitor monitor_;
+  Monitor maxMonitor_;
   Monitor workerMonitor_;
 
   friend class ThreadManager::Worker;
@@ -245,7 +249,7 @@ class ThreadManager::Worker: public Runnable {
        * the manager will see it.
        */
       {
-        Synchronized s(manager_->monitor_);
+        Guard g(manager_->mutex_);
         active = isActive();
 
         while (active && manager_->tasks_.empty()) {
@@ -269,7 +273,7 @@ class ThreadManager::Worker: public Runnable {
                thread that might be blocked on add. */
             if (manager_->pendingTaskCountMax_ != 0 &&
                 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
-              manager_->monitor_.notify();
+              manager_->maxMonitor_.notify();
             }
           }
         } else {
@@ -432,7 +436,7 @@ void ThreadManager::Impl::removeWorker(size_t value) {
   }
 
   void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
@@ -441,7 +445,8 @@ void ThreadManager::Impl::removeWorker(size_t value) {
     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
       if (canSleep() && timeout >= 0) {
         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
-          monitor_.wait(timeout);
+          // This is thread safe because the mutex is shared between monitors.
+          maxMonitor_.wait(timeout);
         }
       } else {
         throw TooManyPendingTasksException();