From 9f27a4eb7d329f80a7143b1850655f1c59bf4f46 Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Wed, 19 Jul 2006 20:02:22 +0000 Subject: [PATCH] More bullet proofing of timer manager facebook::thrift::concurrency::TimerManager::stop Added proper cleanup of unprocessed tasks and shutdown of dispatcher thread to stop facebook::thrift::concurrency::TimerManager::~TimerManager Call stop if manager wasn't explicitly stopped facebook::thrift::concurrency::test.TimerManagerTest Calculate error margin for timeout expiration and verify it's within bounds Verify manager stops properly when it goes out of scope git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664724 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/concurrency/Monitor.cc | 10 ++- lib/cpp/src/concurrency/TimerManager.cc | 50 +++++++++++-- lib/cpp/src/concurrency/Util.h | 35 ++++------ .../src/concurrency/test/TimerManagerTests.h | 70 +++++++++++++++---- 4 files changed, 119 insertions(+), 46 deletions(-) diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index f6144ba7..b1d7b726 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -59,6 +59,8 @@ class Monitor::Impl { // XXX Need to assert that caller owns mutex + assert(timeout >= 0LL); + if(timeout == 0LL) { assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0); @@ -67,13 +69,15 @@ class Monitor::Impl { struct timespec abstime; + long long now = Util::currentTime(); + + Util::toTimespec(abstime, now + timeout); + int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); if(result == ETIMEDOUT) { - // XXX Add assert once currentTime is fixed to have ms resolution or better - - // assert(Util::currentTime() >= (now + timeout)); + assert(Util::currentTime() >= (now + timeout)); } } } diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc index 015ffba5..93b0dc54 100644 --- a/lib/cpp/src/concurrency/TimerManager.cc +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -82,22 +82,25 @@ public: {Synchronized s(_manager->_monitor); - /* Update next timeout if necessary */ - task_iterator expiredTaskEnd; + long long now = Util::currentTime(); + while(_manager->_state == TimerManager::STARTED && - (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) { + (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { long long timeout = 0LL; if(!_manager->_taskMap.empty()) { - timeout = Util::currentTime() - _manager->_taskMap.begin()->first; + timeout = _manager->_taskMap.begin()->first - now; } + + assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); _manager->_monitor.wait(timeout); - + + now = Util::currentTime(); } if(_manager->_state == TimerManager::STARTED) { @@ -151,13 +154,29 @@ public: }; TimerManager::TimerManager() : + _taskCount(0), _state(TimerManager::UNINITIALIZED), _dispatcher(new Dispatcher(this)) { } TimerManager::~TimerManager() { - delete _dispatcher; + + /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since + stop already takes care of reentrancy. */ + + if(_state != STOPPED) { + + try { + + stop(); + + } catch(...) { + + // uhoh + + } + } } void TimerManager::start() { @@ -196,6 +215,8 @@ void TimerManager::start() { void TimerManager::stop() { + bool doStop = false; + {Synchronized s(_monitor); if(_state == TimerManager::UNINITIALIZED) { @@ -204,6 +225,8 @@ void TimerManager::stop() { } else if(_state != STOPPING && _state != STOPPED) { + doStop = true; + _state = STOPPING; _monitor.notifyAll(); @@ -214,6 +237,21 @@ void TimerManager::stop() { _monitor.wait(); } } + + if(doStop) { + + // Clean up any outstanding tasks + + for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { + + delete ix->second; + + _taskMap.erase(ix); + } + + delete _dispatcher; + } + } const ThreadFactory* TimerManager::threadFactory() const { diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h index 3ab89293..d04435d5 100644 --- a/lib/cpp/src/concurrency/Util.h +++ b/lib/cpp/src/concurrency/Util.h @@ -2,7 +2,7 @@ #define _concurrency_Util_h_ 1 #include -#include +#include namespace facebook { namespace thrift { namespace concurrency { @@ -19,45 +19,34 @@ class Util { public: - /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure - specifying current time plus timeout + /** Converts timespec to milliseconds - @param struct timespec& current time plus timeout result - @param timeout time to delay in milliseconds */ + @param struct timespec& result + @param time or duration in milliseconds */ - static const void toAbsoluteTimespec(struct timespec& result, long long value) { + static void toTimespec(struct timespec& result, long long value) { - // XXX Darwin doesn't seem to have any readily useable hi-res clock. + result.tv_sec = value / 1000; // ms to s - time_t seconds; - - assert(time(&seconds) != (time_t)-1); - - seconds+= (value / 1000); - - long nanoseconds = (value % 1000) * 1000000; - - result.tv_sec = seconds + (nanoseconds / 1000000000); - - result.tv_nsec = nanoseconds % 1000000000; + result.tv_nsec = (value % 1000) * 1000000; // ms to ns } - /** Converts absolute timespec to milliseconds from epoch */ + /** Converts timespec to milliseconds */ static const void toMilliseconds(long long& result, const struct timespec& value) { - result = value.tv_sec * 1000 + value.tv_nsec * 1000000; + result = value.tv_sec * 1000 + value.tv_nsec / 1000000; } /** Get current time as milliseconds from epoch */ static const long long currentTime() { - time_t now; + struct timeval now; - time(&now); + assert(gettimeofday(&now, NULL) == 0); - return (long long)now * 1000; + return ((long long)now.tv_sec) * 1000LL + now.tv_usec / 1000; } }; diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h index 24f79643..f34f8b0b 100644 --- a/lib/cpp/src/concurrency/test/TimerManagerTests.h +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -20,12 +21,30 @@ class TimerManagerTests { public: - Task(Monitor& monitor) : + Task(Monitor& monitor, long long timeout) : + _timeout(timeout), + _startTime(Util::currentTime()), _monitor(monitor), + _success(false), _done(false) {} void run() { + _endTime = Util::currentTime(); + + // Figure out error percentage + + long long delta = _endTime - _startTime; + + + delta = delta > _timeout ? delta - _timeout : _timeout - delta; + + float error = delta / _timeout; + + if(error < .10) { + _success = true; + } + std::cout << "\t\t\tHello World" << std::endl; _done = true; @@ -34,37 +53,60 @@ class TimerManagerTests { _monitor.notifyAll(); } } - + + + long long _timeout; + long long _startTime; + long long _endTime; Monitor& _monitor; + bool _success; bool _done; }; public: - bool test00() { + /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that + the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its + destructor is called. */ - TimerManager* timerManager = new TimerManager(); + bool test00(long long timeout=1000LL) { - timerManager->threadFactory(new PosixThreadFactory()); + TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout); - timerManager->start(); + { - assert(timerManager->state() == TimerManager::STARTED); + TimerManager timerManager; + + timerManager.threadFactory(new PosixThreadFactory()); + + timerManager.start(); + + assert(timerManager.state() == TimerManager::STARTED); + + TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout); + + {Synchronized s(_monitor); + + timerManager.add(orphanTask, 10 * timeout); + + timerManager.add(task, timeout); + + _monitor.wait(); + } - TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor); + assert(task->_done); - {Synchronized s(_monitor); - timerManager->add(task, 1000LL); + std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl; - _monitor.wait(); + delete task; } - assert(task->_done); + // timerManager.stop(); This is where it happens via destructor - delete task; + assert(!orphanTask->_done); - std::cout << "\t\t\tSuccess!" << std::endl; + delete orphanTask; return true; } -- 2.17.1