Added thread factory test - problems in thread
Fixed stupid typo in TimerManager::start
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index bd68264..015ffba 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -1,8 +1,9 @@
#include "TimerManager.h"
+#include "Exception.h"
#include "Util.h"
#include <assert.h>
-
+#include <iostream>
#include <set>
namespace facebook { namespace thrift { namespace concurrency {
@@ -43,24 +44,19 @@
Runnable* _runnable;
+ class TimerManager::Dispatcher;
+
+ friend class TimerManager::Dispatcher;
+
STATE _state;
};
class TimerManager::Dispatcher: public Runnable {
- enum STATE {
- UNINITIALIZED,
- STARTING,
- STARTED,
- STOPPING,
- STOPPED
- };
-
public:
Dispatcher(TimerManager* manager) :
- _manager(manager),
- _state(UNINITIALIZED)
- {}
+ _manager(manager) {
+}
~Dispatcher() {}
@@ -70,10 +66,13 @@
void run() {
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
- if(_state == STARTING) {
- _state = STARTED;
+ if(_manager->_state == TimerManager::STARTING) {
+
+ _manager->_state = TimerManager::STARTED;
+
+ _manager->_monitor.notifyAll();
}
}
@@ -81,26 +80,38 @@
std::set<TimerManager::Task*> expiredTasks;
- {Synchronized(_manager->_monitor);
-
- long long now = Util::currentTime();
+ {Synchronized s(_manager->_monitor);
+
+ /* Update next timeout if necessary */
task_iterator expiredTaskEnd;
-
- while(_state == STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
-
- _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+ while(_manager->_state == TimerManager::STARTED &&
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+
+ long long timeout = 0LL;
+
+ if(!_manager->_taskMap.empty()) {
+
+ timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
+ }
+
+ _manager->_monitor.wait(timeout);
}
- if(_state == STARTED) {
+ if(_manager->_state == TimerManager::STARTED) {
for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
TimerManager::Task* task = ix->second;
expiredTasks.insert(task);
+
+ if(task->_state == TimerManager::Task::WAITING) {
+
+ task->_state = TimerManager::Task::EXECUTING;
+ }
_manager->_taskCount--;
}
@@ -116,13 +127,13 @@
delete *ix;
}
- } while(_state == STARTED);
+ } while(_manager->_state == TimerManager::STARTED);
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
- if(_state == STOPPING) {
+ if(_manager->_state == TimerManager::STOPPING) {
- _state = STOPPED;
+ _manager->_state = TimerManager::STOPPED;
_manager->_monitor.notify();
@@ -137,13 +148,73 @@
TimerManager* _manager;
friend class TimerManager;
-
- STATE _state;
};
-TimerManager::TimerManager() {}
+TimerManager::TimerManager() : // Builds a manager that has not been started; start() must be called before add()/remove().
+ _state(TimerManager::UNINITIALIZED), // start() only launches the dispatcher thread from this state
+ _dispatcher(new Dispatcher(this)) { // Dispatcher keeps a back-pointer to this manager; freed in ~TimerManager()
+} // NOTE(review): _taskCount and _threadFactory are not set in this initializer list - confirm they are initialized elsewhere
-TimerManager::~TimerManager() {}
+
+TimerManager::~TimerManager() { // Releases the Dispatcher object owned by this manager.
+ delete _dispatcher; // NOTE(review): nothing here calls stop() or joins the dispatcher thread - confirm callers stop the manager first
+}
+
+void TimerManager::start() { // Launches the dispatcher thread (only from UNINITIALIZED) and blocks while the state is STARTING.
+ // Throws InvalidArgumentException when no thread factory has been set. In any state other than UNINITIALIZED no thread is created.
+ bool doStart = false; // set only by the call that moves UNINITIALIZED -> STARTING
+
+ {Synchronized s(_monitor); // presumably a scoped guard on _monitor; state is checked and changed inside this scope
+
+ if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+ if(_state == TimerManager::UNINITIALIZED) {
+
+ _state = TimerManager::STARTING;
+
+ doStart = true;
+ }
+ }
+
+ if(doStart) { // the thread is created and started after the first Synchronized scope has closed
+
+ _dispatcherThread = _threadFactory->newThread(_dispatcher);
+
+ _dispatcherThread->start();
+ }
+
+ {Synchronized s(_monitor);
+
+ while(_state == TimerManager::STARTING) { // Dispatcher::run() sets STARTED and calls notifyAll() on this monitor
+
+ _monitor.wait();
+ }
+
+ assert(_state != TimerManager::STARTING);
+ }
+}
+
+void TimerManager::stop() { // Requests shutdown and blocks until the state becomes STOPPED.
+
+ {Synchronized s(_monitor);
+
+ if(_state == TimerManager::UNINITIALIZED) { // start() has not launched a dispatcher thread, so go straight to STOPPED
+
+ _state = TimerManager::STOPPED;
+
+ } else if(_state != STOPPING && _state != STOPPED) { // a shutdown that is already requested or finished is not re-signalled
+
+ _state = STOPPING;
+
+ _monitor.notifyAll(); // wake waiters on the monitor so they re-check the state
+ }
+
+ while(_state != STOPPED) { // Dispatcher::run() moves STOPPING -> STOPPED and then notifies this monitor
+
+ _monitor.wait();
+ }
+ }
+}
const ThreadFactory* TimerManager::threadFactory() const {
@@ -159,6 +230,11 @@
_threadFactory = value;
}
+size_t TimerManager::taskCount() const { // Returns the current value of _taskCount.
+ // NOTE(review): read without a Synchronized guard, while add() increments _taskCount inside one - confirm this is acceptable
+ return _taskCount;
+}
+
void TimerManager::add(Runnable* task, long long timeout) {
long long now = Util::currentTime();
@@ -167,6 +243,10 @@
{Synchronized s(_monitor);
+ if(_state != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
+
_taskCount++;
_taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
@@ -174,15 +254,10 @@
/* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
kick the dispatcher so it can update its timeout */
- if(_taskCount == 1 || timeout < _nextTimeout) {
+ if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
_monitor.notify();
}
-
- if(timeout < _nextTimeout) {
-
- _nextTimeout = timeout;
- }
}
}
@@ -192,20 +267,26 @@
Util::toMilliseconds(expiration, value);
- /* XXX
- Need to convert this to an explicit exception */
-
long long now = Util::currentTime();
- assert(expiration < now);
+ if(expiration < now) {
+ throw InvalidArgumentException();
+ }
add(task, expiration - now);
}
void TimerManager::remove(Runnable* task) { // Only validates the manager state; the visible body never uses `task` and removes nothing.
+ {Synchronized s(_monitor);
+ if(_state != TimerManager::STARTED) { // same state check that add() performs
+ throw IllegalStateException();
+ }
+ }
}
+const TimerManager::STATE TimerManager::state() const { return _state;} // Returns the current _state value. NOTE(review): read without a Synchronized guard - confirm callers tolerate a stale value
+
}}} // facebook::thrift::concurrency