More bulletproofing of the timer manager
facebook::thrift::concurrency::TimerManager::stop
Added proper cleanup of unprocessed tasks and shutdown of the dispatcher thread to stop()
facebook::thrift::concurrency::TimerManager::~TimerManager
Call stop() if the manager wasn't explicitly stopped
facebook::thrift::concurrency::test.TimerManagerTest
Calculate error margin for timeout expiration and verify it's within bounds
Verify manager stops properly when it goes out of scope
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664724 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index 015ffba..93b0dc5 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -82,22 +82,25 @@
{Synchronized s(_manager->_monitor);
- /* Update next timeout if necessary */
-
task_iterator expiredTaskEnd;
+ long long now = Util::currentTime();
+
while(_manager->_state == TimerManager::STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
long long timeout = 0LL;
if(!_manager->_taskMap.empty()) {
- timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
+ timeout = _manager->_taskMap.begin()->first - now;
}
+
+ assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
_manager->_monitor.wait(timeout);
-
+
+ now = Util::currentTime();
}
if(_manager->_state == TimerManager::STARTED) {
@@ -151,13 +154,29 @@
};
TimerManager::TimerManager() :
+ _taskCount(0),
_state(TimerManager::UNINITIALIZED),
_dispatcher(new Dispatcher(this)) {
}
TimerManager::~TimerManager() {
- delete _dispatcher;
+
+ /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
+ stop already takes care of reentrancy. */
+
+ if(_state != STOPPED) {
+
+ try {
+
+ stop();
+
+ } catch(...) {
+
+ // uhoh
+
+ }
+ }
}
void TimerManager::start() {
@@ -196,6 +215,8 @@
void TimerManager::stop() {
+ bool doStop = false;
+
{Synchronized s(_monitor);
if(_state == TimerManager::UNINITIALIZED) {
@@ -204,6 +225,8 @@
} else if(_state != STOPPING && _state != STOPPED) {
+ doStop = true;
+
_state = STOPPING;
_monitor.notifyAll();
@@ -214,6 +237,21 @@
_monitor.wait();
}
}
+
+ if(doStop) {
+
+ // Clean up any outstanding tasks
+
+ for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
+
+ delete ix->second;
+
+ _taskMap.erase(ix);
+ }
+
+ delete _dispatcher;
+ }
+
}
const ThreadFactory* TimerManager::threadFactory() const {