From: Roger Meier Date: Sun, 11 Dec 2011 21:05:35 +0000 (+0000) Subject: THRIFT-1272 add subclass of ReadWriteMutex that avoids writer X-Git-Tag: 0.9.1~495 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=3fc481961778b1ec5faebd2b42d9a15d8b875e35;p=common%2Fthrift.git THRIFT-1272 add subclass of ReadWriteMutex that avoids writer starvation Patch: Dave Watson git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1213067 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp index 332d415b..4f3f3232 100644 --- a/lib/cpp/src/concurrency/Mutex.cpp +++ b/lib/cpp/src/concurrency/Mutex.cpp @@ -270,9 +270,9 @@ public: PROFILE_MUTEX_LOCKED(); } - bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); } + bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); } - bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); } + bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); } void release() const { PROFILE_MUTEX_START_UNLOCK(); @@ -300,5 +300,35 @@ bool ReadWriteMutex::attemptWrite() const { return impl_->attemptWrite(); } void ReadWriteMutex::release() const { impl_->release(); } +NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) {} + +void NoStarveReadWriteMutex::acquireRead() const +{ + if (writerWaiting_) { + // writer is waiting, block on the writer's mutex until he's done with it + mutex_.lock(); + mutex_.unlock(); + } + + ReadWriteMutex::acquireRead(); +} + +void NoStarveReadWriteMutex::acquireWrite() const +{ + // if we can acquire the rwlock the easy way, we're done + if (attemptWrite()) { + return; + } + + // failed to get the rwlock, do it the hard way: + // locking the mutex and setting writerWaiting will cause all new readers to + // block on the mutex rather than on the rwlock. + mutex_.lock(); + writerWaiting_ = true; + ReadWriteMutex::acquireWrite(); + writerWaiting_ = false; + mutex_.unlock(); +} + }}} // apache::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index 847aaf69..1763b5c6 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -101,6 +101,25 @@ private: boost::shared_ptr impl_; }; +/** + * A ReadWriteMutex that guarantees writers will not be starved by readers: + * When a writer attempts to acquire the mutex, all new readers will be + * blocked from acquiring the mutex until the writer has acquired and + * released it. In some operating systems, this may already be guaranteed + * by a regular ReadWriteMutex. + */ +class NoStarveReadWriteMutex : public ReadWriteMutex { +public: + NoStarveReadWriteMutex(); + + virtual void acquireRead() const; + virtual void acquireWrite() const; + +private: + Mutex mutex_; + mutable volatile bool writerWaiting_; +}; + class Guard : boost::noncopyable { public: Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) { diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index 0c6d2654..1626c6e9 100644 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -65,7 +65,8 @@ TESTS = \ UnitTests_SOURCES = \ UnitTestMain.cpp \ TMemoryBufferTest.cpp \ - TBufferBaseTest.cpp + TBufferBaseTest.cpp \ + RWMutexStarveTest.cpp UnitTests_LDADD = \ libtestgencpp.la \ diff --git a/lib/cpp/test/RWMutexStarveTest.cpp b/lib/cpp/test/RWMutexStarveTest.cpp new file mode 100644 index 00000000..ca04e6e2 --- /dev/null +++ b/lib/cpp/test/RWMutexStarveTest.cpp @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include +#include + +#include "concurrency/Mutex.h" +#include "concurrency/PosixThreadFactory.h" + +using boost::shared_ptr; +using boost::unit_test::test_suite; +using boost::unit_test::framework::master_test_suite; + +using namespace apache::thrift::concurrency; +using namespace std; + +class Locker : public Runnable +{ +protected: + Locker(shared_ptr rwlock, bool writer) : + rwlock_(rwlock), writer_(writer), + started_(false), gotLock_(false), signaled_(false) { } + +public: + virtual void run() + { + started_ = true; + if (writer_) { + rwlock_->acquireWrite(); + } else { + rwlock_->acquireRead(); + } + gotLock_ = true; + while (!signaled_) { + usleep(5000); + } + rwlock_->release(); + } + + bool started() const { return started_; } + bool gotLock() const { return gotLock_; } + void signal() { signaled_ = true; } + +protected: + shared_ptr rwlock_; + bool writer_; + volatile bool started_; + volatile bool gotLock_; + volatile bool signaled_; +}; + +class Reader : public Locker +{ +public: + Reader(shared_ptr rwlock) : Locker(rwlock, false) { } +}; + +class Writer : public Locker +{ +public: + Writer(shared_ptr rwlock) : Locker(rwlock, true) { } +}; + +void test_starve(PosixThreadFactory::POLICY policy) +{ + // the man pages for pthread_wrlock_rdlock suggest that any OS guarantee about + // writer starvation may be influenced by the scheduling policy, so let's try + // all 3 policies to see if any of them work. + PosixThreadFactory factory(policy); + factory.setDetached(false); + + shared_ptr rwlock(new NoStarveReadWriteMutex()); + + shared_ptr reader1(new Reader(rwlock)); + shared_ptr reader2(new Reader(rwlock)); + shared_ptr writer(new Writer(rwlock)); + + shared_ptr treader1 = factory.newThread(reader1); + shared_ptr treader2 = factory.newThread(reader2); + shared_ptr twriter = factory.newThread(writer); + + // launch a reader and make sure he has the lock + treader1->start(); + while (!reader1->gotLock()) { + usleep(2000); + } + + // launch a writer and make sure he's blocked on the lock + twriter->start(); + while (!writer->started()) { + usleep(2000); + } + // tricky part... we can never be 100% sure that the writer is actually + // blocked on the lock, but we can pretty reasonably sure because we know + // he just executed the line immediately before getting the lock, and + // we'll wait a full second for him to get on it. + sleep(1); + + // launch a second reader... if the RWMutex guarantees that writers won't + // starve, this reader should not be able to acquire the lock until the writer + // has acquired and released it. + treader2->start(); + while (!reader2->started()) { + usleep(2000); + } + // again... can't be 100% sure the reader is waiting on (or has) the lock + // but we can be close. + sleep(1); + + // tell reader 1 to let go of the lock + reader1->signal(); + + // wait for someone to get the lock + while (!reader2->gotLock() && !writer->gotLock()) { + usleep(2000); + } + + // the test succeeded if the WRITER got the lock. + bool success = writer->gotLock(); + + // tell everyone we're done and wait for them to finish + reader2->signal(); + writer->signal(); + treader1->join(); + treader2->join(); + twriter->join(); + + // make sure it worked. + BOOST_CHECK_MESSAGE(success, "writer is starving"); +} + +BOOST_AUTO_TEST_SUITE( RWMutexStarveTest ) + +BOOST_AUTO_TEST_CASE( test_starve_other ) +{ + test_starve(PosixThreadFactory::OTHER); +} + +BOOST_AUTO_TEST_CASE( test_starve_rr ) +{ + test_starve(PosixThreadFactory::ROUND_ROBIN); +} + +BOOST_AUTO_TEST_CASE( test_starve_fifo ) +{ + test_starve(PosixThreadFactory::FIFO); +} + +BOOST_AUTO_TEST_SUITE_END()