Another checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.
Added TimerManager - I can't live without one after all.
Added Util - handy place for common time operations et al.
Initial test code
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664722 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
new file mode 100644
index 0000000..bd68264
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -0,0 +1,211 @@
+#include "TimerManager.h"
+#include "Util.h"
+
+#include <assert.h>
+
+#include <set>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** TimerManager class
+
+ @author marc
+ @version $Id:$ */
+
+typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
+typedef std::pair<task_iterator, task_iterator> task_range;
+
+class TimerManager::Task : public Runnable {
+
+public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(Runnable* runnable) :
+ _runnable(runnable),
+ _state(WAITING)
+ {}
+
+ ~Task() {};
+
+ void run() {
+ if(_state == EXECUTING) {
+ _runnable->run();
+ _state = COMPLETE;
+ }
+ }
+
+ private:
+
+ Runnable* _runnable;
+
+ STATE _state;
+};
+
+class TimerManager::Dispatcher: public Runnable {
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+public:
+ Dispatcher(TimerManager* manager) :
+ _manager(manager),
+ _state(UNINITIALIZED)
+ {}
+
+ ~Dispatcher() {}
+
+ /** Dispatcher entry point
+
+ As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
+
+ void run() {
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STARTING) {
+ _state = STARTED;
+ }
+ }
+
+ do {
+
+ std::set<TimerManager::Task*> expiredTasks;
+
+ {Synchronized(_manager->_monitor);
+
+ long long now = Util::currentTime();
+
+ task_iterator expiredTaskEnd;
+
+ while(_state == STARTED &&
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
+
+ _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+ }
+
+ if(_state == STARTED) {
+
+ for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
+
+ TimerManager::Task* task = ix->second;
+
+ expiredTasks.insert(task);
+
+ _manager->_taskCount--;
+ }
+
+ _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
+ }
+ }
+
+ for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+
+ (*ix)->run();
+
+ delete *ix;
+ }
+
+ } while(_state == STARTED);
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STOPPING) {
+
+ _state = STOPPED;
+
+ _manager->_monitor.notify();
+
+ }
+ }
+
+ return;
+ }
+
+ private:
+
+ TimerManager* _manager;
+
+ friend class TimerManager;
+
+ STATE _state;
+};
+
+TimerManager::TimerManager() {}
+
+TimerManager::~TimerManager() {}
+
+const ThreadFactory* TimerManager::threadFactory() const {
+
+ Synchronized s(_monitor);
+
+ return _threadFactory;
+}
+
+void TimerManager::threadFactory(const ThreadFactory* value) {
+
+ Synchronized s(_monitor);
+
+ _threadFactory = value;
+}
+
+void TimerManager::add(Runnable* task, long long timeout) {
+
+ long long now = Util::currentTime();
+
+ timeout += now;
+
+ {Synchronized s(_monitor);
+
+ _taskCount++;
+
+ _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
+
+ /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
+ kick the dispatcher so it can update its timeout */
+
+ if(_taskCount == 1 || timeout < _nextTimeout) {
+
+ _monitor.notify();
+ }
+
+ if(timeout < _nextTimeout) {
+
+ _nextTimeout = timeout;
+ }
+ }
+}
+
+void TimerManager::add(Runnable* task, const struct timespec& value) {
+
+ long long expiration;
+
+ Util::toMilliseconds(expiration, value);
+
+ /* XXX
+ Need to convert this to an explicit exception */
+
+ long long now = Util::currentTime();
+
+ assert(expiration < now);
+
+ add(task, expiration - now);
+}
+
+
+void TimerManager::remove(Runnable* task) {
+
+}
+
+}}} // facebook::thrift::concurrency
+