Bring-up of the thread manager
facebook::thrift::concurrency::test.ThreadManagerTest::test00
Launch N tasks that each block for time T; verify that they all complete and that the thread manager cleans up properly
when it goes out of scope.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664725 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index d13ce7b..f8a5c22 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,4 +1,5 @@
#include "ThreadManager.h"
+#include "Exception.h"
#include "Monitor.h"
#include <assert.h>
@@ -10,12 +11,7 @@
/** ThreadManager class
This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
- it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
- PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
- size needs to be adjusted and call this object addThread and removeThread methods to make changes.
-
- This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
- policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+ it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
@author marc
@version $Id:$ */
@@ -24,34 +20,18 @@
public:
- Impl(size_t highWatermark, size_t lowWatermark) :
- _hiwat(highWatermark),
- _lowat(lowWatermark) {
+ Impl() : _stopped(false) {}
+
+
+
+ ~Impl() {
+
+ if(!_stopped) {
+ stop();
+ }
}
- ~Impl() {}
-
- size_t highWatermark() const {return _hiwat;}
-
- void highWatermark(size_t value) {_hiwat = value;}
-
- size_t lowWatermark() const {return _lowat;}
-
- void lowWatermark(size_t value) {_lowat = value;}
-
- const PoolPolicy* poolPolicy() const {
-
- Synchronized s(_monitor);
-
- return _poolPolicy;
- }
-
- void poolPolicy(const PoolPolicy* value) {
-
- Synchronized s(_monitor);
-
- _poolPolicy = value;
- }
+ void stop();
const ThreadFactory* threadFactory() const {
@@ -67,9 +47,9 @@
_threadFactory = value;
}
- void addThread(size_t value);
+ void addWorker(size_t value);
- void removeThread(size_t value);
+ void removeWorker(size_t value);
size_t idleWorkerCount() const {return _idleCount;}
@@ -77,7 +57,7 @@
Synchronized s(_monitor);
- return _workers.size();
+ return _workerCount;
}
size_t pendingTaskCount() const {
@@ -91,7 +71,7 @@
Synchronized s(_monitor);
- return _tasks.size() + _workers.size() - _idleCount;
+ return _tasks.size() + _workerCount - _idleCount;
}
void add(Runnable* value);
@@ -100,13 +80,11 @@
private:
- size_t _hiwat;
-
- size_t _lowat;
+ size_t _workerCount;
size_t _idleCount;
- const PoolPolicy* _poolPolicy;;
+ bool _stopped;
const ThreadFactory* _threadFactory;
@@ -148,6 +126,8 @@
private:
Runnable* _runnable;
+
+ friend class ThreadManager::Worker;
STATE _state;
};
@@ -181,6 +161,10 @@
if(_state == STARTING) {
_state = STARTED;
}
+
+ _manager->_workerCount++;
+
+ _manager->_monitor.notify();
}
do {
@@ -207,22 +191,43 @@
}
if(_state == STARTED) {
+
+ if(!_manager->_tasks.empty()) {
- task = _manager->_tasks.front();
+ task = _manager->_tasks.front();
+
+ _manager->_tasks.pop();
+
+ if(task->_state == ThreadManager::Task::WAITING) {
+
+ task->_state = ThreadManager::Task::EXECUTING;
+ }
+ }
}
}
if(task != NULL) {
- task->run();
+ if(task->_state == ThreadManager::Task::EXECUTING) {
+ try {
+
+ task->run();
- delete task;
+ } catch(...) {
+
+ // XXX need to log this
+ }
+
+ delete task;
+ }
}
-
+
} while(_state == STARTED);
{Synchronized s(_manager->_monitor);
+ _manager->_workerCount--;
+
if(_state == STOPPING) {
_state = STOPPED;
@@ -246,35 +251,56 @@
bool _idle;
};
-void ThreadManager::Impl::addThread(size_t value) {
+void ThreadManager::Impl::addWorker(size_t value) {
- std::set<Thread*> newThreads;
+ std::set<Thread*> newThreads;
- for(size_t ix = 0; ix < value; ix++) {
+ for(size_t ix = 0; ix < value; ix++) {
- class ThreadManager::Worker;
+ class ThreadManager::Worker;
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+ ThreadManager::Worker* worker = new ThreadManager::Worker(this);
- newThreads.insert(_threadFactory->newThread(worker));
- }
-
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
-
- {Synchronized s(_monitor);
-
- _workers.insert(newThreads.begin(), newThreads.end());
- }
+ newThreads.insert(_threadFactory->newThread(worker));
}
-void ThreadManager::Impl::removeThread(size_t value) {
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+ ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+
+ worker->_state = ThreadManager::Worker::STARTING;
+
+ (*ix)->start();
+ }
+
+ {Synchronized s(_monitor);
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+
+ while(_workerCount != _workers.size()) {
+ _monitor.wait();
+ }
+ }
+}
+
+void ThreadManager::Impl::stop() {
+
+ bool doStop = false;
+
+ {Synchronized s(_monitor);
+
+ if(!_stopped) {
+ doStop = true;
+ _stopped = true;
+ }
+ }
+
+ if(doStop) {
+ removeWorker(_workerCount);
+ }
+}
+
+void ThreadManager::Impl::removeWorker(size_t value) {
std::set<Thread*> removedThreads;
@@ -285,14 +311,19 @@
First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
and remove a sufficient number of busy threads. */
- for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+ for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
Worker* worker = (Worker*)(*workerThread)->runnable();
if(worker->_idle || !idleOnly) {
-
+
+ if(worker->_state == ThreadManager::Worker::STARTED) {
+
+ worker->_state = ThreadManager::Worker::STOPPING;
+ }
+
removedThreads.insert(*workerThread);
_workers.erase(workerThread);
@@ -320,15 +351,17 @@
Synchronized s(_monitor);
+ bool isEmpty = _tasks.empty();
+
_tasks.push(new ThreadManager::Task(value));
- /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+ /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
task in time. */
- if(_tasks.size() == 1) {
+ if(isEmpty) {
- assert(_idleCount == _workers.size());
-
+ assert(_idleCount == _workerCount);
+
_monitor.notify();
}
}
@@ -336,39 +369,54 @@
void ThreadManager::Impl::remove(Runnable* task) {
Synchronized s(_monitor);
- }
-
-ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
- return new ThreadManager::Impl(lowWatermark, highWatermark);
}
-/** Basic Pool Policy Implementation */
+class SimpleThreadManager : public ThreadManager::Impl {
-class BasicPoolPolicy::Impl : public PoolPolicy {
+public:
- public:
+ SimpleThreadManager(size_t workerCount=4) :
+ _workerCount(workerCount),
+ _firstTime(true) {
+ }
- Impl() {}
+ void add(Runnable* task) {
- ~Impl() {}
+ bool addWorkers = false;
- void onEmpty(ThreadManager* source) const {}
+ {Synchronized s(_monitor);
- void onLowWatermark(ThreadManager* source) const {}
+ if(_firstTime) {
- void onHighWatermark(ThreadManager* source) const {}
+ _firstTime = false;
+
+ addWorkers = true;
+ }
+ }
+
+ if(addWorkers) {
+
+ addWorker(_workerCount);
+ }
+
+ Impl::add(task);
+ }
+
+private:
+
+ const size_t _workerCount;
+ bool _firstTime;
+ Monitor _monitor;
};
-BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl()) {}
-BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
+ThreadManager* ThreadManager::newThreadManager() {
+ return new ThreadManager::Impl();
+}
-void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
-
-void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
-
-void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
-
+ThreadManager* ThreadManager::newSimpleThreadManager() {
+ return new SimpleThreadManager();
+}
}}} // facebook::thrift::concurrency