Another checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.
Added TimerManager - I can't live without one after all.
Added Util - handy place for common time operations et al.
Initial test code
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664722 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index c4ca2b1..b5d02e6 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,6 +1,9 @@
#include "ThreadManager.h"
+#include "Monitor.h"
#include <assert.h>
+#include <queue>
+#include <set>
namespace facebook { namespace thrift { namespace concurrency {
@@ -15,7 +18,108 @@
policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
@author marc
- @version $Id */
+ @version $Id:$ */
+
+/** Concrete ThreadManager implementation.
+
+    Holds the watermark settings, the queue of pending tasks and the set of worker threads. The accessors
+    for the pool policy, the thread factory, the task queue and the worker set take _monitor before
+    touching their member. */
+class ThreadManager::Impl : public ThreadManager {
+
+ public:
+
+  /** Stores the two watermarks and starts with an idle count of zero, no pool policy and no thread
+      factory. Every scalar/pointer member is given a defined value here so that totalTaskCount(),
+      add() and addThread() never read an indeterminate one.
+
+      @param highWatermark initial value returned by highWatermark()
+      @param lowWatermark  initial value returned by lowWatermark() */
+  Impl(size_t highWatermark, size_t lowWatermark) :
+    _hiwat(highWatermark),
+    _lowat(lowWatermark),
+    _idleCount(0),
+    _poolPolicy(0),
+    _threadFactory(0) {
+  }
+
+  ~Impl() {}
+
+  /** Watermark accessors. These read and write the members directly, without taking _monitor. */
+  size_t highWatermark() const {return _hiwat;}
+
+  void highWatermark(size_t value) {_hiwat = value;}
+
+  size_t lowWatermark() const {return _lowat;}
+
+  void lowWatermark(size_t value) {_lowat = value;}
+
+  /** Returns the current pool policy pointer (0 until one is set). */
+  const PoolPolicy* poolPolicy() const {
+
+    Synchronized s(_monitor);
+
+    return _poolPolicy;
+  }
+
+  /** Replaces the pool policy pointer; the previous pointer is simply overwritten. */
+  void poolPolicy(const PoolPolicy* value) {
+
+    Synchronized s(_monitor);
+
+    _poolPolicy = value;
+  }
+
+  /** Returns the current thread factory pointer (0 until one is set). */
+  const ThreadFactory* threadFactory() const {
+
+    Synchronized s(_monitor);
+
+    return _threadFactory;
+  }
+
+  /** Replaces the thread factory pointer. addThread() dereferences it without a null check, so a
+      factory has to be installed before threads are added. */
+  void threadFactory(const ThreadFactory* value) {
+
+    Synchronized s(_monitor);
+
+    _threadFactory = value;
+  }
+
+  void addThread(size_t value);
+
+  void removeThread(size_t value);
+
+  /** NOTE(review): reads _idleCount without taking _monitor, unlike the counters below - confirm
+      whether that is intentional. */
+  size_t idleWorkerCount() const {return _idleCount;}
+
+  /** Number of threads currently registered in _workers. */
+  size_t workerCount() const {
+
+    Synchronized s(_monitor);
+
+    return _workers.size();
+  }
+
+  /** Number of tasks still waiting in the queue. */
+  size_t pendingTaskCount() const {
+
+    Synchronized s(_monitor);
+
+    return _tasks.size();
+  }
+
+  /** Queued tasks plus the number of workers that are not counted as idle. */
+  size_t totalTaskCount() const {
+
+    Synchronized s(_monitor);
+
+    return _tasks.size() + _workers.size() - _idleCount;
+  }
+
+  void add(Runnable* value);
+
+  void remove(Runnable* task);
+
+private:
+
+  size_t _hiwat;
+
+  size_t _lowat;
+
+  size_t _idleCount;
+
+  const PoolPolicy* _poolPolicy;
+
+  const ThreadFactory* _threadFactory;
+
+  friend class ThreadManager::Task;
+
+  std::queue<Task*> _tasks;
+
+  Monitor _monitor;
+
+  friend class ThreadManager::Worker;
+
+  std::set<Thread*> _workers;
+};
class ThreadManager::Task : public Runnable {
@@ -49,7 +153,6 @@
};
class ThreadManager::Worker: public Runnable {
-
enum STATE {
UNINITIALIZED,
STARTING,
@@ -59,7 +162,7 @@
};
public:
- Worker(ThreadManager* manager) :
+ Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
_idle(false)
@@ -134,170 +237,138 @@
private:
- ThreadManager* _manager;
+ ThreadManager::Impl* _manager;
- friend class ThreadManager;
+ friend class ThreadManager::Impl;
STATE _state;
bool _idle;
};
-ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) :
- _hiwat(highWatermark),
- _lowat(lowWatermark) {
-}
-
-ThreadManager::~ThreadManager() {}
-
-size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
-
-void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
-
-size_t ThreadManager::lowWatermark() const {return _lowat;}
-
-void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
-
-const PoolPolicy* ThreadManager::poolPolicy() const {
-
- Synchronized s(_monitor);
-
- return _poolPolicy;
-}
-
-void ThreadManager::poolPolicy(const PoolPolicy* value) {
-
- Synchronized s(_monitor);
-
- _poolPolicy = value;
-}
-
-const ThreadFactory* ThreadManager::threadFactory() const {
-
- Synchronized s(_monitor);
-
- return _threadFactory;
-}
-
-void ThreadManager::threadFactory(const ThreadFactory* value) {
+/** Creates `value` new worker threads, starts each of them once, then registers them in _workers.
+
+    Each thread is obtained from _threadFactory, which is dereferenced without a null check, so a
+    factory must have been set before this is called. Only the final insertion into _workers is done
+    while holding _monitor. */
+void ThreadManager::Impl::addThread(size_t value) {
- Synchronized s(_monitor);
+  std::set<Thread*> newThreads;
- _threadFactory = value;
-}
+  for(size_t ix = 0; ix < value; ix++) {
-void ThreadManager::addThread(size_t value) {
+
+    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
- std::set<Thread*> newThreads;
+    newThreads.insert(_threadFactory->newThread(worker));
+  }
- for(size_t ix = 0; ix < value; ix++) {
+  // Start every new thread exactly once.
+  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+    (*ix)->start();
+  }
- newThreads.insert(_threadFactory->newThread(worker));
+
+  {Synchronized s(_monitor);
+
+    _workers.insert(newThreads.begin(), newThreads.end());
+  }
}
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
-
- {Synchronized s(_monitor);
-
- _workers.insert(newThreads.begin(), newThreads.end());
- }
-}
-
-void ThreadManager::removeThread(size_t value) {
+/** Removes up to `value` threads from _workers, preferring idle workers over busy ones, then joins
+    each removed thread and deletes its Worker. Selection happens under _monitor; joining happens after
+    the monitor has been released.
+
+    NOTE(review): the only signal sent to removed workers here is notifyAll(); confirm that the worker's
+    run loop exits once it has been removed, otherwise join() will block. */
+void ThreadManager::Impl::removeThread(size_t value) {
std::set<Thread*> removedThreads;
- {Synchronized s(_monitor);
+  {Synchronized s(_monitor);
- /* Overly clever loop
+    /* Overly clever loop
+
+       First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
+       and remove a sufficient number of busy threads. */
- First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
- and remove a sufficient number of busy threads. */
-
- for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+    for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
- for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+      // The iterator is advanced inside the body: erasing an element invalidates the iterator to it.
+      for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value);) {
- Worker* worker = (Worker*)(*workerThread)->runnable();
+        Worker* worker = (Worker*)(*workerThread)->runnable();
- if(worker->_idle || !idleOnly) {
+        if(worker->_idle || !idleOnly) {
- removedThreads.insert(*workerThread);
+          removedThreads.insert(*workerThread);
- _workers.erase(workerThread);
+          // Post-increment moves the loop iterator on before the old position is erased.
+          _workers.erase(workerThread++);
+        } else {
+          ++workerThread;
+        }
}
}
+
+    _monitor.notifyAll();
}
+
- _monitor.notifyAll();
+  // Join removed threads and free worker
+
+  for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+    Worker* worker = (Worker*)(*workerThread)->runnable();
+
+    (*workerThread)->join();
+
+    delete worker;
+  }
+ }
+
+/** Wraps `value` in a ThreadManager::Task, appends it to the pending queue and signals the monitor.
+    The whole operation runs while holding _monitor. */
+void ThreadManager::Impl::add(Runnable* value) {
+
+  Synchronized s(_monitor);
+
+  _tasks.push(new ThreadManager::Task(value));
+
+  /* Signal on every add. Notifying only when the queue goes from empty to one element lets a burst of
+     adds wake a single worker while other idle workers keep waiting, and an empty queue does not imply
+     that every worker is idle (one may still be executing the task it dequeued). A notify with no
+     waiter is harmless. */
+
+  _monitor.notify();
}
+void ThreadManager::Impl::remove(Runnable* task) { // Stub: `task` is never used and nothing is taken out of _tasks.
- // Join removed threads and free worker
-
- for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- (*workerThread)->join();
-
- delete worker;
+ Synchronized s(_monitor); // Only statement in the body; presumably a scoped lock on _monitor - no other work follows.
}
+
+/** Factory for the ThreadManager implementation defined in this file.
+
+    @param lowWatermark  value the new manager reports from lowWatermark()
+    @param highWatermark value the new manager reports from highWatermark()
+    @return a ThreadManager::Impl allocated with new; this function keeps no reference to it */
+ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
+  // Impl's constructor is declared (highWatermark, lowWatermark) - the reverse of this parameter order.
+  return new ThreadManager::Impl(highWatermark, lowWatermark);
}
-size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+/** Basic Pool Policy Implementation */
-size_t ThreadManager::workerCount() const {
+class BasicPoolPolicy::Impl : public PoolPolicy { // Private implementation object; BasicPoolPolicy's callbacks call into it.
- Synchronized s(_monitor);
+ public:
- return _workers.size();
-}
+ Impl() {}
-size_t ThreadManager::pendingTaskCount() const {
+ ~Impl() {}
- Synchronized s(_monitor);
+ void onEmpty(ThreadManager* source) const {} // Empty body: `source` is ignored.
- return _tasks.size();
-}
+ void onLowWatermark(ThreadManager* source) const {} // Empty body: `source` is ignored.
-size_t ThreadManager::totalTaskCount() const {
+ void onHighWatermark(ThreadManager* source) const {} // Empty body: `source` is ignored.
+};
- Synchronized s(_monitor);
+BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl()) {} // Allocates the Impl object stored in _impl.
- return _tasks.size() + _workers.size() - _idleCount;
-}
+BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;} // Frees the object held in _impl.
-void ThreadManager::add(Runnable* value) {
+void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);} // Pure forwarder: hands `source` to _impl unchanged.
- Synchronized s(_monitor);
+void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);} // Pure forwarder: hands `source` to _impl unchanged.
- _tasks.push(new ThreadManager::Task(value));
+void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);} // Pure forwarder: hands `source` to _impl unchanged.
- /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
- task in time. */
-
- if(_tasks.size() == 1) {
-
- assert(_idleCount == _workers.size());
-
- _monitor.notify();
- }
-}
-
-void ThreadManager::remove(Runnable* task) {
-
- Synchronized s(_monitor);
-}
}}} // facebook::thrift::concurrency