From: Mark Slee Date: Thu, 1 Mar 2007 02:45:10 +0000 (+0000) Subject: Add join to the ThreadManager X-Git-Tag: 0.2.0~1435 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=7c10eaf35c200b46e26b9165be088b6a5d7dd791;p=common%2Fthrift.git Add join to the ThreadManager Summary: Now you can join against all the threads in a ThreadManager Reviewed By: marc, xp-style Test Plan: Use with new ThriftServer shutdown mechanisms git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665037 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp index 2e5472d6..1631541d 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/concurrency/ThreadManager.cpp @@ -47,7 +47,9 @@ class ThreadManager::Impl : public ThreadManager { void start(); - void stop(); + void stop() { stopImpl(false); } + + void join() { stopImpl(true); } const ThreadManager::STATE state() const { return state_; @@ -91,6 +93,8 @@ class ThreadManager::Impl : public ThreadManager { void remove(shared_ptr task); private: + void stopImpl(bool join); + size_t workerCount_; size_t workerMaxCount_; size_t idleCount_; @@ -154,10 +158,14 @@ class ThreadManager::Worker: public Runnable { ~Worker() {} + private: bool isActive() const { - return manager_->workerCount_ <= manager_->workerMaxCount_; + return + (manager_->workerCount_ <= manager_->workerMaxCount_) || + (manager_->state_ == JOINING && !manager_->tasks_.empty()); } + public: /** * Worker entry point * @@ -205,12 +213,13 @@ class ThreadManager::Worker: public Runnable { { Synchronized s(manager_->monitor_); active = isActive(); + while (active && manager_->tasks_.empty()) { manager_->idleCount_++; - idle_ = true; + idle_ = true; manager_->monitor_.wait(); active = isActive(); - idle_ = false; + idle_ = false; manager_->idleCount_--; } @@ -223,9 +232,9 @@ class ThreadManager::Worker: public Runnable { } } } else { - idle_ = true; + idle_ = true; manager_->workerCount_--; - notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; + notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_); } } @@ -309,7 +318,7 @@ void ThreadManager::Impl::start() { } } -void ThreadManager::Impl::stop() { +void ThreadManager::Impl::stopImpl(bool join) { bool doStop = false; if (state_ == ThreadManager::STOPPED) { return; @@ -317,22 +326,29 @@ void ThreadManager::Impl::stop() { { Synchronized s(monitor_); - if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) { + if (state_ != ThreadManager::STOPPING && + state_ != ThreadManager::JOINING && + state_ != ThreadManager::STOPPED) { doStop = true; - state_ = ThreadManager::STOPPING; + state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING; } } if (doStop) { removeWorker(workerCount_); - state_ = ThreadManager::STOPPING; } // XXX // should be able to block here for transition to STOPPED since we're no // using shared_ptrs + + { + Synchronized s(monitor_); + state_ = ThreadManager::STOPPED; + } + } - + void ThreadManager::Impl::removeWorker(size_t value) { std::set > removedThreads; { diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index 52bc75c2..f7c4b3c0 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -63,10 +63,16 @@ class ThreadManager { */ virtual void stop() = 0; + /** + * Joins the thread manager. This is the same as stop, except that it will + * block until all the workers have finished their work. At that point + * the ThreadManager will transition into the STOPPED state. + */ enum STATE { UNINITIALIZED, STARTING, STARTED, + JOINING, STOPPING, STOPPED };