From 0e53ccdb8bf0cee0d17fed331d32dd012cec2552 Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Mon, 17 Jul 2006 23:51:05 +0000 Subject: [PATCH] Another checkpoint of initial cut at thread pool manager for thrift and related concurrency classes. Added TimerManager - I can't live without one after all. Added Util - handy place for common time operations et al. Initial test code git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664722 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile | 70 ++++- lib/cpp/src/concurrency/Monitor.cc | 29 +- lib/cpp/src/concurrency/Monitor.h | 8 +- lib/cpp/src/concurrency/Mutex.cc | 5 + lib/cpp/src/concurrency/Mutex.h | 5 + lib/cpp/src/concurrency/PosixThreadFactory.cc | 7 +- lib/cpp/src/concurrency/PosixThreadFactory.h | 3 +- lib/cpp/src/concurrency/Thread.h | 5 +- lib/cpp/src/concurrency/ThreadManager.cc | 293 +++++++++++------- lib/cpp/src/concurrency/ThreadManager.h | 71 +++-- lib/cpp/src/concurrency/TimerManager.cc | 211 +++++++++++++ lib/cpp/src/concurrency/TimerManager.h | 77 +++++ lib/cpp/src/concurrency/Util.h | 67 ++++ .../src/concurrency/test/TimerManagerTests.cc | 33 ++ 14 files changed, 702 insertions(+), 182 deletions(-) create mode 100644 lib/cpp/src/concurrency/TimerManager.cc create mode 100644 lib/cpp/src/concurrency/TimerManager.h create mode 100644 lib/cpp/src/concurrency/Util.h create mode 100644 lib/cpp/src/concurrency/test/TimerManagerTests.cc diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile index 8403d7c6..fde44110 100644 --- a/lib/cpp/Makefile +++ b/lib/cpp/Makefile @@ -8,11 +8,19 @@ # Author: # Mark Slee -target: libthrift +target: libthrift.so libconcurrency.so # Tools LD = g++ -LDFL = -shared -Wall -Isrc -fPIC -Wl,-soname=libthrift.so +CPP = g++ + +CC_COMMON_FLAGS = -g -c -Wall -Isrc -fPIC -fno-common + +LD_COMMON_FLAGS= + +LD_APP_FLAGS= $(LD_COMMON_FLAGS) + +LD_LIB_FLAGS= -dynamiclib $(LD_COMMON_FLAGS) # Source files SRCS = src/protocol/TBinaryProtocol.cc \ @@ -22,13 +30,61 @@ SRCS = src/protocol/TBinaryProtocol.cc \ src/transport/TServerSocket.cc \ src/server/TSimpleServer.cc -# Linked library -libthrift: - $(LD) -o libthrift.so $(LDFL) $(SRCS) +# Concurreny Utility Source files +CONCURRENCY_SRCS = src/concurrency/Monitor.cc \ + src/concurrency/Mutex.cc \ + src/concurrency/PosixThreadFactory.cc \ + src/concurrency/ThreadManager.cc \ + src/concurrency/TimerManager.cc -# Clean it up -clean: +CONCURRENCY_OBJS = $(patsubst %.cc,%.o,$(CONCURRENCY_SRCS)) + +$(CONCURRENCY_OBJS): %.o : %.cc + $(CC) $(CC_COMMON_FLAGS) $< -o $@ + +CONCURRENCY_TEST_SRCS = src/concurrency/test/TimerManagerTests.cc + +CONCURRENCY_TEST_OBJS = $(patsubst %.cc,%.o,$(CONCURRENCY_TEST_SRCS)) + +$(CONCURRENCY_TEST_OBJS): %.o : %.cc + $(CC) $(CC_COMMON_FLAGS) -I src/concurrency $< -o $@ + +# Linked libraries + +# thrift library + +THRIFT_OBJS = $(patsubst %.cc,%.o, $(SRCS)) + +$(THRIFT_OBJS): %.o : %.cc + $(CC) $(CC_COMMON_FLAGS) $< -o $@ + +libthrift.so: $(THRIFT_OBJS) + $(LD) -o $@ $(LD_LIB_FLAGS) $(THRIFT_OBJS) + +# concurrency util library + +libconcurrency.so: $(CONCURRENCY_OBJS) + $(LD) -o $@ $(LD_LIB_FLAGS) $(CONCURRENCY_OBJS) + +concurrency_tests: libconcurrency.so $(CONCURRENCY_TEST_OBJS) + $(LD) -o $@ $(LD_APP_FLAGS) -L. $(CONCURRENCY_TEST_OBJS) libconcurrency.so + +tests: concurrency_tests + +clean_libthrift: rm -f libthrift.so + rm -f $(THRIFT_OBJS) + +clean_libconcurrency: + rm -f libconcurrency.so + rm -f $(CONCURRENCY_OBJS) + +clean_tests: + rm -f concurrency_tests + rm -f $(CONCURRENTY_TEST_OBJS) + +# Clean it up +clean: clean_libthrift clean_libconcurrency clean_tests # Install install: libthrift diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index d1b83d16..a2e42760 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -1,15 +1,17 @@ #include "Monitor.h" +#include "Util.h" #include #include #include + namespace facebook { namespace thrift { namespace concurrency { /** Monitor implementation using the POSIX pthread library @author marc - @version $Id$ */ + @version $Id:$ */ class Monitor::Impl { @@ -61,7 +63,7 @@ class Monitor::Impl { struct timespec abstime; - toAbsoluteTimespec(abstime, timeout); + Util::toAbsoluteTimespec(abstime, timeout); int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); @@ -88,29 +90,6 @@ class Monitor::Impl { private: - /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure - specifying current time plus timeout - - @param timeout time to delay in milliseconds - @return struct timespec current time plus timeout */ - - static const void toAbsoluteTimespec(struct timespec& result, long long timeout) { - - // XXX Darwin doesn't seem to have any readily useable hi-res clock. - - time_t seconds; - - assert(time(&seconds) != (time_t)-1); - - seconds+= (timeout / 1000); - - long nanoseconds = (timeout % 1000) * 1000000; - - result.tv_sec = seconds + (nanoseconds / 1000000000); - - result.tv_nsec = nanoseconds % 1000000000; - } - mutable pthread_mutex_t _pthread_mutex; mutable bool mutexInitialized; diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h index 82544f12..13dec185 100644 --- a/lib/cpp/src/concurrency/Monitor.h +++ b/lib/cpp/src/concurrency/Monitor.h @@ -1,5 +1,5 @@ -#if !defined(_concurrency_mutex_h_) -#define _concurrency_mutex_h_ 1 +#if !defined(_concurrency_Monitor_h_) +#define _concurrency_Monitor_h_ 1 namespace facebook { namespace thrift { namespace concurrency { @@ -11,7 +11,7 @@ namespace facebook { namespace thrift { namespace concurrency { methods without needing to cast away constness or change to non-const signatures. @author marc - @version $Id$ */ + @version $Id:$ */ class Monitor { @@ -56,4 +56,4 @@ class Synchronized { }}} // facebook::thrift::concurrency -#endif // !defined(_concurrency_mutex_h_) +#endif // !defined(_concurrency_Monitor_h_) diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc index 39d768e5..8282e73f 100644 --- a/lib/cpp/src/concurrency/Mutex.cc +++ b/lib/cpp/src/concurrency/Mutex.cc @@ -3,6 +3,11 @@ #include #include +/** Implementation of Mutex class using POSIX mutex + + @author marc + @version $Id:$ */ + namespace facebook { namespace thrift { namespace concurrency { class Mutex::impl { diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index e8371ea2..20d4c0b0 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -3,6 +3,11 @@ namespace facebook { namespace thrift { namespace concurrency { +/** A simple mutex class + + @author marc + @version $Id:$ */ + class Mutex { public: diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index e9d52f06..e7f84cdd 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -5,7 +5,10 @@ namespace facebook { namespace thrift { namespace concurrency { -/** The POSIX thread class. */ +/** The POSIX thread class. + + @author marc + @version $Id:$ */ class PthreadThread: public Thread { @@ -139,7 +142,7 @@ private: /** Converts relative thread priorities to absolute value based on posix thread scheduler policy The idea is simply to divide up the priority range for the given policy into the correpsonding relative - priority level (lowest..highest) and then prorate accordingly. */ + priority level (lowest..highest) and then pro-rate accordingly. */ static int toPthreadPriority(POLICY policy, PRIORITY priority) { diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index 88a08882..b42981b7 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -7,7 +7,8 @@ namespace facebook { namespace thrift { namespace concurrency { /** A thread factory to create posix threads - @author marc */ + @author marc + @version $Id:$ */ class PosixThreadFactory : public ThreadFactory { diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index befb4feb..3fc094d5 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -5,7 +5,10 @@ namespace facebook { namespace thrift { namespace concurrency { class Thread; -/** Minimal runnable class. More or less analogous to java.lang.Runnable. */ +/** Minimal runnable class. More or less analogous to java.lang.Runnable. + + @author marc + @version $Id:$ */ class Runnable { diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index c4ca2b18..b5d02e64 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -1,6 +1,9 @@ #include "ThreadManager.h" +#include "Monitor.h" #include +#include +#include namespace facebook { namespace thrift { namespace concurrency { @@ -15,7 +18,108 @@ namespace facebook { namespace thrift { namespace concurrency { policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. @author marc - @version $Id */ + @version $Id:$ */ + +class ThreadManager::Impl : public ThreadManager { + + public: + + Impl(size_t highWatermark, size_t lowWatermark) : + _hiwat(highWatermark), + _lowat(lowWatermark) { + } + + ~Impl() {} + + size_t highWatermark() const {return _hiwat;} + + void highWatermark(size_t value) {_hiwat = value;} + + size_t lowWatermark() const {return _lowat;} + + void lowWatermark(size_t value) {_lowat = value;} + + const PoolPolicy* poolPolicy() const { + + Synchronized s(_monitor); + + return _poolPolicy; + } + + void poolPolicy(const PoolPolicy* value) { + + Synchronized s(_monitor); + + _poolPolicy = value; + } + + const ThreadFactory* threadFactory() const { + + Synchronized s(_monitor); + + return _threadFactory; + } + + void threadFactory(const ThreadFactory* value) { + + Synchronized s(_monitor); + + _threadFactory = value; + } + + void addThread(size_t value); + + void removeThread(size_t value); + + size_t idleWorkerCount() const {return _idleCount;} + + size_t workerCount() const { + + Synchronized s(_monitor); + + return _workers.size(); + } + + size_t pendingTaskCount() const { + + Synchronized s(_monitor); + + return _tasks.size(); + } + + size_t totalTaskCount() const { + + Synchronized s(_monitor); + + return _tasks.size() + _workers.size() - _idleCount; + } + + void add(Runnable* value); + + void remove(Runnable* task); + +private: + + size_t _hiwat; + + size_t _lowat; + + size_t _idleCount; + + const PoolPolicy* _poolPolicy;; + + const ThreadFactory* _threadFactory; + + friend class ThreadManager::Task; + + std::queue _tasks; + + Monitor _monitor; + + friend class ThreadManager::Worker; + + std::set _workers; +}; class ThreadManager::Task : public Runnable { @@ -49,7 +153,6 @@ public: }; class ThreadManager::Worker: public Runnable { - enum STATE { UNINITIALIZED, STARTING, @@ -59,7 +162,7 @@ class ThreadManager::Worker: public Runnable { }; public: - Worker(ThreadManager* manager) : + Worker(ThreadManager::Impl* manager) : _manager(manager), _state(UNINITIALIZED), _idle(false) @@ -134,170 +237,138 @@ class ThreadManager::Worker: public Runnable { private: - ThreadManager* _manager; + ThreadManager::Impl* _manager; - friend class ThreadManager; + friend class ThreadManager::Impl; STATE _state; bool _idle; }; -ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) : - _hiwat(highWatermark), - _lowat(lowWatermark) { -} - -ThreadManager::~ThreadManager() {} - -size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;} - -void ThreadManager::highWatermark(size_t value) {_hiwat = value;} - -size_t ThreadManager::lowWatermark() const {return _lowat;} - -void ThreadManager::lowWatermark(size_t value) {_lowat = value;} - -const PoolPolicy* ThreadManager::poolPolicy() const { - - Synchronized s(_monitor); - - return _poolPolicy; -} - -void ThreadManager::poolPolicy(const PoolPolicy* value) { - - Synchronized s(_monitor); - - _poolPolicy = value; -} - -const ThreadFactory* ThreadManager::threadFactory() const { - - Synchronized s(_monitor); - - return _threadFactory; -} - -void ThreadManager::threadFactory(const ThreadFactory* value) { +void ThreadManager::Impl::addThread(size_t value) { - Synchronized s(_monitor); + std::set newThreads; - _threadFactory = value; -} + for(size_t ix = 0; ix < value; ix++) { -void ThreadManager::addThread(size_t value) { + class ThreadManager::Worker; + + ThreadManager::Worker* worker = new ThreadManager::Worker(this); - std::set newThreads; + newThreads.insert(_threadFactory->newThread(worker)); + } - for(size_t ix = 0; ix < value; ix++) { + for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - ThreadManager::Worker* worker = new ThreadManager::Worker(this); + (*ix)->start(); + } + for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - newThreads.insert(_threadFactory->newThread(worker)); + (*ix)->start(); + } + + {Synchronized s(_monitor); + + _workers.insert(newThreads.begin(), newThreads.end()); + } } - for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - - (*ix)->start(); - } - for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - - (*ix)->start(); - } - - {Synchronized s(_monitor); - - _workers.insert(newThreads.begin(), newThreads.end()); - } -} - -void ThreadManager::removeThread(size_t value) { +void ThreadManager::Impl::removeThread(size_t value) { std::set removedThreads; - {Synchronized s(_monitor); + {Synchronized s(_monitor); - /* Overly clever loop + /* Overly clever loop + + First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0) + and remove a sufficient number of busy threads. */ - First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0) - and remove a sufficient number of busy threads. */ - - for(int idleOnly = 1; idleOnly <= 0; idleOnly--) { + for(int idleOnly = 1; idleOnly <= 0; idleOnly--) { - for(std::set::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) { + for(std::set::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) { - Worker* worker = (Worker*)(*workerThread)->runnable(); + Worker* worker = (Worker*)(*workerThread)->runnable(); - if(worker->_idle || !idleOnly) { + if(worker->_idle || !idleOnly) { - removedThreads.insert(*workerThread); + removedThreads.insert(*workerThread); - _workers.erase(workerThread); + _workers.erase(workerThread); + } } } + + _monitor.notifyAll(); } + - _monitor.notifyAll(); + // Join removed threads and free worker + + for(std::set::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) { + + Worker* worker = (Worker*)(*workerThread)->runnable(); + + (*workerThread)->join(); + + delete worker; + } } + +void ThreadManager::Impl::add(Runnable* value) { + Synchronized s(_monitor); - // Join removed threads and free worker + _tasks.push(new ThreadManager::Task(value)); - for(std::set::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) { + /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this + task in time. */ - Worker* worker = (Worker*)(*workerThread)->runnable(); + if(_tasks.size() == 1) { - (*workerThread)->join(); + assert(_idleCount == _workers.size()); - delete worker; + _monitor.notify(); + } } -} -size_t ThreadManager::idleWorkerCount() const {return _idleCount;} +void ThreadManager::Impl::remove(Runnable* task) { -size_t ThreadManager::workerCount() const { - - Synchronized s(_monitor); + Synchronized s(_monitor); + } - return _workers.size(); +ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) { + return new ThreadManager::Impl(lowWatermark, highWatermark); } -size_t ThreadManager::pendingTaskCount() const { - - Synchronized s(_monitor); +/** Basic Pool Policy Implementation */ - return _tasks.size(); -} +class BasicPoolPolicy::Impl : public PoolPolicy { -size_t ThreadManager::totalTaskCount() const { + public: - Synchronized s(_monitor); + Impl() {} - return _tasks.size() + _workers.size() - _idleCount; -} + ~Impl() {} -void ThreadManager::add(Runnable* value) { + void onEmpty(ThreadManager* source) const {} - Synchronized s(_monitor); + void onLowWatermark(ThreadManager* source) const {} - _tasks.push(new ThreadManager::Task(value)); + void onHighWatermark(ThreadManager* source) const {} +}; - /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this - task in time. */ +BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl()) {} - if(_tasks.size() == 1) { +BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;} - assert(_idleCount == _workers.size()); +void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);} - _monitor.notify(); - } -} +void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);} -void ThreadManager::remove(Runnable* task) { +void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);} - Synchronized s(_monitor); -} }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index 17428818..352afb9b 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -1,14 +1,17 @@ #if !defined(_concurrency_ThreadManager_h_) #define _concurrency_ThreadManager_h_ 1 -#include "Monitor.h" -#include "Thread.h" +#include -#include -#include +#include "Thread.h" namespace facebook { namespace thrift { namespace concurrency { +/** Thread Pool Manager and related classes + + @author marc + @version $Id:$ */ + class ThreadManager; /** PoolPolicy class @@ -19,14 +22,39 @@ class PoolPolicy { public: - virtual ~PoolPolicy() = 0; + PoolPolicy() {} - virtual void onlowWatermark(ThreadManager* source) const = 0; + virtual ~PoolPolicy() {} - virtual void onhighWatermark(ThreadManager* source) const = 0; + virtual void onEmpty(ThreadManager* source) const = 0; + + virtual void onLowWatermark(ThreadManager* source) const = 0; + + virtual void onHighWatermark(ThreadManager* source) const = 0; }; +class BasicPoolPolicy : public PoolPolicy { + + public: + + BasicPoolPolicy(); + + virtual ~BasicPoolPolicy(); + + virtual void onEmpty(ThreadManager* source) const; + + virtual void onLowWatermark(ThreadManager* source) const; + + virtual void onHighWatermark(ThreadManager* source) const; + + private: + + class Impl; + + Impl* _impl; +}; + /** ThreadManager class This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather @@ -41,9 +69,9 @@ class ThreadManager { public: - ThreadManager(size_t highWatermark=4, size_t lowWatermark=2); + ThreadManager(size_t highWatermark=4, size_t lowWatermark=2) {}; - virtual ~ThreadManager() = 0; + virtual ~ThreadManager() {}; virtual const PoolPolicy* poolPolicy() const = 0; @@ -89,32 +117,13 @@ class ThreadManager { virtual void remove(Runnable* task) = 0; - private: - - size_t _hiwat; - - size_t _lowat; - - size_t _idleCount; - - const PoolPolicy* _poolPolicy;; - - const ThreadFactory* _threadFactory;; + static ThreadManager* newThreadManager(size_t lowWatermark=2, size_t highWatermark=4); class Task; - - friend class Task; - - std::queue _tasks; - - Monitor _monitor; - + class Worker; - friend class Worker; - - std::set _workers; - + class Impl; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc new file mode 100644 index 00000000..bd68264c --- /dev/null +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -0,0 +1,211 @@ +#include "TimerManager.h" +#include "Util.h" + +#include + +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** TimerManager class + + @author marc + @version $Id:$ */ + +typedef std::multimap::iterator task_iterator; +typedef std::pair task_range; + +class TimerManager::Task : public Runnable { + +public: + enum STATE { + WAITING, + EXECUTING, + CANCELLED, + COMPLETE + }; + + Task(Runnable* runnable) : + _runnable(runnable), + _state(WAITING) + {} + + ~Task() {}; + + void run() { + if(_state == EXECUTING) { + _runnable->run(); + _state = COMPLETE; + } + } + + private: + + Runnable* _runnable; + + STATE _state; +}; + +class TimerManager::Dispatcher: public Runnable { + + enum STATE { + UNINITIALIZED, + STARTING, + STARTED, + STOPPING, + STOPPED + }; + +public: + Dispatcher(TimerManager* manager) : + _manager(manager), + _state(UNINITIALIZED) + {} + + ~Dispatcher() {} + + /** Dispatcher entry point + + As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */ + + void run() { + + {Synchronized(_manager->_monitor); + + if(_state == STARTING) { + _state = STARTED; + } + } + + do { + + std::set expiredTasks; + + {Synchronized(_manager->_monitor); + + long long now = Util::currentTime(); + + task_iterator expiredTaskEnd; + + while(_state == STARTED && + (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) { + + _manager->_monitor.wait(_manager->_nextTimeout - now); + + } + + if(_state == STARTED) { + + for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { + + TimerManager::Task* task = ix->second; + + expiredTasks.insert(task); + + _manager->_taskCount--; + } + + _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); + } + } + + for(std::set::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { + + (*ix)->run(); + + delete *ix; + } + + } while(_state == STARTED); + + {Synchronized(_manager->_monitor); + + if(_state == STOPPING) { + + _state = STOPPED; + + _manager->_monitor.notify(); + + } + } + + return; + } + + private: + + TimerManager* _manager; + + friend class TimerManager; + + STATE _state; +}; + +TimerManager::TimerManager() {} + +TimerManager::~TimerManager() {} + +const ThreadFactory* TimerManager::threadFactory() const { + + Synchronized s(_monitor); + + return _threadFactory; +} + +void TimerManager::threadFactory(const ThreadFactory* value) { + + Synchronized s(_monitor); + + _threadFactory = value; +} + +void TimerManager::add(Runnable* task, long long timeout) { + + long long now = Util::currentTime(); + + timeout += now; + + {Synchronized s(_monitor); + + _taskCount++; + + _taskMap.insert(std::pair(timeout, new Task(task))); + + /* If the task map was empty, or if we have an expiration that is earlier than any previously seen, + kick the dispatcher so it can update its timeout */ + + if(_taskCount == 1 || timeout < _nextTimeout) { + + _monitor.notify(); + } + + if(timeout < _nextTimeout) { + + _nextTimeout = timeout; + } + } +} + +void TimerManager::add(Runnable* task, const struct timespec& value) { + + long long expiration; + + Util::toMilliseconds(expiration, value); + + /* XXX + Need to convert this to an explicit exception */ + + long long now = Util::currentTime(); + + assert(expiration < now); + + add(task, expiration - now); +} + + +void TimerManager::remove(Runnable* task) { + +} + +}}} // facebook::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h new file mode 100644 index 00000000..002460ad --- /dev/null +++ b/lib/cpp/src/concurrency/TimerManager.h @@ -0,0 +1,77 @@ +#if !defined(_concurrency_TimerManager_h_) +#define _concurrency_TimerManager_h_ 1 + +#include "Monitor.h" +#include "Thread.h" + +#include + +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** Timer Manager + + This class dispatches timer tasks when they fall due. + + @author marc + @version $Id:$ */ + +class TimerManager { + + public: + + TimerManager(); + + virtual ~TimerManager() = 0; + + virtual const ThreadFactory* threadFactory() const = 0; + + virtual void threadFactory(const ThreadFactory* value) = 0; + + virtual size_t taskCount() const = 0; + + /** Adds a task to be executed at some time in the future by a worker thread. + + @param task The task to execute + @param timeout Time in milliseconds to delay before executing task */ + + virtual void add(Runnable* task, long long timeout) = 0; + + /** Adds a task to be executed at some time in the future by a worker thread. + + @param task The task to execute + @param timeout Absolute time in the future to execute task. */ + + virtual void add(Runnable* task, const struct timespec& timeout) = 0; + + /** Removes a pending task */ + + virtual void remove(Runnable* task) = 0; + + private: + + const ThreadFactory* _threadFactory; + + class Task; + + friend class Task; + + std::multimap _taskMap; + + size_t _taskCount; + + long long _nextTimeout; + + Monitor _monitor; + + class Dispatcher; + + friend class Dispatcher; + + Dispatcher* _dispatcher; +}; + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_TimerManager_h_) diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h new file mode 100644 index 00000000..3ab89293 --- /dev/null +++ b/lib/cpp/src/concurrency/Util.h @@ -0,0 +1,67 @@ +#if !defined(_concurrency_Util_h_) +#define _concurrency_Util_h_ 1 + +#include +#include + +namespace facebook { namespace thrift { namespace concurrency { + +/** Utility methods + + This class contains basic utility methods for converting time formats, and other common platform-dependent concurrency operations. + It should not be included in API headers for other concurrency library headers, since it will, by definition, pull in all sorts of + horrid platform dependent crap. Rather it should be inluded directly in concurrency library implementation source. + + @author marc + @version $Id:$ */ + +class Util { + + public: + + /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure + specifying current time plus timeout + + @param struct timespec& current time plus timeout result + @param timeout time to delay in milliseconds */ + + static const void toAbsoluteTimespec(struct timespec& result, long long value) { + + // XXX Darwin doesn't seem to have any readily useable hi-res clock. + + time_t seconds; + + assert(time(&seconds) != (time_t)-1); + + seconds+= (value / 1000); + + long nanoseconds = (value % 1000) * 1000000; + + result.tv_sec = seconds + (nanoseconds / 1000000000); + + result.tv_nsec = nanoseconds % 1000000000; + } + + /** Converts absolute timespec to milliseconds from epoch */ + + static const void toMilliseconds(long long& result, const struct timespec& value) { + + result = value.tv_sec * 1000 + value.tv_nsec * 1000000; + } + + /** Get current time as milliseconds from epoch */ + + static const long long currentTime() { + + time_t now; + + time(&now); + + return (long long)now * 1000; + } +}; + + +}}} // facebook::thrift::concurrency + +#endif // !defined(_concurrency_Util_h_) diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.cc b/lib/cpp/src/concurrency/test/TimerManagerTests.cc new file mode 100644 index 00000000..abd0e95f --- /dev/null +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.cc @@ -0,0 +1,33 @@ +#include +#include + +#include + +namespace facebook { namespace thrift { namespace concurrency { namespace test { + +/** ThreadManagerTests class */ + + +class ThreadManagerTests { + + void init() { + + ThreadManager* threadManager = ThreadManager::newThreadManager(); + + threadManager->poolPolicy(new BasicPoolPolicy()); + + threadManager->threadFactory(new PosixThreadFactory()); + + threadManager->poolPolicy(new BasicPoolPolicy()); + } +}; + + +}}}} // facebook::thrift::concurrency + +int main(int argc, char** argv) { + + return 0; + +} + -- 2.17.1