| /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements. See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership. The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License. You may obtain a copy of the License at | 
 |  * | 
 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, | 
 |  * software distributed under the License is distributed on an | 
 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 |  * KIND, either express or implied. See the License for the | 
 |  * specific language governing permissions and limitations | 
 |  * under the License. | 
 |  */ | 
 |  | 
 | #ifdef HAVE_CONFIG_H | 
 | #include <config.h> | 
 | #endif | 
 | #include "Monitor.h" | 
 | #include "Exception.h" | 
 | #include "Util.h" | 
 |  | 
 | #include <assert.h> | 
 | #include <errno.h> | 
 |  | 
 | #include <boost/scoped_ptr.hpp> | 
 | #include <boost/thread.hpp> | 
 | #include <boost/date_time/posix_time/posix_time.hpp> | 
 |  | 
 | namespace apache { namespace thrift { namespace concurrency { | 
 |  | 
 | /** | 
 |  * Monitor implementation using the boost thread library | 
 |  * | 
 |  * @version $Id:$ | 
 |  */ | 
 | class Monitor::Impl : public boost::condition_variable_any { | 
 |  | 
 |  public: | 
 |  | 
 |   Impl() | 
 |      : ownedMutex_(new Mutex()), | 
 |        mutex_(NULL) { | 
 |     init(ownedMutex_.get()); | 
 |   } | 
 |  | 
 |   Impl(Mutex* mutex) | 
 |      : mutex_(NULL) { | 
 |     init(mutex); | 
 |   } | 
 |  | 
 |   Impl(Monitor* monitor) | 
 |      : mutex_(NULL) { | 
 |     init(&(monitor->mutex())); | 
 |   } | 
 |  | 
 |   Mutex& mutex() { return *mutex_; } | 
 |   void lock() { mutex().lock(); } | 
 |   void unlock() { mutex().unlock(); } | 
 |  | 
 |   /** | 
 |    * Exception-throwing version of waitForTimeRelative(), called simply | 
 |    * wait(int64) for historical reasons.  Timeout is in milliseconds. | 
 |    * | 
 |    * If the condition occurs,  this function returns cleanly; on timeout or | 
 |    * error an exception is thrown. | 
 |    */ | 
 |   void wait(int64_t timeout_ms) { | 
 |     int result = waitForTimeRelative(timeout_ms); | 
 |     if (result == ETIMEDOUT) { | 
 |       throw TimedOutException(); | 
 |     } else if (result != 0) { | 
 |       throw TException( | 
 |         "Monitor::wait() failed"); | 
 |     } | 
 |   } | 
 |  | 
 |   /** | 
 |    * Waits until the specified timeout in milliseconds for the condition to | 
 |    * occur, or waits forever if timeout_ms == 0. | 
 |    * | 
 |    * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. | 
 |    */ | 
 |   int waitForTimeRelative(int64_t timeout_ms) { | 
 |     if (timeout_ms == 0LL) { | 
 |       return waitForever(); | 
 |     } | 
 |  | 
 |     assert(mutex_); | 
 | 	boost::timed_mutex* mutexImpl = | 
 |       reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); | 
 |     assert(mutexImpl); | 
 |  | 
 | 	boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); | 
 | 	int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT; | 
 | 	lock.release(); | 
 | 	return res; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Waits until the absolute time specified using struct timespec. | 
 |    * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. | 
 |    */ | 
 |   int waitForTime(const timespec* abstime) { | 
 |     assert(mutex_); | 
 |     boost::timed_mutex* mutexImpl = | 
 |       reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); | 
 |     assert(mutexImpl); | 
 |  | 
 |     struct timespec currenttime; | 
 |     Util::toTimespec(currenttime, Util::currentTime()); | 
 |  | 
 | 	long tv_sec = abstime->tv_sec - currenttime.tv_sec; | 
 | 	long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec; | 
 | 	if(tv_sec < 0) | 
 | 		tv_sec = 0; | 
 | 	if(tv_nsec < 0) | 
 | 		tv_nsec = 0; | 
 |  | 
 | 	boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); | 
 | 	int res = timed_wait(lock, boost::get_system_time() + | 
 | 		boost::posix_time::seconds(tv_sec) + | 
 | 		boost::posix_time::microseconds(tv_nsec / 1000) | 
 | 		) ? 0 : ETIMEDOUT; | 
 | 	lock.release(); | 
 | 	return res; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Waits forever until the condition occurs. | 
 |    * Returns 0 if condition occurs, or an error code otherwise. | 
 |    */ | 
 |   int waitForever() { | 
 |     assert(mutex_); | 
 |     boost::timed_mutex* mutexImpl = | 
 |       reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); | 
 |     assert(mutexImpl); | 
 |  | 
 | 	boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); | 
 | 	((boost::condition_variable_any*)this)->wait(lock); | 
 | 	lock.release(); | 
 |     return 0; | 
 |   } | 
 |  | 
 |  | 
 |   void notify() { | 
 | 	  notify_one(); | 
 |   } | 
 |  | 
 |   void notifyAll() { | 
 | 	  notify_all(); | 
 |   } | 
 |  | 
 |  private: | 
 |  | 
 |   void init(Mutex* mutex) { | 
 |     mutex_ = mutex; | 
 |   } | 
 |  | 
 |   boost::scoped_ptr<Mutex> ownedMutex_; | 
 |   Mutex* mutex_; | 
 | }; | 
 |  | 
 | Monitor::Monitor() : impl_(new Monitor::Impl()) {} | 
 | Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {} | 
 | Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {} | 
 |  | 
 | Monitor::~Monitor() { delete impl_; } | 
 |  | 
 | Mutex& Monitor::mutex() const { return const_cast<Monitor::Impl*>(impl_)->mutex(); } | 
 |  | 
 | void Monitor::lock() const { const_cast<Monitor::Impl*>(impl_)->lock(); } | 
 |  | 
 | void Monitor::unlock() const { const_cast<Monitor::Impl*>(impl_)->unlock(); } | 
 |  | 
 | void Monitor::wait(int64_t timeout) const { const_cast<Monitor::Impl*>(impl_)->wait(timeout); } | 
 |  | 
 | int Monitor::waitForTime(const timespec* abstime) const { | 
 |   return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); | 
 | } | 
 |  | 
 | int Monitor::waitForTimeRelative(int64_t timeout_ms) const { | 
 |   return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); | 
 | } | 
 |  | 
 | int Monitor::waitForever() const { | 
 |   return const_cast<Monitor::Impl*>(impl_)->waitForever(); | 
 | } | 
 |  | 
 | void Monitor::notify() const { const_cast<Monitor::Impl*>(impl_)->notify(); } | 
 |  | 
 | void Monitor::notifyAll() const { const_cast<Monitor::Impl*>(impl_)->notifyAll(); } | 
 |  | 
 | }}} // apache::thrift::concurrency |