~PthreadThread() {
/* Nothing references this thread, if is is not detached, do a join
- now, otherwise the thread-id and, possibly, other resources will
+ now, otherwise the thread-id and, possibly, other resources will
be leaked. */
if(!detached_) {
try {
join();
} catch(...) {
- // We're really hosed.
+ // We're really hosed.
}
}
}
throw SystemResourceException("pthread_attr_init failed");
}
- if(pthread_attr_setdetachstate(&thread_attr,
- detached_ ?
- PTHREAD_CREATE_DETACHED :
+ if(pthread_attr_setdetachstate(&thread_attr,
+ detached_ ?
+ PTHREAD_CREATE_DETACHED :
PTHREAD_CREATE_JOINABLE) != 0) {
throw SystemResourceException("pthread_attr_setdetachstate failed");
}
void join() {
if (!detached_ && state_ != uninitialized) {
void* ignore;
- pthread_join(pthread_, &ignore);
- detached_ = true;
+ /* XXX
+ If join fails it is most likely due to the fact
+ that the last reference was the thread itself and cannot
+ join. This results in leaked threads and will eventually
+ cause the process to run out of thread resources.
+ We're beyond the point of throwing an exception. Not clear how
+ best to handle this. */
+ detached_ = pthread_join(pthread_, &ignore) == 0;
}
}
- id_t id() {
+ id_t getId() {
return static_cast<id_t>(pthread_);
}
return result;
}
- int stackSize() const { return stackSize_; }
+ int getStackSize() const { return stackSize_; }
- void stackSize(int value) { stackSize_ = value; }
+ void setStackSize(int value) { stackSize_ = value; }
- PRIORITY priority() const { return priority_; }
-
- Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
+ PRIORITY getPriority() const { return priority_; }
/**
* Sets priority.
* XXX
* Need to handle incremental priorities properly.
*/
- void priority(PRIORITY value) { priority_ = value; }
+ void setPriority(PRIORITY value) { priority_ = value; }
+
+ bool isDetached() const { return detached_; }
+
+ void setDetached(bool value) { detached_ = value; }
+
+ Thread::id_t getCurrentThreadId() const {return static_cast<id_t>(pthread_self());}
+
};
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
-int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
+int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
+
+void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
-void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
-PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
+bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
-void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
-Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); }
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
}}} // facebook::thrift::concurrency
#include <boost/shared_ptr.hpp>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
- * A thread factory to create posix threads
+ * A thread factory to create posix threads
*
* @author marc
* @version $Id:$
DECREMENT = 8
};
- /**
- * Posix thread (pthread) factory. All threads created by a factory are reference-counted
- * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
* the Runnable tasks they host will be properly cleaned up once the last strong reference
* to both is given up.
*
* Threads are created with the specified policy, priority, stack-size and detachable-mode
- * detached means the thread is free-running and will release all system resources the
- * when it completes. A detachable thread is not joinable. The join method
- * of a detachable thread will return immediately with no error.
+ * detached means the thread is free-running and will release all system resources the
+ * when it completes. A detachable thread is not joinable. The join method
+ * of a detachable thread will return immediately with no error.
*
* Joinable threads will detach themselves iff they were not explicitly joined and
* there are no remaining strong references to the thread. This guarantees that
- * joinnable threads don't leak resources even when the application neglects to
+ * joinnable threads don't leak resources even when the application neglects to
* call join explicitly.
*
* By default threads are joinable.
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
// From ThreadFactory;
- Thread::id_t currentThreadId() const;
+ Thread::id_t getCurrentThreadId() const;
+
+ /**
+ * Gets stack size for created threads
+ *
+ * @return int size in megabytes
+ */
+ virtual int getStackSize() const;
/**
* Sets stack size for created threads
*
* @param value size in megabytes
*/
- virtual void stackSize(int value);
+ virtual void setStackSize(int value);
/**
- * Gets stack size for created threads
- *
- * @return int size in megabytes
+ * Gets priority relative to current policy
*/
- virtual int stackSize() const;
+ virtual PRIORITY getPriority() const;
/**
* Sets priority relative to current policy
*/
- virtual void priority(PRIORITY priority);
+ virtual void setPriority(PRIORITY priority);
/**
- * Gets priority relative to current policy
+ * Sets detached mode of threads
*/
- virtual PRIORITY priority() const;
-
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
private:
class Impl;
boost::shared_ptr<Impl> impl_;
/**
* Gets the thread's platform-specific ID
*/
- virtual id_t id() = 0;
+ virtual id_t getId() = 0;
/**
* Gets the runnable object this thread is hosting
static const Thread::id_t unknown_thread_id;
- virtual Thread::id_t currentThreadId() const = 0;
+ virtual Thread::id_t getCurrentThreadId() const = 0;
};
}}} // facebook::thrift::concurrency
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
- idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
}
{
for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
workers_.erase(*ix);
- idMap_.erase((*ix)->id());
+ idMap_.erase((*ix)->getId());
}
deadWorkers_.clear();
}
bool ThreadManager::Impl::canSleep() {
- const Thread::id_t id = threadFactory_->currentThreadId();
+ const Thread::id_t id = threadFactory_->getCurrentThreadId();
return idMap_.find(id) == idMap_.end();
}
std::cout << "ThreadFactory tests..." << std::endl;
size_t count = 1000;
+ size_t floodLoops = 1;
+ size_t floodCount = 100000;
std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
assert(threadFactoryTests.reapNThreads(count));
+ std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+ assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
assert(threadFactoryTests.synchStartTest());
}
}
}
-
throw e;
}
}
-
+
tix = 0;
for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
throw e;
}
}
-
+
{
Synchronized s(*monitor);
while (*activeCount > 0) {
monitor->wait(1000);
}
}
-
+
for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
threads.erase(*thread);
}
return success;
}
+
+
+ class FloodTask : public Runnable {
+ public:
+
+ FloodTask(const size_t id) :_id(id) {}
+ ~FloodTask(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " done" << std::endl;
+ }
+ }
+
+ void run(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " started" << std::endl;
+ }
+
+ usleep(1);
+ }
+ const size_t _id;
+ };
+
+ void foo(PosixThreadFactory *tf) {
+ }
+
+ bool floodNTest(size_t loop=1, size_t count=100000) {
+
+ bool success = false;
+
+ for(size_t lix = 0; lix < loop; lix++) {
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+ threadFactory.setDetached(true);
+
+ for(size_t tix = 0; tix < count; tix++) {
+
+ try {
+
+ shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ thread->start();
+
+ usleep(1);
+
+ } catch (TException& e) {
+
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+
+ return success;
+ }
+ }
+
+ std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+ success = true;
+ }
+
+ return success;
+ }
};
const double ThreadFactoryTests::ERROR = .20;
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
- threadFactory->priority(PosixThreadFactory::HIGHEST);
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
threadManager->threadFactory(threadFactory);
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
- threadFactory->priority(PosixThreadFactory::HIGHEST);
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
threadManager->threadFactory(threadFactory);