From 66949879ffaa724ff2789bd62a2b8820bf45f13f Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Sat, 15 Jul 2006 01:52:39 +0000 Subject: [PATCH] Checkpoint of initial cut at thread pool manager for thrift and related concurrency classes. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664721 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/concurrency/Monitor.cc | 138 ++++++++ lib/cpp/src/concurrency/Monitor.h | 59 ++++ lib/cpp/src/concurrency/Mutex.cc | 38 +++ lib/cpp/src/concurrency/Mutex.h | 43 +++ lib/cpp/src/concurrency/PosixThreadFactory.cc | 215 +++++++++++++ lib/cpp/src/concurrency/PosixThreadFactory.h | 76 +++++ lib/cpp/src/concurrency/Thread.h | 59 ++++ lib/cpp/src/concurrency/ThreadManager.cc | 303 ++++++++++++++++++ lib/cpp/src/concurrency/ThreadManager.h | 122 +++++++ 9 files changed, 1053 insertions(+) create mode 100644 lib/cpp/src/concurrency/Monitor.cc create mode 100644 lib/cpp/src/concurrency/Monitor.h create mode 100644 lib/cpp/src/concurrency/Mutex.cc create mode 100644 lib/cpp/src/concurrency/Mutex.h create mode 100644 lib/cpp/src/concurrency/PosixThreadFactory.cc create mode 100644 lib/cpp/src/concurrency/PosixThreadFactory.h create mode 100644 lib/cpp/src/concurrency/Thread.h create mode 100644 lib/cpp/src/concurrency/ThreadManager.cc create mode 100644 lib/cpp/src/concurrency/ThreadManager.h diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc new file mode 100644 index 00000000..d1b83d16 --- /dev/null +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -0,0 +1,138 @@ +#include "Monitor.h" + +#include +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** Monitor implementation using the POSIX pthread library + + @author marc + @version $Id$ */ + +class Monitor::Impl { + + public: + + Impl() : + mutexInitialized(false) { + + /* XXX + Need to fix this to handle failures without leaking. */ + + assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); + + mutexInitialized = true; + + assert(pthread_cond_init(&_pthread_cond, NULL) == 0); + } + + ~Impl() { + + if(mutexInitialized) { + + mutexInitialized = false; + + assert(pthread_mutex_destroy(&_pthread_mutex) == 0); + } + + if(condInitialized) { + + condInitialized = false; + + assert(pthread_cond_destroy(&_pthread_cond) == 0); + } + } + + void lock() const {pthread_mutex_lock(&_pthread_mutex);} + + void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} + + void wait(long long timeout) const { + + // XXX Need to assert that caller owns mutex + + if(timeout == 0LL) { + + pthread_cond_wait(&_pthread_cond, &_pthread_mutex); + + } else { + + struct timespec abstime; + + toAbsoluteTimespec(abstime, timeout); + + int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); + + if(result == ETIMEDOUT) { + + // XXX If result is timeout need to throw timeout exception + } + } + } + + void notify() { + + // XXX Need to assert that caller owns mutex + + assert(pthread_cond_signal(&_pthread_cond) == 0); + } + + void notifyAll() { + + // XXX Need to assert that caller owns mutex + + assert(pthread_cond_broadcast(&_pthread_cond) == 0); + } + +private: + + /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure + specifying current time plus timeout + + @param timeout time to delay in milliseconds + @return struct timespec current time plus timeout */ + + static const void toAbsoluteTimespec(struct timespec& result, long long timeout) { + + // XXX Darwin doesn't seem to have any readily useable hi-res clock. + + time_t seconds; + + assert(time(&seconds) != (time_t)-1); + + seconds+= (timeout / 1000); + + long nanoseconds = (timeout % 1000) * 1000000; + + result.tv_sec = seconds + (nanoseconds / 1000000000); + + result.tv_nsec = nanoseconds % 1000000000; + } + + mutable pthread_mutex_t _pthread_mutex; + + mutable bool mutexInitialized; + + mutable pthread_cond_t _pthread_cond; + + mutable bool condInitialized; +}; + +Monitor::Monitor() : _impl(new Monitor::Impl()) {} + + Monitor::~Monitor() { delete _impl;} + +void Monitor::lock() const {_impl->lock();} + +void Monitor::unlock() const {_impl->unlock();} + +void Monitor::wait(long long timeout) const {_impl->wait(timeout);} + +void Monitor::notify() const {_impl->notify();} + +void Monitor::notifyAll() const {_impl->notifyAll();} + +}}} // facebook::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h new file mode 100644 index 00000000..82544f12 --- /dev/null +++ b/lib/cpp/src/concurrency/Monitor.h @@ -0,0 +1,59 @@ +#if !defined(_concurrency_mutex_h_) +#define _concurrency_mutex_h_ 1 + +namespace facebook { namespace thrift { namespace concurrency { + +/** A monitor is a combination mutex and condition-event. Waiting and notifying condition events requires that the caller own the mutex. Mutex + lock and unlock operations can be performed independently of condition events. This is more or less analogous to java.lang.Object multi-thread + operations + + Note that all methods are const. Monitors implement logical constness, not bit constness. This allows const methods to call monitor + methods without needing to cast away constness or change to non-const signatures. + + @author marc + @version $Id$ */ + +class Monitor { + + public: + + Monitor(); + + virtual ~Monitor(); + + virtual void lock() const; + + virtual void unlock() const; + + virtual void wait(long long timeout=0LL) const; + + virtual void notify() const; + + virtual void notifyAll() const; + + private: + + class Impl; + + Impl* _impl; +}; + +class Synchronized { + public: + + Synchronized(const Monitor& value) : _monitor(value) { + _monitor.lock(); + } + + ~Synchronized() { + _monitor.unlock(); + } + + private: + const Monitor& _monitor; +}; + + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_mutex_h_) diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc new file mode 100644 index 00000000..39d768e5 --- /dev/null +++ b/lib/cpp/src/concurrency/Mutex.cc @@ -0,0 +1,38 @@ +#include "Mutex.h" + +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { + +class Mutex::impl { +public: + impl() : initialized(false) { + assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); + initialized = true; + } + + ~impl() { + if(initialized) { + initialized = false; + assert(pthread_mutex_destroy(&_pthread_mutex) == 0); + } + } + + void lock() const {pthread_mutex_lock(&_pthread_mutex);} + + void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} + +private: + mutable pthread_mutex_t _pthread_mutex; + mutable bool initialized; +}; + +Mutex::Mutex() : _impl(new Mutex::impl()) {} + +void Mutex::lock() const {_impl->lock();} + +void Mutex::unlock() const {_impl->unlock();} + +}}} // facebook::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h new file mode 100644 index 00000000..e8371ea2 --- /dev/null +++ b/lib/cpp/src/concurrency/Mutex.h @@ -0,0 +1,43 @@ +#if !defined(_concurrency_mutex_h_) +#define _concurrency_mutex_h_ 1 + +namespace facebook { namespace thrift { namespace concurrency { + +class Mutex { + + public: + + Mutex(); + + virtual ~Mutex() {} + + virtual void lock() const; + + virtual void unlock() const; + + private: + + class impl; + + impl* _impl; +}; + +class MutexMonitor { + public: + + MutexMonitor(const Mutex& value) : _mutex(value) { + _mutex.lock(); + } + + ~MutexMonitor() { + _mutex.unlock(); + } + + private: + const Mutex& _mutex; +}; + + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_mutex_h_) diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc new file mode 100644 index 00000000..e9d52f06 --- /dev/null +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -0,0 +1,215 @@ +#include "PosixThreadFactory.h" + +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** The POSIX thread class. */ + +class PthreadThread: public Thread { + +public: + enum STATE {uninitialized, + starting, + started, + stopping, + stopped + }; + + static const int MB = 1024 * 1024; + +private: + + pthread_t _pthread; + + STATE _state; + + int _policy; + + int _priority; + + int _stackSize; + + Runnable* _runnable; + + static void* threadMain(void* arg) { + + // XXX need a lock here when testing thread state + + PthreadThread* thread = (PthreadThread*)arg; + + if(thread->_state != starting) { + return (void*)0; + } + + thread->_state = starting; + + thread->_runnable->run(); + + if(thread->_state != stopping && thread->_state != stopped) { + thread->_state = stopping; + } + + return (void*)0; + } + +public: + + PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : + _pthread(0), + _state(uninitialized), + _policy(policy), + _priority(priority), + _stackSize(stackSize), + _runnable(runnable) + {} + + void start() { + + if(_state != uninitialized) { + return; + } + + _state = starting; + + pthread_attr_t thread_attr; + + assert(pthread_attr_init(&thread_attr) == 0); + + assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); + + // Set thread stack size + + assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); + + // Set thread policy + + assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); + + struct sched_param sched_param; + sched_param.sched_priority = _priority; + + // Set thread priority + + assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); + + assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0); + } + + void join() { + + if(_state != stopped) { + + void* ignore; + + pthread_join(_pthread, &ignore); + } + } + + const Runnable* runnable() const {return _runnable;} + +}; + +/** POSIX Thread factory implementation */ + +class PosixThreadFactory::Impl { + +private: + + POLICY _policy; + + PRIORITY _priority; + + int _stackSize; + + bool _detached; + + /** Converts generic posix thread schedule policy enums into pthread API values. */ + + static int toPthreadPolicy(POLICY policy) { + switch(policy) { + case OTHER: return SCHED_OTHER; break; + case FIFO: return SCHED_FIFO; break; + case ROUND_ROBIN: return SCHED_RR; break; + default: return SCHED_OTHER; break; + } + } + + /** Converts relative thread priorities to absolute value based on posix thread scheduler policy + + The idea is simply to divide up the priority range for the given policy into the correpsonding relative + priority level (lowest..highest) and then prorate accordingly. */ + + static int toPthreadPriority(POLICY policy, PRIORITY priority) { + + int pthread_policy = toPthreadPolicy(policy); + + int min_priority = sched_get_priority_min(pthread_policy); + + int max_priority = sched_get_priority_max(pthread_policy); + + int quanta = (HIGHEST - LOWEST) + 1; + + float stepsperquanta = (max_priority - min_priority) / quanta; + + if(priority <= HIGHEST) { + + return (int)(min_priority + stepsperquanta * priority); + } else { + + // should never get here for priority increments. + + assert(false); + + return (int)(min_priority + stepsperquanta * NORMAL); + } + } + +public: + + Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : + _policy(policy), + _priority(priority), + _stackSize(stackSize), + _detached(detached) { + } + + /** Creates a new POSIX thread to run the runnable object + + @param runnable A runnable object */ + + Thread* newThread(Runnable* runnable) const { + + return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable); + } + + int stackSize() const { return _stackSize;} + + void stackSize(int value) { _stackSize = value;} + + PRIORITY priority() const { return _priority;} + + /** Sets priority. + + XXX + Need to handle incremental priorities properl. */ + + void priority(PRIORITY value) { _priority = value;} + +}; + +PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : + _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} + +Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);} + +int PosixThreadFactory::stackSize() const {return _impl->stackSize();} + +void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);} + +PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();} + +void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);} + +}}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h new file mode 100644 index 00000000..88a08882 --- /dev/null +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -0,0 +1,76 @@ +#if !defined(_concurrency_PosixThreadFactory_h_) +#define _concurrency_PosixThreadFactory_h_ 1 + +#include "Thread.h" + +namespace facebook { namespace thrift { namespace concurrency { + +/** A thread factory to create posix threads + + @author marc */ + +class PosixThreadFactory : public ThreadFactory { + + public: + + /** POSIX Thread scheduler policies */ + + enum POLICY { + OTHER, + FIFO, + ROUND_ROBIN + }; + + /** POSIX Thread scheduler relative priorities, + + Absolute priority is determined by scheduler policy and OS. This enumeration specifies relative priorities such that one can + specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */ + + enum PRIORITY { + LOWEST = 0, + LOWER = 1, + LOW = 2, + NORMAL = 3, + HIGH = 4, + HIGHER = 5, + HIGHEST = 6, + INCREMENT = 7, + DECREMENT = 8 + }; + + PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false); + + // From ThreadFactory; + + Thread* newThread(Runnable* runnable) const; + + /** Sets stack size for created threads + + @param value size in megabytes */ + + virtual void stackSize(int value); + + /** Gets stack size for created threads + + @return int size in megabytes */ + + virtual int stackSize() const; + + /** Sets priority relative to current policy */ + + virtual void priority(PRIORITY priority); + + /** Gets priority relative to current policy */ + + virtual PRIORITY priority() const; + + private: + + class Impl; + + Impl* _impl; +}; + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_PosixThreadFactory_h_) diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h new file mode 100644 index 00000000..befb4feb --- /dev/null +++ b/lib/cpp/src/concurrency/Thread.h @@ -0,0 +1,59 @@ +#if !defined(_concurrency_Thread_h_) +#define _concurrency_Thread_h_ 1 + +namespace facebook { namespace thrift { namespace concurrency { + +class Thread; + +/** Minimal runnable class. More or less analogous to java.lang.Runnable. */ + +class Runnable { + + public: + + virtual ~Runnable() {}; + + virtual void run() = 0; +}; + +/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread + (minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific + ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */ + + +class Thread { + + public: + + virtual ~Thread() {}; + + /** Starts the thread. Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this + thread. */ + + virtual void start() = 0; + + /** Join this thread + + Current thread blocks until this target thread completes. */ + + virtual void join() = 0; + + /** Gets the runnable object this thread is hosting */ + + virtual const Runnable* runnable() const = 0; +}; + +/** Factory to create platform-specific thread object and bind them to Runnable object for execution */ + +class ThreadFactory { + + public: + + virtual ~ThreadFactory() {} + + virtual Thread* newThread(Runnable* runnable) const = 0; +}; + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_Thread_h_) diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc new file mode 100644 index 00000000..c4ca2b18 --- /dev/null +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -0,0 +1,303 @@ +#include "ThreadManager.h" + +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** ThreadManager class + + This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather + it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the + PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool + size needs to be adjusted and call this object addThread and removeThread methods to make changes. + + This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on + policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. + + @author marc + @version $Id */ + +class ThreadManager::Task : public Runnable { + +public: + enum STATE { + WAITING, + EXECUTING, + CANCELLED, + COMPLETE + }; + + Task(Runnable* runnable) : + _runnable(runnable), + _state(WAITING) + {} + + ~Task() {}; + + void run() { + if(_state == EXECUTING) { + _runnable->run(); + _state = COMPLETE; + } + } + + private: + + Runnable* _runnable; + + STATE _state; +}; + +class ThreadManager::Worker: public Runnable { + + enum STATE { + UNINITIALIZED, + STARTING, + STARTED, + STOPPING, + STOPPED + }; + + public: + Worker(ThreadManager* manager) : + _manager(manager), + _state(UNINITIALIZED), + _idle(false) + {} + + ~Worker() {} + + /** Worker entry point + + As long as worker thread is running, pull tasks off the task queue and execute. */ + + void run() { + + {Synchronized(_manager->_monitor); + + if(_state == STARTING) { + _state = STARTED; + } + } + + do { + + ThreadManager::Task* task = NULL; + + /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop). + + Once the queue is non-empty, dequeue a task, release monitor, and execute. */ + + {Synchronized(_manager->_monitor); + + while(_state == STARTED && _manager->_tasks.empty()) { + + _manager->_idleCount++; + + _idle = true; + + _manager->_monitor.wait(); + + _idle = false; + + _manager->_idleCount--; + } + + if(_state == STARTED) { + + task = _manager->_tasks.front(); + } + } + + if(task != NULL) { + + task->run(); + + delete task; + } + + } while(_state == STARTED); + + {Synchronized(_manager->_monitor); + + if(_state == STOPPING) { + + _state = STOPPED; + + _manager->_monitor.notify(); + + } + } + + return; + } + + private: + + ThreadManager* _manager; + + friend class ThreadManager; + + STATE _state; + + bool _idle; +}; + +ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) : + _hiwat(highWatermark), + _lowat(lowWatermark) { +} + +ThreadManager::~ThreadManager() {} + +size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;} + +void ThreadManager::highWatermark(size_t value) {_hiwat = value;} + +size_t ThreadManager::lowWatermark() const {return _lowat;} + +void ThreadManager::lowWatermark(size_t value) {_lowat = value;} + +const PoolPolicy* ThreadManager::poolPolicy() const { + + Synchronized s(_monitor); + + return _poolPolicy; +} + +void ThreadManager::poolPolicy(const PoolPolicy* value) { + + Synchronized s(_monitor); + + _poolPolicy = value; +} + +const ThreadFactory* ThreadManager::threadFactory() const { + + Synchronized s(_monitor); + + return _threadFactory; +} + +void ThreadManager::threadFactory(const ThreadFactory* value) { + + Synchronized s(_monitor); + + _threadFactory = value; +} + +void ThreadManager::addThread(size_t value) { + + std::set newThreads; + + for(size_t ix = 0; ix < value; ix++) { + + ThreadManager::Worker* worker = new ThreadManager::Worker(this); + + newThreads.insert(_threadFactory->newThread(worker)); + } + + for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { + + (*ix)->start(); + } + for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { + + (*ix)->start(); + } + + {Synchronized s(_monitor); + + _workers.insert(newThreads.begin(), newThreads.end()); + } +} + +void ThreadManager::removeThread(size_t value) { + + std::set removedThreads; + + {Synchronized s(_monitor); + + /* Overly clever loop + + First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0) + and remove a sufficient number of busy threads. */ + + for(int idleOnly = 1; idleOnly <= 0; idleOnly--) { + + for(std::set::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) { + + Worker* worker = (Worker*)(*workerThread)->runnable(); + + if(worker->_idle || !idleOnly) { + + removedThreads.insert(*workerThread); + + _workers.erase(workerThread); + } + } + } + + _monitor.notifyAll(); + } + + + // Join removed threads and free worker + + for(std::set::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) { + + Worker* worker = (Worker*)(*workerThread)->runnable(); + + (*workerThread)->join(); + + delete worker; + } +} + +size_t ThreadManager::idleWorkerCount() const {return _idleCount;} + +size_t ThreadManager::workerCount() const { + + Synchronized s(_monitor); + + return _workers.size(); +} + +size_t ThreadManager::pendingTaskCount() const { + + Synchronized s(_monitor); + + return _tasks.size(); +} + +size_t ThreadManager::totalTaskCount() const { + + Synchronized s(_monitor); + + return _tasks.size() + _workers.size() - _idleCount; +} + +void ThreadManager::add(Runnable* value) { + + Synchronized s(_monitor); + + _tasks.push(new ThreadManager::Task(value)); + + /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this + task in time. */ + + if(_tasks.size() == 1) { + + assert(_idleCount == _workers.size()); + + _monitor.notify(); + } +} + +void ThreadManager::remove(Runnable* task) { + + Synchronized s(_monitor); +} + +}}} // facebook::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h new file mode 100644 index 00000000..17428818 --- /dev/null +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -0,0 +1,122 @@ +#if !defined(_concurrency_ThreadManager_h_) +#define _concurrency_ThreadManager_h_ 1 + +#include "Monitor.h" +#include "Thread.h" + +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { + +class ThreadManager; + +/** PoolPolicy class + + Tracks performance of ThreadManager object and makes desired changes in thread pool count if any. */ + +class PoolPolicy { + + public: + + virtual ~PoolPolicy() = 0; + + virtual void onlowWatermark(ThreadManager* source) const = 0; + + virtual void onhighWatermark(ThreadManager* source) const = 0; + +}; + +/** ThreadManager class + + This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather + it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the + PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool + size needs to be adjusted and call this object addThread and removeThread methods to make changes. + + This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on + policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */ + +class ThreadManager { + + public: + + ThreadManager(size_t highWatermark=4, size_t lowWatermark=2); + + virtual ~ThreadManager() = 0; + + virtual const PoolPolicy* poolPolicy() const = 0; + + virtual void poolPolicy(const PoolPolicy* value) = 0; + + virtual const ThreadFactory* threadFactory() const = 0; + + virtual void threadFactory(const ThreadFactory* value) = 0; + + virtual size_t highWatermark() const = 0; + + virtual void highWatermark(size_t value) = 0; + + virtual size_t lowWatermark() const = 0; + + virtual void lowWatermark(size_t value) = 0; + + virtual void addThread(size_t value=1) = 0; + + virtual void removeThread(size_t value=1) = 0; + + /** Gets the current number of idle worker threads */ + + virtual size_t idleWorkerCount() const = 0; + + /** Gets the current number of total worker threads */ + + virtual size_t workerCount() const = 0; + + /** Gets the current number of pending tasks */ + + virtual size_t pendingTaskCount() const = 0; + + /** Gets the current number of pending and executing tasks */ + + virtual size_t totalTaskCount() const = 0; + + /** Adds a task to be execued at some time in the future by a worker thread. */ + + virtual void add(Runnable* value) = 0; + + /** Removes a pending task */ + + virtual void remove(Runnable* task) = 0; + + private: + + size_t _hiwat; + + size_t _lowat; + + size_t _idleCount; + + const PoolPolicy* _poolPolicy;; + + const ThreadFactory* _threadFactory;; + + class Task; + + friend class Task; + + std::queue _tasks; + + Monitor _monitor; + + class Worker; + + friend class Worker; + + std::set _workers; + +}; + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_ThreadManager_h_) -- 2.17.1