From 8a40a76b271f376a45202d1569d819d12f3940cd Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Wed, 19 Jul 2006 17:46:50 +0000 Subject: [PATCH] Added thread factory test - problems in thread Fixed stupid typo in TimerManager::start git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664723 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/concurrency/Monitor.cc | 14 +- lib/cpp/src/concurrency/PosixThreadFactory.cc | 46 ++-- lib/cpp/src/concurrency/ThreadManager.cc | 6 +- lib/cpp/src/concurrency/TimerManager.cc | 165 +++++++++---- lib/cpp/src/concurrency/TimerManager.h | 51 +++- lib/cpp/src/concurrency/test/Tests.cc | 52 ++++ .../src/concurrency/test/ThreadFactoryTests.h | 228 ++++++++++++++++++ .../src/concurrency/test/TimerManagerTests.cc | 33 --- .../src/concurrency/test/TimerManagerTests.h | 81 +++++++ 9 files changed, 560 insertions(+), 116 deletions(-) create mode 100644 lib/cpp/src/concurrency/test/Tests.cc create mode 100644 lib/cpp/src/concurrency/test/ThreadFactoryTests.h delete mode 100644 lib/cpp/src/concurrency/test/TimerManagerTests.cc create mode 100644 lib/cpp/src/concurrency/test/TimerManagerTests.h diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index a2e42760..f6144ba7 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -1,8 +1,12 @@ #include "Monitor.h" +#include "Exception.h" #include "Util.h" #include #include + +#include + #include @@ -57,19 +61,19 @@ class Monitor::Impl { if(timeout == 0LL) { - pthread_cond_wait(&_pthread_cond, &_pthread_mutex); + assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0); } else { struct timespec abstime; - Util::toAbsoluteTimespec(abstime, timeout); - int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); if(result == ETIMEDOUT) { - // XXX If result is timeout need to throw timeout exception + // XXX Add assert once currentTime is fixed to have ms resolution or better + + // assert(Util::currentTime() >= (now + timeout)); } } } @@ -101,7 +105,7 @@ private: Monitor::Monitor() : _impl(new Monitor::Impl()) {} - Monitor::~Monitor() { delete _impl;} +Monitor::~Monitor() { delete _impl;} void Monitor::lock() const {_impl->lock();} diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index e7f84cdd..bac122de 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -22,6 +22,8 @@ public: static const int MB = 1024 * 1024; + static void* threadMain(void* arg); + private: pthread_t _pthread; @@ -36,26 +38,6 @@ private: Runnable* _runnable; - static void* threadMain(void* arg) { - - // XXX need a lock here when testing thread state - - PthreadThread* thread = (PthreadThread*)arg; - - if(thread->_state != starting) { - return (void*)0; - } - - thread->_state = starting; - - thread->_runnable->run(); - - if(thread->_state != stopping && thread->_state != stopped) { - thread->_state = stopping; - } - - return (void*)0; - } public: @@ -95,9 +77,9 @@ public: // Set thread priority - assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); + // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); - assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0); + assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0); } void join() { @@ -114,6 +96,26 @@ public: }; +void* PthreadThread::threadMain(void* arg) { + // XXX need a lock here when testing thread state + + PthreadThread* thread = (PthreadThread*)arg; + + if(thread->_state != starting) { + return (void*)0; + } + + thread->_state = starting; + + thread->_runnable->run(); + + if(thread->_state != stopping && thread->_state != stopped) { + thread->_state = stopping; + } + + return (void*)0; +} + /** POSIX Thread factory implementation */ class PosixThreadFactory::Impl { diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index b5d02e64..d13ce7ba 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -176,7 +176,7 @@ class ThreadManager::Worker: public Runnable { void run() { - {Synchronized(_manager->_monitor); + {Synchronized s(_manager->_monitor); if(_state == STARTING) { _state = STARTED; @@ -191,7 +191,7 @@ class ThreadManager::Worker: public Runnable { Once the queue is non-empty, dequeue a task, release monitor, and execute. */ - {Synchronized(_manager->_monitor); + {Synchronized s(_manager->_monitor); while(_state == STARTED && _manager->_tasks.empty()) { @@ -221,7 +221,7 @@ class ThreadManager::Worker: public Runnable { } while(_state == STARTED); - {Synchronized(_manager->_monitor); + {Synchronized s(_manager->_monitor); if(_state == STOPPING) { diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc index bd68264c..015ffba5 100644 --- a/lib/cpp/src/concurrency/TimerManager.cc +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -1,8 +1,9 @@ #include "TimerManager.h" +#include "Exception.h" #include "Util.h" #include - +#include #include namespace facebook { namespace thrift { namespace concurrency { @@ -43,24 +44,19 @@ public: Runnable* _runnable; + class TimerManager::Dispatcher; + + friend class TimerManager::Dispatcher; + STATE _state; }; class TimerManager::Dispatcher: public Runnable { - enum STATE { - UNINITIALIZED, - STARTING, - STARTED, - STOPPING, - STOPPED - }; - public: Dispatcher(TimerManager* manager) : - _manager(manager), - _state(UNINITIALIZED) - {} + _manager(manager) { +} ~Dispatcher() {} @@ -70,10 +66,13 @@ public: void run() { - {Synchronized(_manager->_monitor); + {Synchronized s(_manager->_monitor); + + if(_manager->_state == TimerManager::STARTING) { + + _manager->_state = TimerManager::STARTED; - if(_state == STARTING) { - _state = STARTED; + _manager->_monitor.notifyAll(); } } @@ -81,26 +80,38 @@ public: std::set expiredTasks; - {Synchronized(_manager->_monitor); - - long long now = Util::currentTime(); + {Synchronized s(_manager->_monitor); + + /* Update next timeout if necessary */ task_iterator expiredTaskEnd; - - while(_state == STARTED && - (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) { - - _manager->_monitor.wait(_manager->_nextTimeout - now); + + while(_manager->_state == TimerManager::STARTED && + (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) { + + long long timeout = 0LL; + + if(!_manager->_taskMap.empty()) { + + timeout = Util::currentTime() - _manager->_taskMap.begin()->first; + } + + _manager->_monitor.wait(timeout); } - if(_state == STARTED) { + if(_manager->_state == TimerManager::STARTED) { for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { TimerManager::Task* task = ix->second; expiredTasks.insert(task); + + if(task->_state == TimerManager::Task::WAITING) { + + task->_state = TimerManager::Task::EXECUTING; + } _manager->_taskCount--; } @@ -116,13 +127,13 @@ public: delete *ix; } - } while(_state == STARTED); + } while(_manager->_state == TimerManager::STARTED); - {Synchronized(_manager->_monitor); + {Synchronized s(_manager->_monitor); - if(_state == STOPPING) { + if(_manager->_state == TimerManager::STOPPING) { - _state = STOPPED; + _manager->_state = TimerManager::STOPPED; _manager->_monitor.notify(); @@ -137,13 +148,73 @@ public: TimerManager* _manager; friend class TimerManager; - - STATE _state; }; -TimerManager::TimerManager() {} +TimerManager::TimerManager() : + _state(TimerManager::UNINITIALIZED), + _dispatcher(new Dispatcher(this)) { +} + + +TimerManager::~TimerManager() { + delete _dispatcher; +} + +void TimerManager::start() { + + bool doStart = false; + + {Synchronized s(_monitor); + + if(_threadFactory == NULL) {throw InvalidArgumentException();} + + if(_state == TimerManager::UNINITIALIZED) { + + _state = TimerManager::STARTING; + + doStart = true; + } + } + + if(doStart) { + + _dispatcherThread = _threadFactory->newThread(_dispatcher); + + _dispatcherThread->start(); + } + + {Synchronized s(_monitor); + + while(_state == TimerManager::STARTING) { + + _monitor.wait(); + } + + assert(_state != TimerManager::STARTING); + } +} + +void TimerManager::stop() { + + {Synchronized s(_monitor); + + if(_state == TimerManager::UNINITIALIZED) { + + _state = TimerManager::STOPPED; + + } else if(_state != STOPPING && _state != STOPPED) { + + _state = STOPPING; + + _monitor.notifyAll(); + } -TimerManager::~TimerManager() {} + while(_state != STOPPED) { + + _monitor.wait(); + } + } +} const ThreadFactory* TimerManager::threadFactory() const { @@ -159,6 +230,11 @@ void TimerManager::threadFactory(const ThreadFactory* value) { _threadFactory = value; } +size_t TimerManager::taskCount() const { + + return _taskCount; +} + void TimerManager::add(Runnable* task, long long timeout) { long long now = Util::currentTime(); @@ -167,6 +243,10 @@ void TimerManager::add(Runnable* task, long long timeout) { {Synchronized s(_monitor); + if(_state != TimerManager::STARTED) { + throw IllegalStateException(); + } + _taskCount++; _taskMap.insert(std::pair(timeout, new Task(task))); @@ -174,15 +254,10 @@ void TimerManager::add(Runnable* task, long long timeout) { /* If the task map was empty, or if we have an expiration that is earlier than any previously seen, kick the dispatcher so it can update its timeout */ - if(_taskCount == 1 || timeout < _nextTimeout) { + if(_taskCount == 1 || timeout < _taskMap.begin()->first) { _monitor.notify(); } - - if(timeout < _nextTimeout) { - - _nextTimeout = timeout; - } } } @@ -192,20 +267,26 @@ void TimerManager::add(Runnable* task, const struct timespec& value) { Util::toMilliseconds(expiration, value); - /* XXX - Need to convert this to an explicit exception */ - long long now = Util::currentTime(); - assert(expiration < now); + if(expiration < now) { + throw InvalidArgumentException(); + } add(task, expiration - now); } void TimerManager::remove(Runnable* task) { + {Synchronized s(_monitor); + if(_state != TimerManager::STARTED) { + throw IllegalStateException(); + } + } } +const TimerManager::STATE TimerManager::state() const { return _state;} + }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h index 002460ad..2681a5af 100644 --- a/lib/cpp/src/concurrency/TimerManager.h +++ b/lib/cpp/src/concurrency/TimerManager.h @@ -1,6 +1,7 @@ #if !defined(_concurrency_TimerManager_h_) #define _concurrency_TimerManager_h_ 1 +#include "Exception.h" #include "Monitor.h" #include "Thread.h" @@ -9,7 +10,7 @@ #include namespace facebook { namespace thrift { namespace concurrency { - + /** Timer Manager This class dispatches timer tasks when they fall due. @@ -23,31 +24,56 @@ class TimerManager { TimerManager(); - virtual ~TimerManager() = 0; + virtual ~TimerManager(); + + virtual const ThreadFactory* threadFactory() const; + + virtual void threadFactory(const ThreadFactory* value); - virtual const ThreadFactory* threadFactory() const = 0; + /** Starts the timer manager service - virtual void threadFactory(const ThreadFactory* value) = 0; + @throws IllegalArgumentException Missing thread factory attribute */ - virtual size_t taskCount() const = 0; + virtual void start(); + + /** Stops the timer manager service */ + + virtual void stop(); + + virtual size_t taskCount() const ; /** Adds a task to be executed at some time in the future by a worker thread. @param task The task to execute @param timeout Time in milliseconds to delay before executing task */ - virtual void add(Runnable* task, long long timeout) = 0; + virtual void add(Runnable* task, long long timeout); /** Adds a task to be executed at some time in the future by a worker thread. @param task The task to execute @param timeout Absolute time in the future to execute task. */ - virtual void add(Runnable* task, const struct timespec& timeout) = 0; + virtual void add(Runnable* task, const struct timespec& timeout); + + /** Removes a pending task - /** Removes a pending task */ + @throws NoSuchTaskException Specified task doesn't exist. It was either processed already or this call was made for a task that + was never added to this timer - virtual void remove(Runnable* task) = 0; + @throws UncancellableTaskException Specified task is already being executed or has completed execution. */ + + virtual void remove(Runnable* task); + + enum STATE { + UNINITIALIZED = 1000, + STARTING = 1001, + STARTED = 1002, + STOPPING = 1003, + STOPPED = 1004 + }; + + virtual const STATE state() const; private: @@ -61,15 +87,18 @@ class TimerManager { size_t _taskCount; - long long _nextTimeout; - Monitor _monitor; + STATE _state; + class Dispatcher; friend class Dispatcher; Dispatcher* _dispatcher; + + Thread* _dispatcherThread; + }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc new file mode 100644 index 00000000..36f29ff0 --- /dev/null +++ b/lib/cpp/src/concurrency/test/Tests.cc @@ -0,0 +1,52 @@ +#include +#include + +#include "ThreadFactoryTests.h" +#include "TimerManagerTests.h" + +int main(int argc, char** argv) { + + std::string arg; + + if(argc < 2) { + + arg = "all"; + + } else { + + arg = std::string(argv[1]); + } + + bool runAll = arg.compare("all") == 0; + + if(runAll || arg.compare("thread-factory") == 0) { + + ThreadFactoryTests threadFactoryTests; + + std::cout << "ThreadFactory tests..." << std::endl; + + std::cout << "\tThreadFactory hello-world test" << std::endl; + + assert(threadFactoryTests.helloWorldTest()); + + std::cout << "\t\tThreadFactory reap N threads test: N = 100" << std::endl; + + assert(threadFactoryTests.reapNThreads(100)); + + std::cout << "\t\tThreadFactory synchrous start test" << std::endl; + + assert(threadFactoryTests.synchStartTest()); + } + + if(runAll || arg.compare("timer-manager") == 0) { + + std::cout << "TimerManager tests..." << std::endl; + + std::cout << "\t\tTimerManager test00" << std::endl; + + TimerManagerTests timerManagerTests; + + assert(timerManagerTests.test00()); + } +} + diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h new file mode 100644 index 00000000..0d935648 --- /dev/null +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -0,0 +1,228 @@ +#include +#include +#include + +#include +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { namespace test { + +using namespace facebook::thrift::concurrency; + +/** ThreadManagerTests class + + @author marc + @version $Id:$ */ + +class ThreadFactoryTests { + + class Task: public Runnable { + + public: + + Task() {} + + void run() { + std::cout << "\t\t\tHello World" << std::endl; + } + }; + +public: + + /** Hello world test */ + + bool helloWorldTest() { + + PosixThreadFactory threadFactory = PosixThreadFactory(); + + Task* task = new ThreadFactoryTests::Task(); + + Thread* thread = threadFactory.newThread(task); + + thread->start(); + + thread->join(); + + delete thread; + + delete task; + + std::cout << "\t\t\tSuccess!" << std::endl; + + return true; + } + + /** Reap N threads */ + + class ReapNTask: public Runnable { + + public: + + ReapNTask(Monitor& monitor, int& activeCount) : + _monitor(monitor), + _count(activeCount) { + } + + void run() { + + {Synchronized s(_monitor); + + _count--; + + //std::cout << "\t\t\tthread count: " << _count << std::endl; + + if(_count == 0) { + _monitor.notify(); + } + } + } + + Monitor& _monitor; + + int& _count; + }; + + bool reapNThreads(int count=100) { + + Monitor* monitor = new Monitor(); + + int* activeCount = new int(count); + + PosixThreadFactory threadFactory = PosixThreadFactory(); + + std::set threads; + + for(int ix = 0; ix < count; ix++) { + threads.insert(threadFactory.newThread(new ReapNTask(*monitor, *activeCount))); + } + + for(std::set::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + + (*thread)->start(); + } + + + {Synchronized s(*monitor); + + while(*activeCount > 0) { + monitor->wait(1000); + } + } + + for(std::set::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + + delete (*thread)->runnable(); + + delete *thread; + } + + std::cout << "\t\t\tSuccess!" << std::endl; + + return true; + } + + class SynchStartTask: public Runnable { + + public: + + enum STATE { + UNINITIALIZED = 1000, + STARTING = 1001, + STARTED = 1002, + STOPPING = 1003, + STOPPED = 1004 + }; + + SynchStartTask(Monitor& monitor, + volatile STATE& state) : + _monitor(monitor), + _state(state) { + } + + void run() { + + {Synchronized s(_monitor); + + if(_state == SynchStartTask::STARTING) { + _state = SynchStartTask::STARTED; + _monitor.notify(); + } + } + + {Synchronized s(_monitor); + + while(_state == SynchStartTask::STARTED) { + _monitor.wait(); + } + + if(_state == SynchStartTask::STOPPING) { + + _state = SynchStartTask::STOPPED; + + _monitor.notifyAll(); + } + } + } + + private: + Monitor& _monitor; + volatile STATE& _state; + }; + + bool synchStartTest() { + + Monitor monitor; + + SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; + + SynchStartTask* task = new SynchStartTask(monitor, state); + + PosixThreadFactory threadFactory = PosixThreadFactory(); + + Thread* thread = threadFactory.newThread(task); + + if(state == SynchStartTask::UNINITIALIZED) { + + state = SynchStartTask::STARTING; + + thread->start(); + } + + {Synchronized s(monitor); + + while(state == SynchStartTask::STARTING) { + monitor.wait(); + } + } + + assert(state != SynchStartTask::STARTING); + + {Synchronized s(monitor); + + monitor.wait(100); + + if(state == SynchStartTask::STARTED) { + + state = SynchStartTask::STOPPING; + + monitor.notify(); + } + + while(state == SynchStartTask::STOPPING) { + monitor.wait(); + } + } + + assert(state == SynchStartTask::STOPPED); + + return true; + } + +}; + + +}}}} // facebook::thrift::concurrency + +using namespace facebook::thrift::concurrency::test; + diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.cc b/lib/cpp/src/concurrency/test/TimerManagerTests.cc deleted file mode 100644 index abd0e95f..00000000 --- a/lib/cpp/src/concurrency/test/TimerManagerTests.cc +++ /dev/null @@ -1,33 +0,0 @@ -#include -#include - -#include - -namespace facebook { namespace thrift { namespace concurrency { namespace test { - -/** ThreadManagerTests class */ - - -class ThreadManagerTests { - - void init() { - - ThreadManager* threadManager = ThreadManager::newThreadManager(); - - threadManager->poolPolicy(new BasicPoolPolicy()); - - threadManager->threadFactory(new PosixThreadFactory()); - - threadManager->poolPolicy(new BasicPoolPolicy()); - } -}; - - -}}}} // facebook::thrift::concurrency - -int main(int argc, char** argv) { - - return 0; - -} - diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h new file mode 100644 index 00000000..24f79643 --- /dev/null +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h @@ -0,0 +1,81 @@ +#include +#include +#include + +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { namespace test { + +using namespace facebook::thrift::concurrency; + +/** ThreadManagerTests class + + @author marc + @version $Id:$ */ + +class TimerManagerTests { + + class Task: public Runnable { + + public: + + Task(Monitor& monitor) : + _monitor(monitor), + _done(false) {} + + void run() { + + std::cout << "\t\t\tHello World" << std::endl; + + _done = true; + + {Synchronized s(_monitor); + _monitor.notifyAll(); + } + } + + Monitor& _monitor; + bool _done; + }; + +public: + + bool test00() { + + TimerManager* timerManager = new TimerManager(); + + timerManager->threadFactory(new PosixThreadFactory()); + + timerManager->start(); + + assert(timerManager->state() == TimerManager::STARTED); + + TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor); + + {Synchronized s(_monitor); + + timerManager->add(task, 1000LL); + + _monitor.wait(); + } + + assert(task->_done); + + delete task; + + std::cout << "\t\t\tSuccess!" << std::endl; + + return true; + } + + friend class TestTask; + + Monitor _monitor; +}; + + +}}}} // facebook::thrift::concurrency + +using namespace facebook::thrift::concurrency::test; + -- 2.17.1