From: Marc Slemko Date: Mon, 4 Jun 2007 21:01:19 +0000 (+0000) Subject: Modified PosixThreadFactory::PThread: X-Git-Tag: 0.2.0~1343 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=67606e5d7207874d003bc9ba433edd66d74662f0;p=common%2Fthrift.git Modified PosixThreadFactory::PThread: Pay attention to detached flags. If thread is create non-detached and has not been joined when all references are given up, (ie boost::share_ptr calls ~PThread) do the join in the destructor to prevent thread ids from being leaked. Modified ThreadFactoryTests.reapNThreads: Loop M times for M threads where M x N is bigger than 32K to verify that thread ids aren't leaked Modified TimerManager.cpp: Removed debug messages. Reviewed By: mcslee Revert Plan: revertible Test Plan: concurrency_test thread-factory passes git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665129 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 5f86ea27..2e8ed474 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -46,35 +46,49 @@ class PthreadThread: public Thread { int priority_; int stackSize_; weak_ptr self_; + bool detached_; public: - PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : + PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr runnable) : pthread_(0), state_(uninitialized), policy_(policy), priority_(priority), - stackSize_(stackSize) { + stackSize_(stackSize), + detached_(detached) { this->Thread::runnable(runnable); } - ~PthreadThread() {} + ~PthreadThread() { + /* Nothing references this thread, if is is not detached, do a join + now, otherwise the thread-id and, possibly, other resources will + be leaked. */ + if(!detached_) { + try { + join(); + } catch(...) { + // We're really hosed. + } + } + } void start() { if (state_ != uninitialized) { return; } - state_ = starting; - pthread_attr_t thread_attr; if (pthread_attr_init(&thread_attr) != 0) { throw SystemResourceException("pthread_attr_init failed"); } - if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) { - throw SystemResourceException("pthread_attr_setdetachstate failed"); + if(pthread_attr_setdetachstate(&thread_attr, + detached_ ? + PTHREAD_CREATE_DETACHED : + PTHREAD_CREATE_JOINABLE) != 0) { + throw SystemResourceException("pthread_attr_setdetachstate failed"); } // Set thread stack size @@ -99,20 +113,23 @@ class PthreadThread: public Thread { shared_ptr* selfRef = new shared_ptr(); *selfRef = self_.lock(); + state_ = starting; + if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) { throw SystemResourceException("pthread_create failed"); } } void join() { - if (state_ != stopped) { + if (!detached_ && state_ != uninitialized) { void* ignore; pthread_join(pthread_, &ignore); + detached_ = true; } } id_t id() { - return pthread_; + return static_cast(pthread_); } shared_ptr runnable() const { return Thread::runnable(); } @@ -211,7 +228,7 @@ class PosixThreadFactory::Impl { * @param runnable A runnable object */ shared_ptr newThread(shared_ptr runnable) const { - shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable)); + shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable)); result->weakRef(result); runnable->thread(result); return result; @@ -223,7 +240,7 @@ class PosixThreadFactory::Impl { PRIORITY priority() const { return priority_; } - Thread::id_t currentThreadId() const { return pthread_self(); } + Thread::id_t currentThreadId() const {return static_cast(pthread_self());} /** * Sets priority. diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index 5f032c1c..16be14da 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -52,6 +52,25 @@ class PosixThreadFactory : public ThreadFactory { DECREMENT = 8 }; + /** + * Posix thread (pthread) factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + * the Runnable tasks they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * Threads are created with the specified policy, priority, stack-size and detachable-mode + * detached means the thread is free-running and will release all system resources the + * when it completes. A detachable thread is not joinable. The join method + * of a detachable thread will return immediately with no error. + * + * Joinable threads will detach themselves iff they were not explicitly joined and + * there are no remaining strong references to the thread. This guarantees that + * joinnable threads don't leak resources even when the application neglects to + * call join explicitly. + * + * By default threads are joinable. + */ + PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false); // From ThreadFactory; diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp index 2134e37f..b9e604cc 100644 --- a/lib/cpp/src/concurrency/TimerManager.cpp +++ b/lib/cpp/src/concurrency/TimerManager.cpp @@ -40,8 +40,6 @@ class TimerManager::Task : public Runnable { state_(WAITING) {} ~Task() { - //debug - std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; } void run() { @@ -64,10 +62,7 @@ class TimerManager::Dispatcher: public Runnable { Dispatcher(TimerManager* manager) : manager_(manager) {} - ~Dispatcher() { - // debug - std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; - } + ~Dispatcher() {} /** * Dispatcher entry point @@ -148,13 +143,11 @@ TimerManager::~TimerManager() { // If we haven't been explicitly stopped, do so now. We don't need to grab // the monitor here, since stop already takes care of reentrancy. - std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; if (state_ != STOPPED) { try { stop(); } catch(...) { - std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl; throw; // uhoh } diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index f7d607a3..4cd6bd52 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -47,7 +47,7 @@ public: */ bool helloWorldTest() { - PosixThreadFactory threadFactory = PosixThreadFactory(); + PosixThreadFactory threadFactory = PosixThreadFactory(); shared_ptr task = shared_ptr(new ThreadFactoryTests::Task()); @@ -90,35 +90,52 @@ public: int& _count; }; - bool reapNThreads(int count=10) { - - Monitor* monitor = new Monitor(); - - int* activeCount = new int(count); + bool reapNThreads(int loop=1, int count=10) { PosixThreadFactory threadFactory = PosixThreadFactory(); - std::set > threads; + Monitor* monitor = new Monitor(); - for (int ix = 0; ix < count; ix++) { - threads.insert(threadFactory.newThread(shared_ptr(new ReapNTask(*monitor, *activeCount)))); - } + for(int lix = 0; lix < loop; lix++) { - for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + int* activeCount = new int(count); - (*thread)->start(); - } + std::set > threads; + int tix; - { - Synchronized s(*monitor); - while (*activeCount > 0) { - monitor->wait(1000); + for (tix = 0; tix < count; tix++) { + try { + threads.insert(threadFactory.newThread(shared_ptr(new ReapNTask(*monitor, *activeCount)))); + } catch(SystemResourceException& e) { + std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl; + throw e; + } + } + + tix = 0; + for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) { + + try { + (*thread)->start(); + } catch(SystemResourceException& e) { + std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl; + throw e; + } + } + + { + Synchronized s(*monitor); + while (*activeCount > 0) { + monitor->wait(1000); + } + } + + for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + threads.erase(*thread); } - } - for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { - threads.erase(*thread); + std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl; } std::cout << "\t\t\tSuccess!" << std::endl;