#include <exception>
#include <Thrift.h>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
class NoSuchTaskException : public facebook::thrift::TException {};
class TimedOutException : public facebook::thrift::TException {};
+class TooManyPendingTasksException : public facebook::thrift::TException {};
+
+class SystemResourceException : public facebook::thrift::TException {
+public:
+ SystemResourceException() {}
+
+ SystemResourceException(const std::string& message) :
+ TException(message) {}
+};
+
}}} // facebook::thrift::concurrency
#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
-#include "Monitor.h"
-#include "Exception.h"
+#include "Monitor.h"
+#include "Exception.h"
#include "Util.h"
#include <assert.h>
#include <pthread.h>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
* Monitor implementation using the POSIX pthread library
- *
+ *
* @author marc
* @version $Id:$
*/
Impl() :
mutexInitialized_(false),
condInitialized_(false) {
-
- try {
- int ret = pthread_mutex_init(&pthread_mutex_, NULL);
- assert(ret == 0);
+
+ if(pthread_mutex_init(&pthread_mutex_, NULL) == 0) {
mutexInitialized_ = true;
- ret = pthread_cond_init(&pthread_cond_, NULL);
- assert(ret == 0);
- condInitialized_ = true;
- } catch(...) {
+
+ if(pthread_cond_init(&pthread_cond_, NULL) == 0) {
+ condInitialized_ = true;
+ }
+ }
+
+ if(!mutexInitialized_ || !condInitialized_) {
cleanup();
+ throw SystemResourceException();
}
}
&abstime);
if (result == ETIMEDOUT) {
assert(Util::currentTime() >= (now + timeout));
+ throw TimedOutException();
}
}
}
#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
-namespace facebook { namespace thrift { namespace concurrency {
+#include "Exception.h"
+
+namespace facebook { namespace thrift { namespace concurrency {
/**
* A monitor is a combination mutex and condition-event. Waiting and
class Synchronized {
public:
-
+
Synchronized(const Monitor& value) :
monitor_(value) {
monitor_.lock();
// http://developers.facebook.com/thrift/
#include "PosixThreadFactory.h"
+#include "Exception.h"
#include <assert.h>
#include <pthread.h>
using namespace boost;
/**
- * The POSIX thread class.
+ * The POSIX thread class.
*
* @author marc
* @version $Id:$
weak_ptr<PthreadThread> self_;
public:
-
- PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+
+ PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
pthread_(0),
- state_(uninitialized),
+ state_(uninitialized),
policy_(policy),
priority_(priority),
stackSize_(stackSize) {
state_ = starting;
pthread_attr_t thread_attr;
- int ret = pthread_attr_init(&thread_attr);
- assert(ret == 0);
+ if(pthread_attr_init(&thread_attr) != 0) {
+ throw SystemResourceException("pthread_attr_init failed");
+ }
- ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
- assert(ret == 0);
+ if(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
+ }
// Set thread stack size
- ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_);
- assert(ret == 0);
+ if(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
+ throw SystemResourceException("pthread_attr_setstacksize failed");
+ }
// Set thread policy
- ret = pthread_attr_setschedpolicy(&thread_attr, policy_);
- assert(ret == 0);
+ if(pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+ throw SystemResourceException("pthread_attr_setschedpolicy failed");
+ }
struct sched_param sched_param;
sched_param.sched_priority = priority_;
// Set thread priority
- ret = pthread_attr_setschedparam(&thread_attr, &sched_param);
- assert(ret == 0);
+ if(pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+ throw SystemResourceException("pthread_attr_setschedparam failed");
+ }
// Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = self_.lock();
- ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
- assert(ret == 0);
+
+ if(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+ throw SystemResourceException("pthread_create failed");
+ }
}
void join() {
}
}
+ id_t id() {
+ return pthread_;
+ }
+
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
if (thread->state_ != stopping && thread->state_ != stopped) {
thread->state_ = stopping;
}
-
+
return (void*)0;
}
public:
- Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
policy_(policy),
priority_(priority),
stackSize_(stackSize),
detached_(detached) {}
/**
- * Creates a new POSIX thread to run the runnable object
+ * Creates a new POSIX thread to run the runnable object
*
* @param runnable A runnable object
*/
PRIORITY priority() const { return priority_; }
+ Thread::id_t currentThreadId() const {return pthread_self();}
+
/**
* Sets priority.
*
void priority(PRIORITY value) { priority_ = value; }
};
-PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+Thread::id_t PosixThreadFactory::currentThreadId() const {return impl_->currentThreadId();}
+
}}} // facebook::thrift::concurrency
// From ThreadFactory;
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+ // From ThreadFactory;
+ Thread::id_t currentThreadId() const;
+
/**
* Sets stack size for created threads
*
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
class Thread;
/**
- * Minimal runnable class. More or less analogous to java.lang.Runnable.
+ * Minimal runnable class. More or less analogous to java.lang.Runnable.
*
* @author marc
* @version $Id:$
};
/**
- * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
* and ready to start execution. More or less analogous to java.lang.Thread
* (minus all the thread group, priority, mode and other baggage, since that
* is difficult to abstract across platforms and is left for platform-specific
* @see facebook::thrift::concurrency::ThreadFactory)
*/
class Thread {
-
+
public:
+
+ typedef unsigned long long id_t;
+
virtual ~Thread() {};
/**
*/
virtual void join() = 0;
+ /**
+ * Gets the thread's platform-specific ID
+ */
+ virtual id_t id() = 0;
+
/**
* Gets the runnable object this thread is hosting
*/
private:
boost::shared_ptr<Runnable> _runnable;
+
};
/**
public:
virtual ~ThreadFactory() {}
virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+ /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */
+
+ static const Thread::id_t unknown_thread_id;
+
+ virtual Thread::id_t currentThreadId() const = 0;
};
}}} // facebook::thrift::concurrency
#include <iostream>
#endif //defined(DEBUG)
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
using namespace boost;
-
/**
* ThreadManager class
- *
+ *
* This class manages a pool of threads. It uses a ThreadFactory to create
* threads. It never actually creates or destroys worker threads, rather
* it maintains statistics on number of idle threads, number of active threads,
class ThreadManager::Impl : public ThreadManager {
public:
- Impl() :
+ Impl() :
workerCount_(0),
workerMaxCount_(0),
idleCount_(0),
+ pendingTaskCountMax_(0),
state_(ThreadManager::UNINITIALIZED) {}
~Impl() { stop(); }
}
shared_ptr<ThreadFactory> threadFactory() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return threadFactory_;
}
-
- void threadFactory(shared_ptr<ThreadFactory> value) {
+
+ void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(monitor_);
threadFactory_ = value;
}
void addWorker(size_t value);
-
+
void removeWorker(size_t value);
-
+
size_t idleWorkerCount() const {
return idleCount_;
}
size_t workerCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return workerCount_;
}
-
+
size_t pendingTaskCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return tasks_.size();
}
size_t totalTaskCount() const {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
return tasks_.size() + workerCount_ - idleCount_;
}
-
- void add(shared_ptr<Runnable> value);
+
+ size_t pendingTaskCountMax() const {
+ Synchronized s(monitor_);
+ return pendingTaskCountMax_;
+ }
+
+ void pendingTaskCountMax(const size_t value) {
+ Synchronized s(monitor_);
+ pendingTaskCountMax_ = value;
+ }
+
+ bool canSleep();
+
+ void add(shared_ptr<Runnable> value, long long timeout);
void remove(shared_ptr<Runnable> task);
size_t workerCount_;
size_t workerMaxCount_;
size_t idleCount_;
+ size_t pendingTaskCountMax_;
+
ThreadManager::STATE state_;
shared_ptr<ThreadFactory> threadFactory_;
friend class ThreadManager::Worker;
std::set<shared_ptr<Thread> > workers_;
std::set<shared_ptr<Thread> > deadWorkers_;
+ std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
};
class ThreadManager::Task : public Runnable {
};
public:
- Worker(ThreadManager::Impl* manager) :
+ Worker(ThreadManager::Impl* manager) :
manager_(manager),
state_(UNINITIALIZED),
idle_(false) {}
* execute.
*/
void run() {
- bool active = false;
+ bool active = false;
bool notifyManager = false;
/**
if (task->state_ == ThreadManager::Task::WAITING) {
task->state_ = ThreadManager::Task::EXECUTING;
}
+
+ /* If we have a pending task max and we just dropped below it, wakeup any
+ thread that might be blocked on add. */
+ if(manager_->pendingTaskCountMax_ != 0 &&
+ manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
+ manager_->workerMonitor_.notify();
+ }
}
} else {
idle_ = true;
notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
}
}
-
+
if (task != NULL) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
try {
}
}
}
-
+
{
- Synchronized s(manager_->workerMonitor_);
+ Synchronized s(manager_->workerMonitor_);
manager_->deadWorkers_.insert(this->thread());
if (notifyManager) {
manager_->workerMonitor_.notify();
}
}
-
+
return;
}
-
+
private:
ThreadManager::Impl* manager_;
friend class ThreadManager::Impl;
void ThreadManager::Impl::addWorker(size_t value) {
std::set<shared_ptr<Thread> > newThreads;
for (size_t ix = 0; ix < value; ix++) {
- class ThreadManager::Worker;
+ class ThreadManager::Worker;
shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(threadFactory_->newThread(worker));
}
workerMaxCount_ += value;
workers_.insert(newThreads.begin(), newThreads.end());
}
-
+
for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
}
{
- Synchronized s(workerMonitor_);
+ Synchronized s(workerMonitor_);
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
}
}
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ == ThreadManager::UNINITIALIZED) {
if (threadFactory_ == NULL) {
throw InvalidArgumentException();
}
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STOPPING &&
state_ != ThreadManager::JOINING &&
state_ != ThreadManager::STOPPED) {
removeWorker(workerCount_);
}
- // XXX
+ // XXX
// should be able to block here for transition to STOPPED since we're no
// using shared_ptrs
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
state_ = ThreadManager::STOPPED;
}
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<shared_ptr<Thread> > removedThreads;
{
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (value > workerMaxCount_) {
throw InvalidArgumentException();
}
}
{
- Synchronized s(workerMonitor_);
+ Synchronized s(workerMonitor_);
while (workerCount_ != workerMaxCount_) {
workerMonitor_.wait();
for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
workers_.erase(*ix);
+ idMap_.erase((*ix)->id());
}
-
+
deadWorkers_.clear();
}
}
-
-void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
- Synchronized s(monitor_);
+
+ bool ThreadManager::Impl::canSleep() {
+ const Thread::id_t id = threadFactory_->currentThreadId();
+ return idMap_.find(id) == idMap_.end();
+ }
+
+ void ThreadManager::Impl::add(shared_ptr<Runnable> value, long long timeout) {
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
+ if(pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+
+ if(canSleep()) {
+
+ while(pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+ monitor_.wait(timeout);
+ }
+ } else {
+ throw TooManyPendingTasksException();
+ }
+ }
+
tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
// If idle thread is available notify it, otherwise all worker threads are
}
void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
- Synchronized s(monitor_);
+ Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();
}
class SimpleThreadManager : public ThreadManager::Impl {
public:
- SimpleThreadManager(size_t workerCount=4) :
+ SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
workerCount_(workerCount),
+ pendingTaskCountMax_(pendingTaskCountMax),
firstTime_(true) {
}
void start() {
+ ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
ThreadManager::Impl::start();
addWorker(workerCount_);
}
private:
const size_t workerCount_;
+ const size_t pendingTaskCountMax_;
bool firstTime_;
Monitor monitor_;
};
return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
- return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
}
}}} // facebook::thrift::concurrency
+
#include <sys/types.h>
#include "Thread.h"
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
* Thread Pool Manager and related classes
* Stops the thread manager. Aborts all remaining unprocessed task, shuts
* down all created worker threads, and realeases all allocated resources.
* This method blocks for all worker threads to complete, thus it can
- * potentially block forever if a worker thread is running a task that
+ * potentially block forever if a worker thread is running a task that
* won't terminate.
*/
virtual void stop() = 0;
STOPPING,
STOPPED
};
-
+
virtual const STATE state() const = 0;
virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
virtual size_t totalTaskCount() const = 0;
/**
- * Adds a task to be execued at some time in the future by a worker thread.
+ * Gets the maximum pending task count. 0 indicates no maximum
+ */
+ virtual size_t pendingTaskCountMax() const = 0;
+
+ /**
+ * Adds a task to be executed at some time in the future by a worker thread.
*
- * @param value The task to run
+ * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
+ * is greater than or equalt to pendingTaskCountMax(). If this method is called in the
+ * context of a ThreadManager worker thread it will throw a
+ * TooManyPendingTasksException
+ *
+ * @param task The task to queue for execution
+ *
+ * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
+ * is specified
+ *
+ * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
*/
- virtual void add(boost::shared_ptr<Runnable>value) = 0;
+ virtual void add(boost::shared_ptr<Runnable>task, long long timeout=0LL) = 0;
/**
* Removes a pending task
static boost::shared_ptr<ThreadManager> newThreadManager();
/**
- * Creates a simple thread manager the uses count number of worker threads
+ * Creates a simple thread manager the uses count number of worker threads and has
+ * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
+ * on pending tasks
*/
- static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
+ static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0);
class Task;
-
+
class Worker;
class Impl;
timeout = manager_->taskMap_.begin()->first - now;
}
assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
- manager_->monitor_.wait(timeout);
+ try {
+ manager_->monitor_.wait(timeout);
+ } catch(TimedOutException& e) {}
now = Util::currentTime();
}
if (runAll || args[0].compare("thread-factory") == 0) {
ThreadFactoryTests threadFactoryTests;
-
+
std::cout << "ThreadFactory tests..." << std::endl;
-
+
size_t count = 1000;
std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
time00 = Util::currentTime();
time01 = time00;
size_t count = 0;
-
+
while (time01 < time00 + 10) {
count++;
time01 = Util::currentTime();
ThreadManagerTests threadManagerTests;
assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+ std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+ assert(threadManagerTests.blockTest(delay, workerCount));
+
}
}
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
+#include <config.h>
#include <concurrency/Thread.h>
#include <concurrency/PosixThreadFactory.h>
#include <concurrency/Monitor.h>
using namespace facebook::thrift::concurrency;
/**
- * ThreadManagerTests class
+ * ThreadManagerTests class
*
* @author marc
* @version $Id:$
public:
static const double ERROR;
-
+
class Task: public Runnable {
public:
* Reap N threads
*/
class ReapNTask: public Runnable {
-
+
public:
-
+
ReapNTask(Monitor& monitor, int& activeCount) :
_monitor(monitor),
_count(activeCount) {}
-
+
void run() {
Synchronized s(_monitor);
-
+
_count--;
-
+
//std::cout << "\t\t\tthread count: " << _count << std::endl;
-
+
if (_count == 0) {
_monitor.notify();
}
class SynchStartTask: public Runnable {
public:
-
+
enum STATE {
UNINITIALIZED,
STARTING,
bool synchStartTest() {
Monitor monitor;
-
+
SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
-
+
shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
PosixThreadFactory threadFactory = PosixThreadFactory();
{
Synchronized s(monitor);
- monitor.wait(100);
+ try {
+ monitor.wait(100);
+ } catch(TimedOutException& e) {
+ }
if (state == SynchStartTask::STARTED) {
monitor.notify();
}
-
+
while (state == SynchStartTask::STOPPING) {
monitor.wait();
}
for (size_t ix = 0; ix < count; ix++) {
{
Synchronized s(monitor);
- monitor.wait(timeout);
+ try {
+ monitor.wait(timeout);
+ } catch(TimedOutException& e) {
+ }
}
}
using namespace facebook::thrift::concurrency;
/**
- * ThreadManagerTests class
+ * ThreadManagerTests class
*
* @author marc
* @version $Id:$
class Task: public Runnable {
public:
-
- Task(Monitor& monitor, size_t& count, long long timeout) :
+
+ Task(Monitor& monitor, size_t& count, long long timeout) :
_monitor(monitor),
_count(count),
_timeout(timeout),
{
Synchronized s(_sleep);
- _sleep.wait(_timeout);
+ try {
+ _sleep.wait(_timeout);
+ } catch(TimedOutException& e) {
+ ;
+ }catch(...) {
+ assert(0);
+ }
}
_endTime = Util::currentTime();
_done = true;
-
+
{
Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl;
-
+
_count--;
if (_count == 0) {
-
+
_monitor.notify();
}
}
}
-
+
Monitor& _monitor;
size_t& _count;
long long _timeout;
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadFactory->priority(PosixThreadFactory::HIGHEST);
-
+
threadManager->threadFactory(threadFactory);
threadManager->start();
-
+
std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for (size_t ix = 0; ix < count; ix++) {
Synchronized s(monitor);
while(activeCount > 0) {
-
+
monitor.wait();
}
}
long long maxTime = 0;
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
-
+
shared_ptr<ThreadManagerTests::Task> task = *ix;
long long delta = task->_endTime - task->_startTime;
averageTime+= delta;
}
-
+
averageTime /= count;
std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
return success;
}
+
+ class BlockTask: public Runnable {
+
+ public:
+
+ BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+ _monitor(monitor),
+ _bmonitor(bmonitor),
+ _count(count) {}
+
+ void run() {
+ {
+ Synchronized s(_bmonitor);
+
+ _bmonitor.wait();
+
+ }
+
+ {
+ Synchronized s(_monitor);
+
+ _count--;
+
+ if (_count == 0) {
+
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ Monitor& _bmonitor;
+ size_t& _count;
+ };
+
+ /**
+ * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
+ * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
+
+ bool blockTest(long long timeout=100LL, size_t workerCount=2) {
+
+ bool success = false;
+
+ try {
+
+ Monitor bmonitor;
+ Monitor monitor;
+
+ size_t pendingTaskMaxCount = workerCount;
+
+ size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+ threadFactory->priority(PosixThreadFactory::HIGHEST);
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+ for (size_t ix = 0; ix < workerCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+ }
+
+ for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+ }
+
+ for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ threadManager->add(*ix);
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+ try {
+ threadManager->add(extraTask, 1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch(TimedOutException& e) {
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[0] != 0) {
+ monitor.wait();
+ }
+ }
+
+ std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ try {
+ threadManager->add(extraTask, 1);
+ } catch(TimedOutException& e) {
+ std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
+ throw TException("Unexpected timeout adding task");
+
+ } catch(TooManyPendingTasksException& e) {
+ std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+ throw TException("Unexpected timeout adding task");
+ }
+
+ // Wake up tasks that were pending before and wait for them to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[1] != 0) {
+ monitor.wait();
+ }
+ }
+
+ // Wake up the extra task and wait for it to complete
+
+ {
+ Synchronized s(bmonitor);
+
+ bmonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(monitor);
+
+ while(activeCounts[2] != 0) {
+ monitor.wait();
+ }
+ }
+
+ if(!(success = (threadManager->totalTaskCount() == 0))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ } catch(TException& e) {
+ }
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+ return success;
+ }
};
const double ThreadManagerTests::ERROR = .20;
shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager),
- stop_(false) {}
+ stop_(false), timeout_(0) {}
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory),
threadManager_(threadManager),
- stop_(false) {}
+ stop_(false), timeout_(0) {}
TThreadPoolServer::~TThreadPoolServer() {}
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)), timeout_);
} catch (TTransportException& ttx) {
if (inputTransport != NULL) { inputTransport->close(); }
}
+long long TThreadPoolServer::timeout() const {return timeout_;}
+void TThreadPoolServer::timeout(long long value) {timeout_ = value;}
+
}}} // facebook::thrift::server
virtual ~TThreadPoolServer();
virtual void serve();
+
+ virtual long long timeout() const;
+ virtual void timeout(long long value);
virtual void stop() {
stop_ = true;
boost::shared_ptr<ThreadManager> threadManager_;
volatile bool stop_;
+
+ volatile long long timeout_;
};
Monitor m;
for (int i = 0; i < seconds; ++i) {
fprintf(stderr, "Thread %d: sleep %d\n", thread, i);
- m.wait(1000);
+ try {
+ m.wait(1000);
+ } catch(TimedOutException& e) {
+ }
}
fprintf(stderr, "THREAD %d DONE\n", thread);
}