From: Marc Slemko Date: Tue, 5 Jun 2007 22:20:14 +0000 (+0000) Subject: Modified PosixThreadFactory X-Git-Tag: 0.2.0~1342 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=a647903dd2edaa8cf846fe9dac4df82cf74740a0;p=common%2Fthrift.git Modified PosixThreadFactory Added explicit detached getter and setter Modified PosixThreadFactory::~PThread: Check for join failing and don't transition to detached_ state if it does. Potential thread-handle leak for threads created joinable who aren't referenced by any external thread. Solution for now has to be "DONT DO THAT", the clever approach doesn't always work. Added ThreadFactoryTests.floodNThreads: Loop M times for N threads where M x N is bigger than 32K to verify that detached threads can be created ad infinitum. Reviewed By: mcslee Revert Plan: revertible Test Plan: concurrency_test thread-factory passes git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665130 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 2e8ed474..73aba616 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -63,13 +63,13 @@ class PthreadThread: public Thread { ~PthreadThread() { /* Nothing references this thread, if is is not detached, do a join - now, otherwise the thread-id and, possibly, other resources will + now, otherwise the thread-id and, possibly, other resources will be leaked. */ if(!detached_) { try { join(); } catch(...) { - // We're really hosed. + // We're really hosed. } } } @@ -84,9 +84,9 @@ class PthreadThread: public Thread { throw SystemResourceException("pthread_attr_init failed"); } - if(pthread_attr_setdetachstate(&thread_attr, - detached_ ? - PTHREAD_CREATE_DETACHED : + if(pthread_attr_setdetachstate(&thread_attr, + detached_ ? + PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE) != 0) { throw SystemResourceException("pthread_attr_setdetachstate failed"); } @@ -123,12 +123,18 @@ class PthreadThread: public Thread { void join() { if (!detached_ && state_ != uninitialized) { void* ignore; - pthread_join(pthread_, &ignore); - detached_ = true; + /* XXX + If join fails it is most likely due to the fact + that the last reference was the thread itself and cannot + join. This results in leaked threads and will eventually + cause the process to run out of thread resources. + We're beyond the point of throwing an exception. Not clear how + best to handle this. */ + detached_ = pthread_join(pthread_, &ignore) == 0; } } - id_t id() { + id_t getId() { return static_cast(pthread_); } @@ -234,13 +240,11 @@ class PosixThreadFactory::Impl { return result; } - int stackSize() const { return stackSize_; } + int getStackSize() const { return stackSize_; } - void stackSize(int value) { stackSize_ = value; } + void setStackSize(int value) { stackSize_ = value; } - PRIORITY priority() const { return priority_; } - - Thread::id_t currentThreadId() const {return static_cast(pthread_self());} + PRIORITY getPriority() const { return priority_; } /** * Sets priority. @@ -248,7 +252,14 @@ class PosixThreadFactory::Impl { * XXX * Need to handle incremental priorities properly. */ - void priority(PRIORITY value) { priority_ = value; } + void setPriority(PRIORITY value) { priority_ = value; } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const {return static_cast(pthread_self());} + }; PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : @@ -256,14 +267,18 @@ PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int sta shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const { return impl_->newThread(runnable); } -int PosixThreadFactory::stackSize() const { return impl_->stackSize(); } +int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); } + +void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); } + +PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); } -void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); } +void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); } -PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); } +bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); } -void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); } +void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); } -Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); } +Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index 16be14da..894d5f5c 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -11,10 +11,10 @@ #include -namespace facebook { namespace thrift { namespace concurrency { +namespace facebook { namespace thrift { namespace concurrency { /** - * A thread factory to create posix threads + * A thread factory to create posix threads * * @author marc * @version $Id:$ @@ -52,20 +52,20 @@ class PosixThreadFactory : public ThreadFactory { DECREMENT = 8 }; - /** - * Posix thread (pthread) factory. All threads created by a factory are reference-counted - * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + /** + * Posix thread (pthread) factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and * the Runnable tasks they host will be properly cleaned up once the last strong reference * to both is given up. * * Threads are created with the specified policy, priority, stack-size and detachable-mode - * detached means the thread is free-running and will release all system resources the - * when it completes. A detachable thread is not joinable. The join method - * of a detachable thread will return immediately with no error. + * detached means the thread is free-running and will release all system resources the + * when it completes. A detachable thread is not joinable. The join method + * of a detachable thread will return immediately with no error. * * Joinable threads will detach themselves iff they were not explicitly joined and * there are no remaining strong references to the thread. This guarantees that - * joinnable threads don't leak resources even when the application neglects to + * joinnable threads don't leak resources even when the application neglects to * call join explicitly. * * By default threads are joinable. @@ -77,32 +77,42 @@ class PosixThreadFactory : public ThreadFactory { boost::shared_ptr newThread(boost::shared_ptr runnable) const; // From ThreadFactory; - Thread::id_t currentThreadId() const; + Thread::id_t getCurrentThreadId() const; + + /** + * Gets stack size for created threads + * + * @return int size in megabytes + */ + virtual int getStackSize() const; /** * Sets stack size for created threads * * @param value size in megabytes */ - virtual void stackSize(int value); + virtual void setStackSize(int value); /** - * Gets stack size for created threads - * - * @return int size in megabytes + * Gets priority relative to current policy */ - virtual int stackSize() const; + virtual PRIORITY getPriority() const; /** * Sets priority relative to current policy */ - virtual void priority(PRIORITY priority); + virtual void setPriority(PRIORITY priority); /** - * Gets priority relative to current policy + * Sets detached mode of threads */ - virtual PRIORITY priority() const; - + virtual void setDetached(bool detached); + + /** + * Gets current detached mode + */ + virtual bool isDetached() const; + private: class Impl; boost::shared_ptr impl_; diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index 3e5964fb..39ee8167 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -75,7 +75,7 @@ class Thread { /** * Gets the thread's platform-specific ID */ - virtual id_t id() = 0; + virtual id_t getId() = 0; /** * Gets the runnable object this thread is hosting @@ -104,7 +104,7 @@ class ThreadFactory { static const Thread::id_t unknown_thread_id; - virtual Thread::id_t currentThreadId() const = 0; + virtual Thread::id_t getCurrentThreadId() const = 0; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp index 56de4d00..3d87724c 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/concurrency/ThreadManager.cpp @@ -308,7 +308,7 @@ class ThreadManager::Worker: public Runnable { shared_ptr worker = dynamic_pointer_cast((*ix)->runnable()); worker->state_ = ThreadManager::Worker::STARTING; (*ix)->start(); - idMap_.insert(std::pair >((*ix)->id(), *ix)); + idMap_.insert(std::pair >((*ix)->getId(), *ix)); } { @@ -400,7 +400,7 @@ void ThreadManager::Impl::removeWorker(size_t value) { for (std::set >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) { workers_.erase(*ix); - idMap_.erase((*ix)->id()); + idMap_.erase((*ix)->getId()); } deadWorkers_.clear(); @@ -408,7 +408,7 @@ void ThreadManager::Impl::removeWorker(size_t value) { } bool ThreadManager::Impl::canSleep() { - const Thread::id_t id = threadFactory_->currentThreadId(); + const Thread::id_t id = threadFactory_->getCurrentThreadId(); return idMap_.find(id) == idMap_.end(); } diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp index 5fe46513..a1604726 100644 --- a/lib/cpp/src/concurrency/test/Tests.cpp +++ b/lib/cpp/src/concurrency/test/Tests.cpp @@ -33,11 +33,17 @@ int main(int argc, char** argv) { std::cout << "ThreadFactory tests..." << std::endl; size_t count = 1000; + size_t floodLoops = 1; + size_t floodCount = 100000; std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl; assert(threadFactoryTests.reapNThreads(count)); + std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl; + + assert(threadFactoryTests.floodNTest(floodLoops, floodCount)); + std::cout << "\t\tThreadFactory synchronous start test" << std::endl; assert(threadFactoryTests.synchStartTest()); @@ -134,4 +140,3 @@ int main(int argc, char** argv) { } } } - diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index 4cd6bd52..99bc94ec 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -112,7 +112,7 @@ public: throw e; } } - + tix = 0; for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) { @@ -123,14 +123,14 @@ public: throw e; } } - + { Synchronized s(*monitor); while (*activeCount > 0) { monitor->wait(1000); } } - + for (std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { threads.erase(*thread); } @@ -276,6 +276,67 @@ public: return success; } + + + class FloodTask : public Runnable { + public: + + FloodTask(const size_t id) :_id(id) {} + ~FloodTask(){ + if(_id % 1000 == 0) { + std::cout << "\t\tthread " << _id << " done" << std::endl; + } + } + + void run(){ + if(_id % 1000 == 0) { + std::cout << "\t\tthread " << _id << " started" << std::endl; + } + + usleep(1); + } + const size_t _id; + }; + + void foo(PosixThreadFactory *tf) { + } + + bool floodNTest(size_t loop=1, size_t count=100000) { + + bool success = false; + + for(size_t lix = 0; lix < loop; lix++) { + + PosixThreadFactory threadFactory = PosixThreadFactory(); + threadFactory.setDetached(true); + + for(size_t tix = 0; tix < count; tix++) { + + try { + + shared_ptr task(new FloodTask(lix * count + tix )); + + shared_ptr thread = threadFactory.newThread(task); + + thread->start(); + + usleep(1); + + } catch (TException& e) { + + std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl; + + return success; + } + } + + std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl; + + success = true; + } + + return success; + } }; const double ThreadFactoryTests::ERROR = .20; diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index 5f518cae..a8fdcdac 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -100,7 +100,7 @@ public: shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); - threadFactory->priority(PosixThreadFactory::HIGHEST); + threadFactory->setPriority(PosixThreadFactory::HIGHEST); threadManager->threadFactory(threadFactory); @@ -239,7 +239,7 @@ public: shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); - threadFactory->priority(PosixThreadFactory::HIGHEST); + threadFactory->setPriority(PosixThreadFactory::HIGHEST); threadManager->threadFactory(threadFactory);