From: Marc Slemko Date: Tue, 22 May 2007 23:59:54 +0000 (+0000) Subject: Modified facebook::thrift::concurrency::Monitor.wait: X-Git-Tag: 0.2.0~1352 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=3a3b53bc6eefa91ea60e5da2b2c91519ca1d9b2e;p=common%2Fthrift.git Modified facebook::thrift::concurrency::Monitor.wait: Throw TimedOutException on wait timeout so caller can distinguish between timeout and event. Modified facebook::thrift::concurrency::PthreadThread.start: Throw SystemrResourceException on any pthread_* function call failure rather than asserting 0. Added facebook::thrift::concurrency::Thread.id() and facebook::thrift::concurrency::ThreadFactory.currentThreadId(): Return thread-id of thread and current thread respectively. Needed for reentrancy tests in ThreadManager Added facebook::thrift::concurrency::ThreadManager.pendingTaskCountMaxN Modified facebook::thrift::concurrency::ThreadManager.add(): Now support a maximum pending task count and block if the current pending task count is max. If timeout is specified for add, TimedOutException is thrown if pending task count doesn't decrease in the timeout interval. If add() is called by a ThreadManager worker thread and the task cannot be added, a TooManyPendingTasksException is thrown rather than blocking, since deadlocks can ensue if worker threads block waiting for works threads to complete tasks. Reviewed By: mcslee, aditya Revert Plan: revertible Test Plan: concurrency/test/ThreadManagerTests.h run concurrency-test thread-manager git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665120 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h index 7735b2c3..735cd878 100644 --- a/lib/cpp/src/concurrency/Exception.h +++ b/lib/cpp/src/concurrency/Exception.h @@ -10,7 +10,7 @@ #include #include -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { class NoSuchTaskException : public facebook::thrift::TException {}; @@ -22,6 +22,16 @@ class IllegalStateException : public facebook::thrift::TException {}; class TimedOutException : public facebook::thrift::TException {}; +class TooManyPendingTasksException : public facebook::thrift::TException {}; + +class SystemResourceException : public facebook::thrift::TException { +public: + SystemResourceException() {} + + SystemResourceException(const std::string& message) : + TException(message) {} +}; + }}} // facebook::thrift::concurrency #endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp index 2443a6eb..0177a0c0 100644 --- a/lib/cpp/src/concurrency/Monitor.cpp +++ b/lib/cpp/src/concurrency/Monitor.cpp @@ -4,8 +4,8 @@ // See accompanying file LICENSE or visit the Thrift site at: // http://developers.facebook.com/thrift/ -#include "Monitor.h" -#include "Exception.h" +#include "Monitor.h" +#include "Exception.h" #include "Util.h" #include @@ -15,11 +15,11 @@ #include -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { /** * Monitor implementation using the POSIX pthread library - * + * * @author marc * @version $Id:$ */ @@ -30,16 +30,18 @@ class Monitor::Impl { Impl() : mutexInitialized_(false), condInitialized_(false) { - - try { - int ret = pthread_mutex_init(&pthread_mutex_, NULL); - assert(ret == 0); + + if(pthread_mutex_init(&pthread_mutex_, NULL) == 0) { mutexInitialized_ = true; - ret = pthread_cond_init(&pthread_cond_, NULL); - assert(ret == 0); - condInitialized_ = true; - } catch(...) { + + if(pthread_cond_init(&pthread_cond_, NULL) == 0) { + condInitialized_ = true; + } + } + + if(!mutexInitialized_ || !condInitialized_) { cleanup(); + throw SystemResourceException(); } } @@ -65,6 +67,7 @@ class Monitor::Impl { &abstime); if (result == ETIMEDOUT) { assert(Util::currentTime() >= (now + timeout)); + throw TimedOutException(); } } } diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h index 4c62e78e..b6a9f715 100644 --- a/lib/cpp/src/concurrency/Monitor.h +++ b/lib/cpp/src/concurrency/Monitor.h @@ -7,7 +7,9 @@ #ifndef _THRIFT_CONCURRENCY_MONITOR_H_ #define _THRIFT_CONCURRENCY_MONITOR_H_ 1 -namespace facebook { namespace thrift { namespace concurrency { +#include "Exception.h" + +namespace facebook { namespace thrift { namespace concurrency { /** * A monitor is a combination mutex and condition-event. Waiting and @@ -50,7 +52,7 @@ class Monitor { class Synchronized { public: - + Synchronized(const Monitor& value) : monitor_(value) { monitor_.lock(); diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 675e95d2..e4334672 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -5,6 +5,7 @@ // http://developers.facebook.com/thrift/ #include "PosixThreadFactory.h" +#include "Exception.h" #include #include @@ -18,7 +19,7 @@ namespace facebook { namespace thrift { namespace concurrency { using namespace boost; /** - * The POSIX thread class. + * The POSIX thread class. * * @author marc * @version $Id:$ @@ -47,10 +48,10 @@ class PthreadThread: public Thread { weak_ptr self_; public: - - PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : + + PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : pthread_(0), - state_(uninitialized), + state_(uninitialized), policy_(policy), priority_(priority), stackSize_(stackSize) { @@ -68,32 +69,39 @@ class PthreadThread: public Thread { state_ = starting; pthread_attr_t thread_attr; - int ret = pthread_attr_init(&thread_attr); - assert(ret == 0); + if(pthread_attr_init(&thread_attr) != 0) { + throw SystemResourceException("pthread_attr_init failed"); + } - ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE); - assert(ret == 0); + if(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) { + throw SystemResourceException("pthread_attr_setdetachstate failed"); + } // Set thread stack size - ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_); - assert(ret == 0); + if(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) { + throw SystemResourceException("pthread_attr_setstacksize failed"); + } // Set thread policy - ret = pthread_attr_setschedpolicy(&thread_attr, policy_); - assert(ret == 0); + if(pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) { + throw SystemResourceException("pthread_attr_setschedpolicy failed"); + } struct sched_param sched_param; sched_param.sched_priority = priority_; // Set thread priority - ret = pthread_attr_setschedparam(&thread_attr, &sched_param); - assert(ret == 0); + if(pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) { + throw SystemResourceException("pthread_attr_setschedparam failed"); + } // Create reference shared_ptr* selfRef = new shared_ptr(); *selfRef = self_.lock(); - ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef); - assert(ret == 0); + + if(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) { + throw SystemResourceException("pthread_create failed"); + } } void join() { @@ -103,6 +111,10 @@ class PthreadThread: public Thread { } } + id_t id() { + return pthread_; + } + shared_ptr runnable() const { return Thread::runnable(); } void runnable(shared_ptr value) { Thread::runnable(value); } @@ -130,7 +142,7 @@ void* PthreadThread::threadMain(void* arg) { if (thread->state_ != stopping && thread->state_ != stopped) { thread->state_ = stopping; } - + return (void*)0; } @@ -187,14 +199,14 @@ class PosixThreadFactory::Impl { public: - Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : + Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {} /** - * Creates a new POSIX thread to run the runnable object + * Creates a new POSIX thread to run the runnable object * * @param runnable A runnable object */ @@ -211,6 +223,8 @@ class PosixThreadFactory::Impl { PRIORITY priority() const { return priority_; } + Thread::id_t currentThreadId() const {return pthread_self();} + /** * Sets priority. * @@ -220,7 +234,7 @@ class PosixThreadFactory::Impl { void priority(PRIORITY value) { priority_ = value; } }; -PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : +PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const { return impl_->newThread(runnable); } @@ -233,4 +247,6 @@ PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_ void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); } +Thread::id_t PosixThreadFactory::currentThreadId() const {return impl_->currentThreadId();} + }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index ede7d79b..4e31dc50 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -57,6 +57,9 @@ class PosixThreadFactory : public ThreadFactory { // From ThreadFactory; boost::shared_ptr newThread(boost::shared_ptr runnable) const; + // From ThreadFactory; + Thread::id_t currentThreadId() const; + /** * Sets stack size for created threads * diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index 96ca6681..e928fc44 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -10,12 +10,12 @@ #include #include -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { class Thread; /** - * Minimal runnable class. More or less analogous to java.lang.Runnable. + * Minimal runnable class. More or less analogous to java.lang.Runnable. * * @author marc * @version $Id:$ @@ -43,7 +43,7 @@ class Runnable { }; /** - * Minimal thread class. Returned by thread factory bound to a Runnable object + * Minimal thread class. Returned by thread factory bound to a Runnable object * and ready to start execution. More or less analogous to java.lang.Thread * (minus all the thread group, priority, mode and other baggage, since that * is difficult to abstract across platforms and is left for platform-specific @@ -52,8 +52,11 @@ class Runnable { * @see facebook::thrift::concurrency::ThreadFactory) */ class Thread { - + public: + + typedef unsigned long long id_t; + virtual ~Thread() {}; /** @@ -69,6 +72,11 @@ class Thread { */ virtual void join() = 0; + /** + * Gets the thread's platform-specific ID + */ + virtual id_t id() = 0; + /** * Gets the runnable object this thread is hosting */ @@ -79,6 +87,7 @@ class Thread { private: boost::shared_ptr _runnable; + }; /** @@ -90,6 +99,12 @@ class ThreadFactory { public: virtual ~ThreadFactory() {} virtual boost::shared_ptr newThread(boost::shared_ptr runnable) const = 0; + + /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */ + + static const Thread::id_t unknown_thread_id; + + virtual Thread::id_t currentThreadId() const = 0; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp index 1631541d..7d78edb2 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/concurrency/ThreadManager.cpp @@ -18,14 +18,13 @@ #include #endif //defined(DEBUG) -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { using namespace boost; - /** * ThreadManager class - * + * * This class manages a pool of threads. It uses a ThreadFactory to create * threads. It never actually creates or destroys worker threads, rather * it maintains statistics on number of idle threads, number of active threads, @@ -37,10 +36,11 @@ using namespace boost; class ThreadManager::Impl : public ThreadManager { public: - Impl() : + Impl() : workerCount_(0), workerMaxCount_(0), idleCount_(0), + pendingTaskCountMax_(0), state_(ThreadManager::UNINITIALIZED) {} ~Impl() { stop(); } @@ -56,39 +56,51 @@ class ThreadManager::Impl : public ThreadManager { } shared_ptr threadFactory() const { - Synchronized s(monitor_); + Synchronized s(monitor_); return threadFactory_; } - - void threadFactory(shared_ptr value) { + + void threadFactory(shared_ptr value) { Synchronized s(monitor_); threadFactory_ = value; } void addWorker(size_t value); - + void removeWorker(size_t value); - + size_t idleWorkerCount() const { return idleCount_; } size_t workerCount() const { - Synchronized s(monitor_); + Synchronized s(monitor_); return workerCount_; } - + size_t pendingTaskCount() const { - Synchronized s(monitor_); + Synchronized s(monitor_); return tasks_.size(); } size_t totalTaskCount() const { - Synchronized s(monitor_); + Synchronized s(monitor_); return tasks_.size() + workerCount_ - idleCount_; } - - void add(shared_ptr value); + + size_t pendingTaskCountMax() const { + Synchronized s(monitor_); + return pendingTaskCountMax_; + } + + void pendingTaskCountMax(const size_t value) { + Synchronized s(monitor_); + pendingTaskCountMax_ = value; + } + + bool canSleep(); + + void add(shared_ptr value, long long timeout); void remove(shared_ptr task); @@ -98,6 +110,8 @@ private: size_t workerCount_; size_t workerMaxCount_; size_t idleCount_; + size_t pendingTaskCountMax_; + ThreadManager::STATE state_; shared_ptr threadFactory_; @@ -110,6 +124,7 @@ private: friend class ThreadManager::Worker; std::set > workers_; std::set > deadWorkers_; + std::map > idMap_; }; class ThreadManager::Task : public Runnable { @@ -151,7 +166,7 @@ class ThreadManager::Worker: public Runnable { }; public: - Worker(ThreadManager::Impl* manager) : + Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {} @@ -173,7 +188,7 @@ class ThreadManager::Worker: public Runnable { * execute. */ void run() { - bool active = false; + bool active = false; bool notifyManager = false; /** @@ -230,6 +245,13 @@ class ThreadManager::Worker: public Runnable { if (task->state_ == ThreadManager::Task::WAITING) { task->state_ = ThreadManager::Task::EXECUTING; } + + /* If we have a pending task max and we just dropped below it, wakeup any + thread that might be blocked on add. */ + if(manager_->pendingTaskCountMax_ != 0 && + manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) { + manager_->workerMonitor_.notify(); + } } } else { idle_ = true; @@ -237,7 +259,7 @@ class ThreadManager::Worker: public Runnable { notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_); } } - + if (task != NULL) { if (task->state_ == ThreadManager::Task::EXECUTING) { try { @@ -248,18 +270,18 @@ class ThreadManager::Worker: public Runnable { } } } - + { - Synchronized s(manager_->workerMonitor_); + Synchronized s(manager_->workerMonitor_); manager_->deadWorkers_.insert(this->thread()); if (notifyManager) { manager_->workerMonitor_.notify(); } } - + return; } - + private: ThreadManager::Impl* manager_; friend class ThreadManager::Impl; @@ -271,7 +293,7 @@ class ThreadManager::Worker: public Runnable { void ThreadManager::Impl::addWorker(size_t value) { std::set > newThreads; for (size_t ix = 0; ix < value; ix++) { - class ThreadManager::Worker; + class ThreadManager::Worker; shared_ptr worker = shared_ptr(new ThreadManager::Worker(this)); newThreads.insert(threadFactory_->newThread(worker)); } @@ -281,15 +303,16 @@ class ThreadManager::Worker: public Runnable { workerMaxCount_ += value; workers_.insert(newThreads.begin(), newThreads.end()); } - + for (std::set >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { shared_ptr worker = dynamic_pointer_cast((*ix)->runnable()); worker->state_ = ThreadManager::Worker::STARTING; (*ix)->start(); + idMap_.insert(std::pair >((*ix)->id(), *ix)); } { - Synchronized s(workerMonitor_); + Synchronized s(workerMonitor_); while (workerCount_ != workerMaxCount_) { workerMonitor_.wait(); } @@ -303,7 +326,7 @@ void ThreadManager::Impl::start() { } { - Synchronized s(monitor_); + Synchronized s(monitor_); if (state_ == ThreadManager::UNINITIALIZED) { if (threadFactory_ == NULL) { throw InvalidArgumentException(); @@ -325,7 +348,7 @@ void ThreadManager::Impl::stopImpl(bool join) { } { - Synchronized s(monitor_); + Synchronized s(monitor_); if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING && state_ != ThreadManager::STOPPED) { @@ -338,12 +361,12 @@ void ThreadManager::Impl::stopImpl(bool join) { removeWorker(workerCount_); } - // XXX + // XXX // should be able to block here for transition to STOPPED since we're no // using shared_ptrs { - Synchronized s(monitor_); + Synchronized s(monitor_); state_ = ThreadManager::STOPPED; } @@ -352,7 +375,7 @@ void ThreadManager::Impl::stopImpl(bool join) { void ThreadManager::Impl::removeWorker(size_t value) { std::set > removedThreads; { - Synchronized s(monitor_); + Synchronized s(monitor_); if (value > workerMaxCount_) { throw InvalidArgumentException(); } @@ -369,7 +392,7 @@ void ThreadManager::Impl::removeWorker(size_t value) { } { - Synchronized s(workerMonitor_); + Synchronized s(workerMonitor_); while (workerCount_ != workerMaxCount_) { workerMonitor_.wait(); @@ -377,19 +400,37 @@ void ThreadManager::Impl::removeWorker(size_t value) { for (std::set >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) { workers_.erase(*ix); + idMap_.erase((*ix)->id()); } - + deadWorkers_.clear(); } } - -void ThreadManager::Impl::add(shared_ptr value) { - Synchronized s(monitor_); + + bool ThreadManager::Impl::canSleep() { + const Thread::id_t id = threadFactory_->currentThreadId(); + return idMap_.find(id) == idMap_.end(); + } + + void ThreadManager::Impl::add(shared_ptr value, long long timeout) { + Synchronized s(monitor_); if (state_ != ThreadManager::STARTED) { throw IllegalStateException(); } + if(pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { + + if(canSleep()) { + + while(pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) { + monitor_.wait(timeout); + } + } else { + throw TooManyPendingTasksException(); + } + } + tasks_.push(shared_ptr(new ThreadManager::Task(value))); // If idle thread is available notify it, otherwise all worker threads are @@ -400,7 +441,7 @@ void ThreadManager::Impl::add(shared_ptr value) { } void ThreadManager::Impl::remove(shared_ptr task) { - Synchronized s(monitor_); + Synchronized s(monitor_); if (state_ != ThreadManager::STARTED) { throw IllegalStateException(); } @@ -409,18 +450,21 @@ void ThreadManager::Impl::remove(shared_ptr task) { class SimpleThreadManager : public ThreadManager::Impl { public: - SimpleThreadManager(size_t workerCount=4) : + SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) : workerCount_(workerCount), + pendingTaskCountMax_(pendingTaskCountMax), firstTime_(true) { } void start() { + ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_); ThreadManager::Impl::start(); addWorker(workerCount_); } private: const size_t workerCount_; + const size_t pendingTaskCountMax_; bool firstTime_; Monitor monitor_; }; @@ -430,8 +474,9 @@ shared_ptr ThreadManager::newThreadManager() { return shared_ptr(new ThreadManager::Impl()); } -shared_ptr ThreadManager::newSimpleThreadManager(size_t count) { - return shared_ptr(new SimpleThreadManager(count)); +shared_ptr ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) { + return shared_ptr(new SimpleThreadManager(count, pendingTaskCountMax)); } }}} // facebook::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index f0c745fa..19f77cca 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -11,7 +11,7 @@ #include #include "Thread.h" -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { /** * Thread Pool Manager and related classes @@ -56,7 +56,7 @@ class ThreadManager { * Stops the thread manager. Aborts all remaining unprocessed task, shuts * down all created worker threads, and realeases all allocated resources. * This method blocks for all worker threads to complete, thus it can - * potentially block forever if a worker thread is running a task that + * potentially block forever if a worker thread is running a task that * won't terminate. */ virtual void stop() = 0; @@ -76,7 +76,7 @@ class ThreadManager { STOPPING, STOPPED }; - + virtual const STATE state() const = 0; virtual boost::shared_ptr threadFactory() const = 0; @@ -108,11 +108,26 @@ class ThreadManager { virtual size_t totalTaskCount() const = 0; /** - * Adds a task to be execued at some time in the future by a worker thread. + * Gets the maximum pending task count. 0 indicates no maximum + */ + virtual size_t pendingTaskCountMax() const = 0; + + /** + * Adds a task to be executed at some time in the future by a worker thread. * - * @param value The task to run + * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount() + * is greater than or equalt to pendingTaskCountMax(). If this method is called in the + * context of a ThreadManager worker thread it will throw a + * TooManyPendingTasksException + * + * @param task The task to queue for execution + * + * @param timeout Time to wait in milliseconds to add a task when a pending-task-count + * is specified + * + * @throws TooManyPendingTasksException Pending task count exceeds max pending task count */ - virtual void add(boost::shared_ptrvalue) = 0; + virtual void add(boost::shared_ptrtask, long long timeout=0LL) = 0; /** * Removes a pending task @@ -122,12 +137,14 @@ class ThreadManager { static boost::shared_ptr newThreadManager(); /** - * Creates a simple thread manager the uses count number of worker threads + * Creates a simple thread manager the uses count number of worker threads and has + * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit + * on pending tasks */ - static boost::shared_ptr newSimpleThreadManager(size_t count=4); + static boost::shared_ptr newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0); class Task; - + class Worker; class Impl; diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp index 050885d4..8d6dd5ec 100644 --- a/lib/cpp/src/concurrency/TimerManager.cpp +++ b/lib/cpp/src/concurrency/TimerManager.cpp @@ -97,7 +97,9 @@ class TimerManager::Dispatcher: public Runnable { timeout = manager_->taskMap_.begin()->first - now; } assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0)); - manager_->monitor_.wait(timeout); + try { + manager_->monitor_.wait(timeout); + } catch(TimedOutException& e) {} now = Util::currentTime(); } diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp index 96dd795c..f4b0b627 100644 --- a/lib/cpp/src/concurrency/test/Tests.cpp +++ b/lib/cpp/src/concurrency/test/Tests.cpp @@ -29,9 +29,9 @@ int main(int argc, char** argv) { if (runAll || args[0].compare("thread-factory") == 0) { ThreadFactoryTests threadFactoryTests; - + std::cout << "ThreadFactory tests..." << std::endl; - + size_t count = 1000; std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl; @@ -61,7 +61,7 @@ int main(int argc, char** argv) { time00 = Util::currentTime(); time01 = time00; size_t count = 0; - + while (time01 < time00 + 10) { count++; time01 = Util::currentTime(); @@ -99,6 +99,11 @@ int main(int argc, char** argv) { ThreadManagerTests threadManagerTests; assert(threadManagerTests.loadTest(taskCount, delay, workerCount)); + + std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl; + + assert(threadManagerTests.blockTest(delay, workerCount)); + } } diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index 90d0e4f7..2e2dbddd 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -4,6 +4,7 @@ // See accompanying file LICENSE or visit the Thrift site at: // http://developers.facebook.com/thrift/ +#include #include #include #include @@ -19,7 +20,7 @@ using boost::shared_ptr; using namespace facebook::thrift::concurrency; /** - * ThreadManagerTests class + * ThreadManagerTests class * * @author marc * @version $Id:$ @@ -29,7 +30,7 @@ class ThreadFactoryTests { public: static const double ERROR; - + class Task: public Runnable { public: @@ -65,20 +66,20 @@ public: * Reap N threads */ class ReapNTask: public Runnable { - + public: - + ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {} - + void run() { Synchronized s(_monitor); - + _count--; - + //std::cout << "\t\t\tthread count: " << _count << std::endl; - + if (_count == 0) { _monitor.notify(); } @@ -128,7 +129,7 @@ public: class SynchStartTask: public Runnable { public: - + enum STATE { UNINITIALIZED, STARTING, @@ -171,9 +172,9 @@ public: bool synchStartTest() { Monitor monitor; - + SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; - + shared_ptr task = shared_ptr(new SynchStartTask(monitor, state)); PosixThreadFactory threadFactory = PosixThreadFactory(); @@ -199,7 +200,10 @@ public: { Synchronized s(monitor); - monitor.wait(100); + try { + monitor.wait(100); + } catch(TimedOutException& e) { + } if (state == SynchStartTask::STARTED) { @@ -207,7 +211,7 @@ public: monitor.notify(); } - + while (state == SynchStartTask::STOPPING) { monitor.wait(); } @@ -233,7 +237,10 @@ public: for (size_t ix = 0; ix < count; ix++) { { Synchronized s(monitor); - monitor.wait(timeout); + try { + monitor.wait(timeout); + } catch(TimedOutException& e) { + } } } diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index 9f044354..89e68438 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -21,7 +21,7 @@ namespace facebook { namespace thrift { namespace concurrency { namespace test { using namespace facebook::thrift::concurrency; /** - * ThreadManagerTests class + * ThreadManagerTests class * * @author marc * @version $Id:$ @@ -35,8 +35,8 @@ public: class Task: public Runnable { public: - - Task(Monitor& monitor, size_t& count, long long timeout) : + + Task(Monitor& monitor, size_t& count, long long timeout) : _monitor(monitor), _count(count), _timeout(timeout), @@ -49,27 +49,33 @@ public: { Synchronized s(_sleep); - _sleep.wait(_timeout); + try { + _sleep.wait(_timeout); + } catch(TimedOutException& e) { + ; + }catch(...) { + assert(0); + } } _endTime = Util::currentTime(); _done = true; - + { Synchronized s(_monitor); // std::cout << "Thread " << _count << " completed " << std::endl; - + _count--; if (_count == 0) { - + _monitor.notify(); } } } - + Monitor& _monitor; size_t& _count; long long _timeout; @@ -95,11 +101,11 @@ public: shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); threadFactory->priority(PosixThreadFactory::HIGHEST); - + threadManager->threadFactory(threadFactory); threadManager->start(); - + std::set > tasks; for (size_t ix = 0; ix < count; ix++) { @@ -118,7 +124,7 @@ public: Synchronized s(monitor); while(activeCount > 0) { - + monitor.wait(); } } @@ -133,7 +139,7 @@ public: long long maxTime = 0; for (std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { - + shared_ptr task = *ix; long long delta = task->_endTime - task->_startTime; @@ -158,7 +164,7 @@ public: averageTime+= delta; } - + averageTime /= count; std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl; @@ -177,6 +183,167 @@ public: return success; } + + class BlockTask: public Runnable { + + public: + + BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) : + _monitor(monitor), + _bmonitor(bmonitor), + _count(count) {} + + void run() { + { + Synchronized s(_bmonitor); + + _bmonitor.wait(); + + } + + { + Synchronized s(_monitor); + + _count--; + + if (_count == 0) { + + _monitor.notify(); + } + } + } + + Monitor& _monitor; + Monitor& _bmonitor; + size_t& _count; + }; + + /** + * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the + * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */ + + bool blockTest(long long timeout=100LL, size_t workerCount=2) { + + bool success = false; + + try { + + Monitor bmonitor; + Monitor monitor; + + size_t pendingTaskMaxCount = workerCount; + + size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1}; + + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); + + shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + + threadFactory->priority(PosixThreadFactory::HIGHEST); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + std::set > tasks; + + for (size_t ix = 0; ix < workerCount; ix++) { + + tasks.insert(shared_ptr(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0]))); + } + + for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) { + + tasks.insert(shared_ptr(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1]))); + } + + for (std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + threadManager->add(*ix); + } + + if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) { + throw TException("Unexpected pending task count"); + } + + shared_ptr extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2])); + + try { + threadManager->add(extraTask, 1); + throw TException("Unexpected success adding task in excess of pending task count"); + } catch(TimedOutException& e) { + } + + std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[0] != 0) { + monitor.wait(); + } + } + + std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + try { + threadManager->add(extraTask, 1); + } catch(TimedOutException& e) { + std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl; + throw TException("Unexpected timeout adding task"); + + } catch(TooManyPendingTasksException& e) { + std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl; + throw TException("Unexpected timeout adding task"); + } + + // Wake up tasks that were pending before and wait for them to complete + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[1] != 0) { + monitor.wait(); + } + } + + // Wake up the extra task and wait for it to complete + + { + Synchronized s(bmonitor); + + bmonitor.notifyAll(); + } + + { + Synchronized s(monitor); + + while(activeCounts[2] != 0) { + monitor.wait(); + } + } + + if(!(success = (threadManager->totalTaskCount() == 0))) { + throw TException("Unexpected pending task count"); + } + + } catch(TException& e) { + } + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl; + return success; + } }; const double ThreadManagerTests::ERROR = .20; diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 69ca7d57..a2719349 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -68,7 +68,7 @@ TThreadPoolServer::TThreadPoolServer(shared_ptr processor, shared_ptr threadManager) : TServer(processor, serverTransport, transportFactory, protocolFactory), threadManager_(threadManager), - stop_(false) {} + stop_(false), timeout_(0) {} TThreadPoolServer::TThreadPoolServer(shared_ptr processor, shared_ptr serverTransport, @@ -80,7 +80,7 @@ TThreadPoolServer::TThreadPoolServer(shared_ptr processor, TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory), threadManager_(threadManager), - stop_(false) {} + stop_(false), timeout_(0) {} TThreadPoolServer::~TThreadPoolServer() {} @@ -118,7 +118,7 @@ void TThreadPoolServer::serve() { outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); // Add to threadmanager pool - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol))); + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } @@ -156,4 +156,7 @@ void TThreadPoolServer::serve() { } +long long TThreadPoolServer::timeout() const {return timeout_;} +void TThreadPoolServer::timeout(long long value) {timeout_ = value;} + }}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index fd745015..b8b64f22 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -41,6 +41,9 @@ class TThreadPoolServer : public TServer { virtual ~TThreadPoolServer(); virtual void serve(); + + virtual long long timeout() const; + virtual void timeout(long long value); virtual void stop() { stop_ = true; @@ -52,6 +55,8 @@ class TThreadPoolServer : public TServer { boost::shared_ptr threadManager_; volatile bool stop_; + + volatile long long timeout_; }; diff --git a/test/threads/ThreadsServer.cpp b/test/threads/ThreadsServer.cpp index 7d17ecab..a293cfbd 100644 --- a/test/threads/ThreadsServer.cpp +++ b/test/threads/ThreadsServer.cpp @@ -68,7 +68,10 @@ protected: Monitor m; for (int i = 0; i < seconds; ++i) { fprintf(stderr, "Thread %d: sleep %d\n", thread, i); - m.wait(1000); + try { + m.wait(1000); + } catch(TimedOutException& e) { + } } fprintf(stderr, "THREAD %d DONE\n", thread); }