| /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements. See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership. The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License. You may obtain a copy of the License at | 
 |  * | 
 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, | 
 |  * software distributed under the License is distributed on an | 
 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 |  * KIND, either express or implied. See the License for the | 
 |  * specific language governing permissions and limitations | 
 |  * under the License. | 
 |  */ | 
 |  | 
 | #include "TimerManager.h" | 
 | #include "Exception.h" | 
 | #include "Util.h" | 
 |  | 
 | #include <assert.h> | 
 | #include <iostream> | 
 | #include <set> | 
 |  | 
 | namespace apache { namespace thrift { namespace concurrency { | 
 |  | 
 | using boost::shared_ptr; | 
 |  | 
 | /** | 
 |  * TimerManager class | 
 |  * | 
 |  * @version $Id:$ | 
 |  */ | 
 | class TimerManager::Task : public Runnable { | 
 |  | 
 |  public: | 
 |   enum STATE { | 
 |     WAITING, | 
 |     EXECUTING, | 
 |     CANCELLED, | 
 |     COMPLETE | 
 |   }; | 
 |  | 
 |   Task(shared_ptr<Runnable> runnable) : | 
 |     runnable_(runnable), | 
 |     state_(WAITING) {} | 
 |  | 
 |   ~Task() { | 
 |   } | 
 |  | 
 |   void run() { | 
 |     if (state_ == EXECUTING) { | 
 |       runnable_->run(); | 
 |       state_ = COMPLETE; | 
 |     } | 
 |   } | 
 |  | 
 |  private: | 
 |   shared_ptr<Runnable> runnable_; | 
 |   friend class TimerManager::Dispatcher; | 
 |   STATE state_; | 
 | }; | 
 |  | 
 | class TimerManager::Dispatcher: public Runnable { | 
 |  | 
 |  public: | 
 |   Dispatcher(TimerManager* manager) : | 
 |     manager_(manager) {} | 
 |  | 
 |   ~Dispatcher() {} | 
 |  | 
 |   /** | 
 |    * Dispatcher entry point | 
 |    * | 
 |    * As long as dispatcher thread is running, pull tasks off the task taskMap_ | 
 |    * and execute. | 
 |    */ | 
 |   void run() { | 
 |     { | 
 |       Synchronized s(manager_->monitor_); | 
 |       if (manager_->state_ == TimerManager::STARTING) { | 
 |         manager_->state_ = TimerManager::STARTED; | 
 |         manager_->monitor_.notifyAll(); | 
 |       } | 
 |     } | 
 |  | 
 |     do { | 
 |       std::set<shared_ptr<TimerManager::Task> > expiredTasks; | 
 |       { | 
 |         Synchronized s(manager_->monitor_); | 
 |         task_iterator expiredTaskEnd; | 
 |         int64_t now = Util::currentTime(); | 
 |         while (manager_->state_ == TimerManager::STARTED && | 
 |                (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) { | 
 |           int64_t timeout = 0LL; | 
 |           if (!manager_->taskMap_.empty()) { | 
 |             timeout = manager_->taskMap_.begin()->first - now; | 
 |           } | 
 |           assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0)); | 
 |           try { | 
 |             manager_->monitor_.wait(timeout); | 
 |           } catch (TimedOutException &e) {} | 
 |           now = Util::currentTime(); | 
 |         } | 
 |  | 
 |         if (manager_->state_ == TimerManager::STARTED) { | 
 |           for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { | 
 |             shared_ptr<TimerManager::Task> task = ix->second; | 
 |             expiredTasks.insert(task); | 
 |             if (task->state_ == TimerManager::Task::WAITING) { | 
 |               task->state_ = TimerManager::Task::EXECUTING; | 
 |             } | 
 |             manager_->taskCount_--; | 
 |           } | 
 |           manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); | 
 |         } | 
 |       } | 
 |  | 
 |       for (std::set<shared_ptr<Task> >::iterator ix =  expiredTasks.begin(); ix != expiredTasks.end(); ix++) { | 
 |         (*ix)->run(); | 
 |       } | 
 |  | 
 |     } while (manager_->state_ == TimerManager::STARTED); | 
 |  | 
 |     { | 
 |       Synchronized s(manager_->monitor_); | 
 |       if (manager_->state_ == TimerManager::STOPPING) { | 
 |         manager_->state_ = TimerManager::STOPPED; | 
 |         manager_->monitor_.notify(); | 
 |       } | 
 |     } | 
 |     return; | 
 |   } | 
 |  | 
 |  private: | 
 |   TimerManager* manager_; | 
 |   friend class TimerManager; | 
 | }; | 
 |  | 
 | TimerManager::TimerManager() : | 
 |   taskCount_(0), | 
 |   state_(TimerManager::UNINITIALIZED), | 
 |   dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) { | 
 | } | 
 |  | 
 |  | 
 | TimerManager::~TimerManager() { | 
 |  | 
 |   // If we haven't been explicitly stopped, do so now.  We don't need to grab | 
 |   // the monitor here, since stop already takes care of reentrancy. | 
 |  | 
 |   if (state_ != STOPPED) { | 
 |     try { | 
 |       stop(); | 
 |     } catch(...) { | 
 |       throw; | 
 |       // uhoh | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | void TimerManager::start() { | 
 |   bool doStart = false; | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (threadFactory_ == NULL) { | 
 |       throw InvalidArgumentException(); | 
 |     } | 
 |     if (state_ == TimerManager::UNINITIALIZED) { | 
 |       state_ = TimerManager::STARTING; | 
 |       doStart = true; | 
 |     } | 
 |   } | 
 |  | 
 |   if (doStart) { | 
 |     dispatcherThread_ = threadFactory_->newThread(dispatcher_); | 
 |     dispatcherThread_->start(); | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     while (state_ == TimerManager::STARTING) { | 
 |       monitor_.wait(); | 
 |     } | 
 |     assert(state_ != TimerManager::STARTING); | 
 |   } | 
 | } | 
 |  | 
 | void TimerManager::stop() { | 
 |   bool doStop = false; | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (state_ == TimerManager::UNINITIALIZED) { | 
 |       state_ = TimerManager::STOPPED; | 
 |     } else if (state_ != STOPPING &&  state_ != STOPPED) { | 
 |       doStop = true; | 
 |       state_ = STOPPING; | 
 |       monitor_.notifyAll(); | 
 |     } | 
 |     while (state_ != STOPPED) { | 
 |       monitor_.wait(); | 
 |     } | 
 |   } | 
 |  | 
 |   if (doStop) { | 
 |     // Clean up any outstanding tasks | 
 |     taskMap_.clear(); | 
 |  | 
 |     // Remove dispatcher's reference to us. | 
 |     dispatcher_->manager_ = NULL; | 
 |   } | 
 | } | 
 |  | 
 | shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { | 
 |   Synchronized s(monitor_); | 
 |   return threadFactory_; | 
 | } | 
 |  | 
 | void TimerManager::threadFactory(shared_ptr<const ThreadFactory>  value) { | 
 |   Synchronized s(monitor_); | 
 |   threadFactory_ = value; | 
 | } | 
 |  | 
 | size_t TimerManager::taskCount() const { | 
 |   return taskCount_; | 
 | } | 
 |  | 
 | void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) { | 
 |   int64_t now = Util::currentTime(); | 
 |   timeout += now; | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (state_ != TimerManager::STARTED) { | 
 |       throw IllegalStateException(); | 
 |     } | 
 |  | 
 |     // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him | 
 |     // if the expiration time is shorter than the current value. Need to test before we insert, | 
 |     // because the new task might insert at the front. | 
 |     bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first; | 
 |  | 
 |     taskCount_++; | 
 |     taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task)))); | 
 |  | 
 |     // If the task map was empty, or if we have an expiration that is earlier | 
 |     // than any previously seen, kick the dispatcher so it can update its | 
 |     // timeout | 
 |     if (notifyRequired) { | 
 |       monitor_.notify(); | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) { | 
 |  | 
 |   int64_t expiration; | 
 |   Util::toMilliseconds(expiration, value); | 
 |  | 
 |   int64_t now = Util::currentTime(); | 
 |  | 
 |   if (expiration < now) { | 
 |     throw  InvalidArgumentException(); | 
 |   } | 
 |  | 
 |   add(task, expiration - now); | 
 | } | 
 |  | 
 |  | 
 | void TimerManager::remove(shared_ptr<Runnable> task) { | 
 |   (void) task; | 
 |   Synchronized s(monitor_); | 
 |   if (state_ != TimerManager::STARTED) { | 
 |     throw IllegalStateException(); | 
 |   } | 
 | } | 
 |  | 
 | TimerManager::STATE TimerManager::state() const { return state_; } | 
 |  | 
 | }}} // apache::thrift::concurrency | 
 |  |