From b9db49c6776bde3b06ce8a5ceb32c873f8de4592 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 9 Mar 2010 05:19:30 +0000 Subject: [PATCH] cpp: Let Monitors share Mutex instances - Let Monitor objects share a Mutex() instance so that more than one condition can be implemented on top of a single mutex protecting an important data structure. - Make Mutex and Monitor noncopyable - Add an accessor to Mutex() so the underlying pthread_mutex_t* can be retrieved for passing to pthread_condwait - Change Monitor to use the actual Mutex class instead of creating a naked pthread_mutex_t on its own - Add new constructors to Monitor git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920666 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/concurrency/Monitor.cpp | 73 +++++++++++++++++++---------- lib/cpp/src/concurrency/Monitor.h | 38 +++++++++------ lib/cpp/src/concurrency/Mutex.cpp | 4 ++ lib/cpp/src/concurrency/Mutex.h | 2 + 4 files changed, 77 insertions(+), 40 deletions(-) diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp index 2055caa9..2943ef70 100644 --- a/lib/cpp/src/concurrency/Monitor.cpp +++ b/lib/cpp/src/concurrency/Monitor.cpp @@ -21,6 +21,8 @@ #include "Exception.h" #include "Util.h" +#include + #include #include @@ -30,6 +32,8 @@ namespace apache { namespace thrift { namespace concurrency { +using boost::scoped_ptr; + /** * Monitor implementation using the POSIX pthread library * @@ -39,43 +43,48 @@ class Monitor::Impl { public: - Impl() : - mutexInitialized_(false), - condInitialized_(false) { - - if (pthread_mutex_init(&pthread_mutex_, NULL) == 0) { - mutexInitialized_ = true; + Impl() + : ownedMutex_(new Mutex()), + mutex_(NULL), + condInitialized_(false) { + init(ownedMutex_.get()); + } - if (pthread_cond_init(&pthread_cond_, NULL) == 0) { - condInitialized_ = true; - } - } + Impl(Mutex* mutex) + : mutex_(NULL), + condInitialized_(false) { + init(mutex); + } - if (!mutexInitialized_ || !condInitialized_) { - cleanup(); - throw SystemResourceException(); - } + Impl(Monitor* monitor) + : mutex_(NULL), + condInitialized_(false) { + init(&(monitor->mutex())); } ~Impl() { cleanup(); } - void lock() const { pthread_mutex_lock(&pthread_mutex_); } - - void unlock() const { pthread_mutex_unlock(&pthread_mutex_); } + Mutex& mutex() { return *mutex_; } + void lock() { mutex().lock(); } + void unlock() { mutex().unlock(); } void wait(int64_t timeout) const { + assert(mutex_); + pthread_mutex_t* mutexImpl = + reinterpret_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); // XXX Need to assert that caller owns mutex assert(timeout >= 0LL); if (timeout == 0LL) { - int iret = pthread_cond_wait(&pthread_cond_, &pthread_mutex_); + int iret = pthread_cond_wait(&pthread_cond_, mutexImpl); assert(iret == 0); } else { struct timespec abstime; int64_t now = Util::currentTime(); Util::toTimespec(abstime, now + timeout); int result = pthread_cond_timedwait(&pthread_cond_, - &pthread_mutex_, + mutexImpl, &abstime); if (result == ETIMEDOUT) { // pthread_cond_timedwait has been observed to return early on @@ -100,13 +109,20 @@ class Monitor::Impl { private: - void cleanup() { - if (mutexInitialized_) { - mutexInitialized_ = false; - int iret = pthread_mutex_destroy(&pthread_mutex_); - assert(iret == 0); + void init(Mutex* mutex) { + mutex_ = mutex; + + if (pthread_cond_init(&pthread_cond_, NULL) == 0) { + condInitialized_ = true; } + if (!condInitialized_) { + cleanup(); + throw SystemResourceException(); + } + } + + void cleanup() { if (condInitialized_) { condInitialized_ = false; int iret = pthread_cond_destroy(&pthread_cond_); @@ -114,16 +130,21 @@ class Monitor::Impl { } } - mutable pthread_mutex_t pthread_mutex_; - mutable bool mutexInitialized_; + scoped_ptr ownedMutex_; + Mutex* mutex_; + mutable pthread_cond_t pthread_cond_; mutable bool condInitialized_; }; Monitor::Monitor() : impl_(new Monitor::Impl()) {} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {} Monitor::~Monitor() { delete impl_; } +Mutex& Monitor::mutex() const { return impl_->mutex(); } + void Monitor::lock() const { impl_->lock(); } void Monitor::unlock() const { impl_->unlock(); } diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h index 234bf326..f29119d3 100644 --- a/lib/cpp/src/concurrency/Monitor.h +++ b/lib/cpp/src/concurrency/Monitor.h @@ -21,6 +21,10 @@ #define _THRIFT_CONCURRENCY_MONITOR_H_ 1 #include "Exception.h" +#include "Mutex.h" + +#include + namespace apache { namespace thrift { namespace concurrency { @@ -29,7 +33,12 @@ namespace apache { namespace thrift { namespace concurrency { * notifying condition events requires that the caller own the mutex. Mutex * lock and unlock operations can be performed independently of condition * events. This is more or less analogous to java.lang.Object multi-thread - * operations + * operations. + * + * Note the Monitor can create a new, internal mutex; alternatively, a + * separate Mutex can be passed in and the Monitor will re-use it without + * taking ownership. It's the user's responsibility to make sure that the + * Mutex is not deallocated before the Monitor. * * Note that all methods are const. Monitors implement logical constness, not * bit constness. This allows const methods to call monitor methods without @@ -37,14 +46,22 @@ namespace apache { namespace thrift { namespace concurrency { * * @version $Id:$ */ -class Monitor { - +class Monitor : boost::noncopyable { public: - + /** Creates a new mutex, and takes ownership of it. */ Monitor(); + /** Uses the provided mutex without taking ownership. */ + explicit Monitor(Mutex* mutex); + + /** Uses the mutex inside the provided Monitor without taking ownership. */ + explicit Monitor(Monitor* monitor); + + /** Deallocates the mutex only if we own it. */ virtual ~Monitor(); + Mutex& mutex() const; + virtual void lock() const; virtual void unlock() const; @@ -64,18 +81,11 @@ class Monitor { class Synchronized { public: - - Synchronized(const Monitor& value) : - monitor_(value) { - monitor_.lock(); - } - - ~Synchronized() { - monitor_.unlock(); - } + Synchronized(const Monitor* monitor) : g(monitor->mutex()) { } + Synchronized(const Monitor& monitor) : g(monitor.mutex()) { } private: - const Monitor& monitor_; + Guard g; }; diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp index 045dbdfe..5d33c114 100644 --- a/lib/cpp/src/concurrency/Mutex.cpp +++ b/lib/cpp/src/concurrency/Mutex.cpp @@ -52,6 +52,8 @@ class Mutex::impl { void unlock() const { pthread_mutex_unlock(&pthread_mutex_); } + void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; } + private: mutable pthread_mutex_t pthread_mutex_; mutable bool initialized_; @@ -59,6 +61,8 @@ class Mutex::impl { Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {} +void* Mutex::getUnderlyingImpl() const { return impl_->getUnderlyingImpl(); } + void Mutex::lock() const { impl_->lock(); } bool Mutex::trylock() const { return impl_->trylock(); } diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index 73c73e0e..f7ee9f25 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -39,6 +39,8 @@ class Mutex { virtual bool trylock() const; virtual void unlock() const; + void* getUnderlyingImpl() const; + static void DEFAULT_INITIALIZER(void*); static void ADAPTIVE_INITIALIZER(void*); static void RECURSIVE_INITIALIZER(void*); -- 2.17.1