Add join to the ThreadManager
authorMark Slee <mcslee@apache.org>
Thu, 1 Mar 2007 02:45:10 +0000 (02:45 +0000)
committerMark Slee <mcslee@apache.org>
Thu, 1 Mar 2007 02:45:10 +0000 (02:45 +0000)
Summary: Now you can join against all the threads in a ThreadManager

Reviewed By: marc, xp-style

Test Plan: Use with new ThriftServer shutdown mechanisms

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665037 13f79535-47bb-0310-9956-ffa450edef68

lib/cpp/src/concurrency/ThreadManager.cpp
lib/cpp/src/concurrency/ThreadManager.h

index 2e5472d..1631541 100644 (file)
@@ -47,7 +47,9 @@ class ThreadManager::Impl : public ThreadManager  {
 
   void start();
 
-  void stop();
+  void stop() { stopImpl(false); }
+
+  void join() { stopImpl(true); }
 
   const ThreadManager::STATE state() const {
     return state_;
@@ -91,6 +93,8 @@ class ThreadManager::Impl : public ThreadManager  {
   void remove(shared_ptr<Runnable> task);
 
 private:
+  void stopImpl(bool join);
+
   size_t workerCount_;
   size_t workerMaxCount_;
   size_t idleCount_;
@@ -154,10 +158,14 @@ class ThreadManager::Worker: public Runnable {
 
   ~Worker() {}
 
+ private:
   bool isActive() const {
-    return manager_->workerCount_ <= manager_->workerMaxCount_;
+    return
+      (manager_->workerCount_ <= manager_->workerMaxCount_) ||
+      (manager_->state_ == JOINING && !manager_->tasks_.empty());
   }
 
+ public:
   /**
    * Worker entry point
    *
@@ -205,12 +213,13 @@ class ThreadManager::Worker: public Runnable {
       {
         Synchronized s(manager_->monitor_);
        active = isActive();
+
        while (active && manager_->tasks_.empty()) {
           manager_->idleCount_++;
-         idle_ = true;
+          idle_ = true;
           manager_->monitor_.wait();
           active = isActive();
-         idle_ = false;
+          idle_ = false;
           manager_->idleCount_--;
        }
 
@@ -223,9 +232,9 @@ class ThreadManager::Worker: public Runnable {
            }
          }
        } else {
-         idle_ = true;  
+         idle_ = true;
          manager_->workerCount_--;
-          notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+          notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
        }
       }
       
@@ -309,7 +318,7 @@ void ThreadManager::Impl::start() {
   }
 }
 
-void ThreadManager::Impl::stop() {
+void ThreadManager::Impl::stopImpl(bool join) {
   bool doStop = false;
   if (state_ == ThreadManager::STOPPED) {
     return;
@@ -317,22 +326,29 @@ void ThreadManager::Impl::stop() {
 
   {
     Synchronized s(monitor_); 
-    if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) {
+    if (state_ != ThreadManager::STOPPING &&
+        state_ != ThreadManager::JOINING &&
+        state_ != ThreadManager::STOPPED) {
       doStop = true;
-      state_ = ThreadManager::STOPPING;
+      state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
     }
   }
 
   if (doStop) {
     removeWorker(workerCount_);
-    state_ = ThreadManager::STOPPING;
   }
 
   // XXX 
   // should be able to block here for transition to STOPPED since we're no
   // using shared_ptrs
+
+  {
+    Synchronized s(monitor_); 
+    state_ = ThreadManager::STOPPED;
+  }
+
 }
-  
+
 void ThreadManager::Impl::removeWorker(size_t value) {
   std::set<shared_ptr<Thread> > removedThreads;
   {
index 52bc75c..f7c4b3c 100644 (file)
@@ -63,10 +63,16 @@ class ThreadManager {
    */
   virtual void stop() = 0;
 
+  /**
+   * Joins the thread manager. This is the same as stop, except that it will
+   * block until all the workers have finished their work. At that point
+   * the ThreadManager will transition into the STOPPED state.
+   */
   enum STATE {
     UNINITIALIZED,
     STARTING,
     STARTED,
+    JOINING,
     STOPPING,
     STOPPED
   };