From: Marc Slemko Date: Thu, 20 Jul 2006 21:16:27 +0000 (+0000) Subject: More test code added... X-Git-Tag: 0.2.0~1743 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=fe5ba12e4a014935defac0464def9ad1986ad572;p=common%2Fthrift.git More test code added... more bugs found facebook::thrift::concurrency::ThreadManager::add Fixed dispatch error that resulted in only one of N worker threads ever getting notified of work facebook::thrift::concurrency::ThreadManager Cleaned up addWorker/removeWorker and stop logic so that adding/removing workers doesn't wake up all blocked workers. facebook::thrift::concurrency::Thread facebook::thrift::concurrency::Runnable Fixed initialization logic so that runnable can return the thread that runs it git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664729 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index 3587c034..00b2613d 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -36,9 +36,6 @@ private: int _stackSize; - Runnable* _runnable; - - public: PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : @@ -46,9 +43,10 @@ public: _state(uninitialized), _policy(policy), _priority(priority), - _stackSize(stackSize), - _runnable(runnable) - {} + _stackSize(stackSize) { + + this->Thread::runnable(runnable); + } void start() { @@ -92,7 +90,9 @@ public: } } - Runnable* runnable() const {return _runnable;} + Runnable* runnable() const {return Thread::runnable();} + + void runnable(Runnable* value) {Thread::runnable(value);} }; @@ -107,7 +107,7 @@ void* PthreadThread::threadMain(void* arg) { thread->_state = starting; - thread->_runnable->run(); + thread->runnable()->run(); if(thread->_state != stopping && thread->_state != stopped) { thread->_state = stopping; diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index 8237091a..2416887e 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -17,6 +17,18 @@ class Runnable { virtual ~Runnable() {}; virtual void run() = 0; + + virtual Thread* thread() {return _thread;} + + private: + + /** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */ + + friend class Thread; + + virtual void thread(Thread* value) {_thread = value;} + + Thread* _thread; }; /** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread @@ -43,7 +55,15 @@ class Thread { /** Gets the runnable object this thread is hosting */ - virtual Runnable* runnable() const = 0; + virtual Runnable* runnable() const {return _runnable;} + + protected: + + virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);} + + private: + + Runnable* _runnable; }; /** Factory to create platform-specific thread object and bind them to Runnable object for execution */ diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index 0aa4bef4..ca2bbb5f 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -8,6 +8,7 @@ namespace facebook { namespace thrift { namespace concurrency { + /** ThreadManager class This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather @@ -20,19 +21,18 @@ class ThreadManager::Impl : public ThreadManager { public: - Impl() : _stopped(false) {} - + Impl() : _state(ThreadManager::UNINITIALIZED) {} + ~Impl() {stop();} - ~Impl() { - - if(!_stopped) { - stop(); - } - } + void start(); void stop(); + const ThreadManager::STATE state() const { + return _state; + }; + const ThreadFactory* threadFactory() const { Synchronized s(_monitor); @@ -82,9 +82,11 @@ private: size_t _workerCount; + size_t _workerMaxCount; + size_t _idleCount; - bool _stopped; + ThreadManager::STATE _state; const ThreadFactory* _threadFactory; @@ -94,9 +96,13 @@ private: Monitor _monitor; + Monitor _workerMonitor; + friend class ThreadManager::Worker; std::set _workers; + + std::set _deadWorkers; }; class ThreadManager::Task : public Runnable { @@ -133,6 +139,7 @@ public: }; class ThreadManager::Worker: public Runnable { + enum STATE { UNINITIALIZED, STARTING, @@ -142,6 +149,7 @@ class ThreadManager::Worker: public Runnable { }; public: + Worker(ThreadManager::Impl* manager) : _manager(manager), _state(UNINITIALIZED), @@ -150,59 +158,94 @@ class ThreadManager::Worker: public Runnable { ~Worker() {} + bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;} + /** Worker entry point As long as worker thread is running, pull tasks off the task queue and execute. */ void run() { + bool active = false; + + bool notifyManager = false; + + /** Increment worker semaphore and notify manager if worker count reached desired max + + Note + We have to release the monitor and acquire the workerMonitor since that is what the manager + blocks on for worker add/remove */ + {Synchronized s(_manager->_monitor); - if(_state == STARTING) { - _state = STARTED; + active = _manager->_workerCount < _manager->_workerMaxCount; + + if(active) { + + _manager->_workerCount++; + + notifyManager = _manager->_workerCount == _manager->_workerMaxCount; } + } + + if(notifyManager) { - _manager->_workerCount++; + Synchronized s(_manager->_workerMonitor); - _manager->_monitor.notifyAll(); + _manager->_workerMonitor.notify(); + + notifyManager = false; } - do { + while(active) { ThreadManager::Task* task = NULL; /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop). - Once the queue is non-empty, dequeue a task, release monitor, and execute. */ + Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented + such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying + the next blocked thread but eventually the manager will see it. */ {Synchronized s(_manager->_monitor); - while(_state == STARTED && _manager->_tasks.empty()) { + active = isActive(); + + while(active && _manager->_tasks.empty()) { _manager->_idleCount++; _idle = true; _manager->_monitor.wait(); + + active = isActive(); _idle = false; _manager->_idleCount--; } - - if(_state == STARTED) { + + if(active) { if(!_manager->_tasks.empty()) { task = _manager->_tasks.front(); _manager->_tasks.pop(); - + if(task->_state == ThreadManager::Task::WAITING) { - + task->_state = ThreadManager::Task::EXECUTING; } } + } else { + + _idle = true; + + _manager->_workerCount--; + + notifyManager = _manager->_workerCount == _manager->_workerMaxCount; } } @@ -212,31 +255,29 @@ class ThreadManager::Worker: public Runnable { try { task->run(); - + } catch(...) { // XXX need to log this } delete task; + + task = NULL; } } - - } while(_state == STARTED); - - {Synchronized s(_manager->_monitor); - - _manager->_workerCount--; - - if(_state == STOPPING) { + } - _state = STOPPED; + {Synchronized s(_manager->_workerMonitor); - _manager->_monitor.notify(); + _manager->_deadWorkers.insert(this->thread()); + + if(notifyManager) { + _manager->_workerMonitor.notify(); } } - + return; } @@ -263,6 +304,13 @@ void ThreadManager::Impl::addWorker(size_t value) { newThreads.insert(_threadFactory->newThread(worker)); } + + {Synchronized s(_monitor); + + _workerMaxCount+= value; + + _workers.insert(newThreads.begin(), newThreads.end()); + } for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { @@ -273,11 +321,33 @@ void ThreadManager::Impl::addWorker(size_t value) { (*ix)->start(); } - {Synchronized s(_monitor); + {Synchronized s(_workerMonitor); - _workers.insert(newThreads.begin(), newThreads.end()); + while(_workerCount != _workerMaxCount) { + _workerMonitor.wait(); + } + } +} + +void ThreadManager::Impl::start() { + + if(_state == ThreadManager::STOPPED) { + return; + } + + {Synchronized s(_monitor); + + if(_state == ThreadManager::UNINITIALIZED) { + + if(_threadFactory == NULL) {throw InvalidArgumentException();} + + _state = ThreadManager::STARTED; + + _monitor.notifyAll(); + } + + while(_state == STARTING) { - while(_workerCount != _workers.size()) { _monitor.wait(); } } @@ -287,78 +357,90 @@ void ThreadManager::Impl::stop() { bool doStop = false; + if(_state == ThreadManager::STOPPED) { + return; + } + {Synchronized s(_monitor); - if(!_stopped) { + if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { + doStop = true; - _stopped = true; + + _state = ThreadManager::STOPPING; } } if(doStop) { + removeWorker(_workerCount); + + _state = ThreadManager::STOPPING; } + + // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid + } void ThreadManager::Impl::removeWorker(size_t value) { std::set removedThreads; - {Synchronized s(_monitor); - - /* Overly clever loop - - First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0) - and remove a sufficient number of busy threads. */ - - for(int idleOnly = 1; idleOnly >= 0; idleOnly--) { - - for(std::set::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) { + {Synchronized s(_monitor); - Worker* worker = (Worker*)(*workerThread)->runnable(); + if(value > _workerMaxCount) { - if(worker->_idle || !idleOnly) { + throw InvalidArgumentException(); + } - if(worker->_state == ThreadManager::Worker::STARTED) { + _workerMaxCount-= value; - worker->_state = ThreadManager::Worker::STOPPING; - } + if(_idleCount < value) { + + for(size_t ix = 0; ix < _idleCount; ix++) { - removedThreads.insert(*workerThread); - - _workers.erase(workerThread); - } - } + _monitor.notify(); } - + } else { + _monitor.notifyAll(); } + } - - // Join removed threads and free worker + {Synchronized s(_workerMonitor); - for(std::set::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) { + while(_workerCount != _workerMaxCount) { + _workerMonitor.wait(); + } - Worker* worker = (Worker*)(*workerThread)->runnable(); + for(std::set::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { - (*workerThread)->join(); - - delete worker; + _workers.erase(*ix); + + delete (*ix)->runnable(); + + delete (*ix); } + + _deadWorkers.clear(); } +} void ThreadManager::Impl::add(Runnable* value) { Synchronized s(_monitor); - bool isEmpty = _tasks.empty(); + if(_state != ThreadManager::STARTED) { + + throw IllegalStateException(); + } _tasks.push(new ThreadManager::Task(value)); - /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this + /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this task in time. */ - if(isEmpty && _idleCount > 0) { + if(_idleCount > 0) { _monitor.notify(); } @@ -367,6 +449,11 @@ void ThreadManager::Impl::add(Runnable* value) { void ThreadManager::Impl::remove(Runnable* task) { Synchronized s(_monitor); + + if(_state != ThreadManager::STARTED) { + + throw IllegalStateException(); + } } class SimpleThreadManager : public ThreadManager::Impl { @@ -378,26 +465,10 @@ public: _firstTime(true) { } - void add(Runnable* task) { - - bool addWorkers = false; - - {Synchronized s(_monitor); - - if(_firstTime) { - - _firstTime = false; - - addWorkers = true; - } - } - - if(addWorkers) { - - addWorker(_workerCount); - } + void start() { + ThreadManager::Impl::start(); - Impl::add(task); + addWorker(_workerCount); } private: diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index 596471d4..aa5a98a9 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -32,6 +32,26 @@ class ThreadManager { virtual ~ThreadManager() {} + /** Starts the thread manager. Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */ + + virtual void start() = 0; + + /** Stops the thread manager. Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources. + This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that + won't terminate. */ + + virtual void stop() = 0; + + enum STATE { + UNINITIALIZED, + STARTING, + STARTED, + STOPPING, + STOPPED + }; + + virtual const STATE state() const = 0; + virtual const ThreadFactory* threadFactory() const = 0; virtual void threadFactory(const ThreadFactory* value) = 0; diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc index d139f55c..2174bf43 100644 --- a/lib/cpp/src/concurrency/test/Tests.cc +++ b/lib/cpp/src/concurrency/test/Tests.cc @@ -30,13 +30,13 @@ int main(int argc, char** argv) { assert(threadFactoryTests.helloWorldTest()); - size_t count = 10000; + size_t count = 1000; std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl; assert(threadFactoryTests.reapNThreads(count)); - std::cout << "\t\tThreadFactory synchrous start test" << std::endl; + std::cout << "\t\tThreadFactory synchronous start test" << std::endl; assert(threadFactoryTests.synchStartTest()); } @@ -56,11 +56,17 @@ int main(int argc, char** argv) { std::cout << "ThreadManager tests..." << std::endl; - std::cout << "\t\tThreadManager test00" << std::endl; + size_t workerCount = 10; + + size_t taskCount = 10000; + + long long delay = 10LL; + + std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl; ThreadManagerTests threadManagerTests; - assert(threadManagerTests.test00()); + assert(threadManagerTests.loadTest(taskCount, delay, workerCount)); } } diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index aad63320..72d67771 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -28,27 +28,35 @@ public: _monitor(monitor), _count(count), _timeout(timeout), - _addTime(Util::currentTime()), - _success(false), _done(false) {} void run() { - _startTime = Util::currentTime(); - Monitor sleep; {Synchronized s(sleep); + long long time00 = Util::currentTime(); + sleep.wait(_timeout); - } - _endTime = Util::currentTime(); + long long time01 = Util::currentTime(); + + double error = ((time01 - time00) - _timeout) / (double)_timeout; + + if(error < 0.0) { + + error*= -1.0; + } + + if(error > .10) { + + assert(false); + } + } _done = true; - _success = true; - {Synchronized s(_monitor); // std::cout << "Thread " << _count << " completed " << std::endl; @@ -65,17 +73,13 @@ public: Monitor& _monitor; size_t& _count; long long _timeout; - long long _addTime; - long long _startTime; - long long _endTime; - bool _success; bool _done; }; /** Dispatch count tasks, each of which blocks for timeout milliseconds then completes. Verify that all tasks completed and that thread manager cleans up properly on delete. */ - bool test00(size_t count=100, long long timeout=100LL, size_t workerCount=4) { + bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) { Monitor monitor; @@ -84,6 +88,8 @@ public: ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount); threadManager->threadFactory(new PosixThreadFactory()); + + threadManager->start(); std::set tasks; @@ -92,6 +98,8 @@ public: tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout)); } + long long time00 = Util::currentTime(); + for(std::set::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { threadManager->add(*ix); @@ -105,19 +113,27 @@ public: } } - bool success; + long long time01 = Util::currentTime(); for(std::set::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { - success = success || (*ix)->_success; - delete *ix; } + double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout; + + double error = ((time01 - time00) - expectedTime) / expectedTime; + + if(error < 0) { + error*= -1.0; + } + + bool success = error < .10; + delete threadManager; - std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl; + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl; return true; }