int priority_;
int stackSize_;
weak_ptr<PthreadThread> self_;
+ bool detached_;
public:
- PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+ PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
pthread_(0),
state_(uninitialized),
policy_(policy),
priority_(priority),
- stackSize_(stackSize) {
+ stackSize_(stackSize),
+ detached_(detached) {
this->Thread::runnable(runnable);
}
- ~PthreadThread() {}
+ ~PthreadThread() {
+ /* Nothing references this thread, if is is not detached, do a join
+ now, otherwise the thread-id and, possibly, other resources will
+ be leaked. */
+ if(!detached_) {
+ try {
+ join();
+ } catch(...) {
+ // We're really hosed.
+ }
+ }
+ }
void start() {
if (state_ != uninitialized) {
return;
}
- state_ = starting;
-
pthread_attr_t thread_attr;
if (pthread_attr_init(&thread_attr) != 0) {
throw SystemResourceException("pthread_attr_init failed");
}
- if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
- throw SystemResourceException("pthread_attr_setdetachstate failed");
+ if(pthread_attr_setdetachstate(&thread_attr,
+ detached_ ?
+ PTHREAD_CREATE_DETACHED :
+ PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
}
// Set thread stack size
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = self_.lock();
+ state_ = starting;
+
if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
throw SystemResourceException("pthread_create failed");
}
}
void join() {
- if (state_ != stopped) {
+ if (!detached_ && state_ != uninitialized) {
void* ignore;
pthread_join(pthread_, &ignore);
+ detached_ = true;
}
}
id_t id() {
- return pthread_;
+ return static_cast<id_t>(pthread_);
}
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
* @param runnable A runnable object
*/
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
result->weakRef(result);
runnable->thread(result);
return result;
PRIORITY priority() const { return priority_; }
- Thread::id_t currentThreadId() const { return pthread_self(); }
+ Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
/**
* Sets priority.
DECREMENT = 8
};
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * Threads are created with the specified policy, priority, stack-size and detachable-mode
+ * detached means the thread is free-running and will release all system resources the
+ * when it completes. A detachable thread is not joinable. The join method
+ * of a detachable thread will return immediately with no error.
+ *
+ * Joinable threads will detach themselves iff they were not explicitly joined and
+ * there are no remaining strong references to the thread. This guarantees that
+ * joinnable threads don't leak resources even when the application neglects to
+ * call join explicitly.
+ *
+ * By default threads are joinable.
+ */
+
PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
// From ThreadFactory;
state_(WAITING) {}
~Task() {
- //debug
- std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
}
void run() {
Dispatcher(TimerManager* manager) :
manager_(manager) {}
- ~Dispatcher() {
- // debug
- std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
- }
+ ~Dispatcher() {}
/**
* Dispatcher entry point
// If we haven't been explicitly stopped, do so now. We don't need to grab
// the monitor here, since stop already takes care of reentrancy.
- std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
if (state_ != STOPPED) {
try {
stop();
} catch(...) {
- std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
throw;
// uhoh
}
*/
bool helloWorldTest() {
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PosixThreadFactory threadFactory = PosixThreadFactory();
shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
int& _count;
};
- bool reapNThreads(int count=10) {
-
- Monitor* monitor = new Monitor();
-
- int* activeCount = new int(count);
+ bool reapNThreads(int loop=1, int count=10) {
PosixThreadFactory threadFactory = PosixThreadFactory();
- std::set<shared_ptr<Thread> > threads;
+ Monitor* monitor = new Monitor();
- for (int ix = 0; ix < count; ix++) {
- threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
- }
+ for(int lix = 0; lix < loop; lix++) {
- for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ int* activeCount = new int(count);
- (*thread)->start();
- }
+ std::set<shared_ptr<Thread> > threads;
+ int tix;
- {
- Synchronized s(*monitor);
- while (*activeCount > 0) {
- monitor->wait(1000);
+ for (tix = 0; tix < count; tix++) {
+ try {
+ threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
+ }
+
+ tix = 0;
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
+
+ try {
+ (*thread)->start();
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
+ }
+
+ {
+ Synchronized s(*monitor);
+ while (*activeCount > 0) {
+ monitor->wait(1000);
+ }
+ }
+
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ threads.erase(*thread);
}
- }
- for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
- threads.erase(*thread);
+ std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
}
std::cout << "\t\t\tSuccess!" << std::endl;