| /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements. See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership. The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License. You may obtain a copy of the License at | 
 |  * | 
 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, | 
 |  * software distributed under the License is distributed on an | 
 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 |  * KIND, either express or implied. See the License for the | 
 |  * specific language governing permissions and limitations | 
 |  * under the License. | 
 |  */ | 
 |  | 
 | #include "ThreadManager.h" | 
 | #include "Exception.h" | 
 | #include "Monitor.h" | 
 | #include "Util.h" | 
 |  | 
 | #include <boost/shared_ptr.hpp> | 
 |  | 
 | #include <assert.h> | 
 | #include <queue> | 
 | #include <set> | 
 |  | 
 | #if defined(DEBUG) | 
 | #include <iostream> | 
 | #endif //defined(DEBUG) | 
 |  | 
 | namespace apache { namespace thrift { namespace concurrency { | 
 |  | 
 | using boost::shared_ptr; | 
 | using boost::dynamic_pointer_cast; | 
 |  | 
 | /** | 
 |  * ThreadManager class | 
 |  * | 
 |  * This class manages a pool of threads. It uses a ThreadFactory to create | 
 |  * threads.  It never actually creates or destroys worker threads, rather | 
 |  * it maintains statistics on number of idle threads, number of active threads, | 
 |  * task backlog, and average wait and service times. | 
 |  * | 
 |  * @version $Id:$ | 
 |  */ | 
 | class ThreadManager::Impl : public ThreadManager  { | 
 |  | 
 |  public: | 
 |   Impl() : | 
 |     workerCount_(0), | 
 |     workerMaxCount_(0), | 
 |     idleCount_(0), | 
 |     pendingTaskCountMax_(0), | 
 |     expiredCount_(0), | 
 |     state_(ThreadManager::UNINITIALIZED), | 
 |     monitor_(&mutex_), | 
 |     maxMonitor_(&mutex_) {} | 
 |  | 
 |   ~Impl() { stop(); } | 
 |  | 
 |   void start(); | 
 |  | 
 |   void stop() { stopImpl(false); } | 
 |  | 
 |   void join() { stopImpl(true); } | 
 |  | 
 |   ThreadManager::STATE state() const { | 
 |     return state_; | 
 |   } | 
 |  | 
 |   shared_ptr<ThreadFactory> threadFactory() const { | 
 |     Synchronized s(monitor_); | 
 |     return threadFactory_; | 
 |   } | 
 |  | 
 |   void threadFactory(shared_ptr<ThreadFactory> value) { | 
 |     Synchronized s(monitor_); | 
 |     threadFactory_ = value; | 
 |   } | 
 |  | 
 |   void addWorker(size_t value); | 
 |  | 
 |   void removeWorker(size_t value); | 
 |  | 
 |   size_t idleWorkerCount() const { | 
 |     return idleCount_; | 
 |   } | 
 |  | 
 |   size_t workerCount() const { | 
 |     Synchronized s(monitor_); | 
 |     return workerCount_; | 
 |   } | 
 |  | 
 |   size_t pendingTaskCount() const { | 
 |     Synchronized s(monitor_); | 
 |     return tasks_.size(); | 
 |   } | 
 |  | 
 |   size_t totalTaskCount() const { | 
 |     Synchronized s(monitor_); | 
 |     return tasks_.size() + workerCount_ - idleCount_; | 
 |   } | 
 |  | 
 |   size_t pendingTaskCountMax() const { | 
 |     Synchronized s(monitor_); | 
 |     return pendingTaskCountMax_; | 
 |   } | 
 |  | 
 |   size_t expiredTaskCount() { | 
 |     Synchronized s(monitor_); | 
 |     size_t result = expiredCount_; | 
 |     expiredCount_ = 0; | 
 |     return result; | 
 |   } | 
 |  | 
 |   void pendingTaskCountMax(const size_t value) { | 
 |     Synchronized s(monitor_); | 
 |     pendingTaskCountMax_ = value; | 
 |   } | 
 |  | 
 |   bool canSleep(); | 
 |  | 
 |   void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration); | 
 |  | 
 |   void remove(shared_ptr<Runnable> task); | 
 |  | 
 |   shared_ptr<Runnable> removeNextPending(); | 
 |  | 
 |   void removeExpiredTasks(); | 
 |  | 
 |   void setExpireCallback(ExpireCallback expireCallback); | 
 |  | 
 | private: | 
 |   void stopImpl(bool join); | 
 |  | 
 |   size_t workerCount_; | 
 |   size_t workerMaxCount_; | 
 |   size_t idleCount_; | 
 |   size_t pendingTaskCountMax_; | 
 |   size_t expiredCount_; | 
 |   ExpireCallback expireCallback_; | 
 |  | 
 |   ThreadManager::STATE state_; | 
 |   shared_ptr<ThreadFactory> threadFactory_; | 
 |  | 
 |  | 
 |   friend class ThreadManager::Task; | 
 |   std::queue<shared_ptr<Task> > tasks_; | 
 |   Mutex mutex_; | 
 |   Monitor monitor_; | 
 |   Monitor maxMonitor_; | 
 |   Monitor workerMonitor_; | 
 |  | 
 |   friend class ThreadManager::Worker; | 
 |   std::set<shared_ptr<Thread> > workers_; | 
 |   std::set<shared_ptr<Thread> > deadWorkers_; | 
 |   std::map<const Thread::id_t, shared_ptr<Thread> > idMap_; | 
 | }; | 
 |  | 
 | class ThreadManager::Task : public Runnable { | 
 |  | 
 |  public: | 
 |   enum STATE { | 
 |     WAITING, | 
 |     EXECUTING, | 
 |     CANCELLED, | 
 |     COMPLETE | 
 |   }; | 
 |  | 
 |   Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL)  : | 
 |     runnable_(runnable), | 
 |     state_(WAITING), | 
 |     expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {} | 
 |  | 
 |   ~Task() {} | 
 |  | 
 |   void run() { | 
 |     if (state_ == EXECUTING) { | 
 |       runnable_->run(); | 
 |       state_ = COMPLETE; | 
 |     } | 
 |   } | 
 |  | 
 |   shared_ptr<Runnable> getRunnable() { | 
 |     return runnable_; | 
 |   } | 
 |  | 
 |   int64_t getExpireTime() const { | 
 |     return expireTime_; | 
 |   } | 
 |  | 
 |  private: | 
 |   shared_ptr<Runnable> runnable_; | 
 |   friend class ThreadManager::Worker; | 
 |   STATE state_; | 
 |   int64_t expireTime_; | 
 | }; | 
 |  | 
 | class ThreadManager::Worker: public Runnable { | 
 |   enum STATE { | 
 |     UNINITIALIZED, | 
 |     STARTING, | 
 |     STARTED, | 
 |     STOPPING, | 
 |     STOPPED | 
 |   }; | 
 |  | 
 |  public: | 
 |   Worker(ThreadManager::Impl* manager) : | 
 |     manager_(manager), | 
 |     state_(UNINITIALIZED), | 
 |     idle_(false) {} | 
 |  | 
 |   ~Worker() {} | 
 |  | 
 |  private: | 
 |   bool isActive() const { | 
 |     return | 
 |       (manager_->workerCount_ <= manager_->workerMaxCount_) || | 
 |       (manager_->state_ == JOINING && !manager_->tasks_.empty()); | 
 |   } | 
 |  | 
 |  public: | 
 |   /** | 
 |    * Worker entry point | 
 |    * | 
 |    * As long as worker thread is running, pull tasks off the task queue and | 
 |    * execute. | 
 |    */ | 
 |   void run() { | 
 |     bool active = false; | 
 |     bool notifyManager = false; | 
 |  | 
 |     /** | 
 |      * Increment worker semaphore and notify manager if worker count reached | 
 |      * desired max | 
 |      * | 
 |      * Note: We have to release the monitor and acquire the workerMonitor | 
 |      * since that is what the manager blocks on for worker add/remove | 
 |      */ | 
 |     { | 
 |       Synchronized s(manager_->monitor_); | 
 |       active = manager_->workerCount_ < manager_->workerMaxCount_; | 
 |       if (active) { | 
 |         manager_->workerCount_++; | 
 |         notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; | 
 |       } | 
 |     } | 
 |  | 
 |     if (notifyManager) { | 
 |       Synchronized s(manager_->workerMonitor_); | 
 |       manager_->workerMonitor_.notify(); | 
 |       notifyManager = false; | 
 |     } | 
 |  | 
 |     while (active) { | 
 |       shared_ptr<ThreadManager::Task> task; | 
 |  | 
 |       /** | 
 |        * While holding manager monitor block for non-empty task queue (Also | 
 |        * check that the thread hasn't been requested to stop). Once the queue | 
 |        * is non-empty, dequeue a task, release monitor, and execute. If the | 
 |        * worker max count has been decremented such that we exceed it, mark | 
 |        * ourself inactive, decrement the worker count and notify the manager | 
 |        * (technically we're notifying the next blocked thread but eventually | 
 |        * the manager will see it. | 
 |        */ | 
 |       { | 
 |         Guard g(manager_->mutex_); | 
 |         active = isActive(); | 
 |  | 
 |         while (active && manager_->tasks_.empty()) { | 
 |           manager_->idleCount_++; | 
 |           idle_ = true; | 
 |           manager_->monitor_.wait(); | 
 |           active = isActive(); | 
 |           idle_ = false; | 
 |           manager_->idleCount_--; | 
 |         } | 
 |  | 
 |         if (active) { | 
 |           manager_->removeExpiredTasks(); | 
 |  | 
 |           if (!manager_->tasks_.empty()) { | 
 |             task = manager_->tasks_.front(); | 
 |             manager_->tasks_.pop(); | 
 |             if (task->state_ == ThreadManager::Task::WAITING) { | 
 |               task->state_ = ThreadManager::Task::EXECUTING; | 
 |             } | 
 |  | 
 |             /* If we have a pending task max and we just dropped below it, wakeup any | 
 |                thread that might be blocked on add. */ | 
 |             if (manager_->pendingTaskCountMax_ != 0 && | 
 |                 manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) { | 
 |               manager_->maxMonitor_.notify(); | 
 |             } | 
 |           } | 
 |         } else { | 
 |           idle_ = true; | 
 |           manager_->workerCount_--; | 
 |           notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_); | 
 |         } | 
 |       } | 
 |  | 
 |       if (task != NULL) { | 
 |         if (task->state_ == ThreadManager::Task::EXECUTING) { | 
 |           try { | 
 |             task->run(); | 
 |           } catch(...) { | 
 |             // XXX need to log this | 
 |           } | 
 |         } | 
 |       } | 
 |     } | 
 |  | 
 |     { | 
 |       Synchronized s(manager_->workerMonitor_); | 
 |       manager_->deadWorkers_.insert(this->thread()); | 
 |       if (notifyManager) { | 
 |         manager_->workerMonitor_.notify(); | 
 |       } | 
 |     } | 
 |  | 
 |     return; | 
 |   } | 
 |  | 
 |   private: | 
 |     ThreadManager::Impl* manager_; | 
 |     friend class ThreadManager::Impl; | 
 |     STATE state_; | 
 |     bool idle_; | 
 | }; | 
 |  | 
 |  | 
 |   void ThreadManager::Impl::addWorker(size_t value) { | 
 |   std::set<shared_ptr<Thread> > newThreads; | 
 |   for (size_t ix = 0; ix < value; ix++) { | 
 |     shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this)); | 
 |     newThreads.insert(threadFactory_->newThread(worker)); | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     workerMaxCount_ += value; | 
 |     workers_.insert(newThreads.begin(), newThreads.end()); | 
 |   } | 
 |  | 
 |   for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { | 
 |     shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable()); | 
 |     worker->state_ = ThreadManager::Worker::STARTING; | 
 |     (*ix)->start(); | 
 |     idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix)); | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(workerMonitor_); | 
 |     while (workerCount_ != workerMaxCount_) { | 
 |       workerMonitor_.wait(); | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | void ThreadManager::Impl::start() { | 
 |  | 
 |   if (state_ == ThreadManager::STOPPED) { | 
 |     return; | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (state_ == ThreadManager::UNINITIALIZED) { | 
 |       if (threadFactory_ == NULL) { | 
 |         throw InvalidArgumentException(); | 
 |       } | 
 |       state_ = ThreadManager::STARTED; | 
 |       monitor_.notifyAll(); | 
 |     } | 
 |  | 
 |     while (state_ == STARTING) { | 
 |       monitor_.wait(); | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | void ThreadManager::Impl::stopImpl(bool join) { | 
 |   bool doStop = false; | 
 |   if (state_ == ThreadManager::STOPPED) { | 
 |     return; | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (state_ != ThreadManager::STOPPING && | 
 |         state_ != ThreadManager::JOINING && | 
 |         state_ != ThreadManager::STOPPED) { | 
 |       doStop = true; | 
 |       state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING; | 
 |     } | 
 |   } | 
 |  | 
 |   if (doStop) { | 
 |     removeWorker(workerCount_); | 
 |   } | 
 |  | 
 |   // XXX | 
 |   // should be able to block here for transition to STOPPED since we're no | 
 |   // using shared_ptrs | 
 |  | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     state_ = ThreadManager::STOPPED; | 
 |   } | 
 |  | 
 | } | 
 |  | 
 | void ThreadManager::Impl::removeWorker(size_t value) { | 
 |   std::set<shared_ptr<Thread> > removedThreads; | 
 |   { | 
 |     Synchronized s(monitor_); | 
 |     if (value > workerMaxCount_) { | 
 |       throw InvalidArgumentException(); | 
 |     } | 
 |  | 
 |     workerMaxCount_ -= value; | 
 |  | 
 |     if (idleCount_ < value) { | 
 |       for (size_t ix = 0; ix < idleCount_; ix++) { | 
 |         monitor_.notify(); | 
 |       } | 
 |     } else { | 
 |       monitor_.notifyAll(); | 
 |     } | 
 |   } | 
 |  | 
 |   { | 
 |     Synchronized s(workerMonitor_); | 
 |  | 
 |     while (workerCount_ != workerMaxCount_) { | 
 |       workerMonitor_.wait(); | 
 |     } | 
 |  | 
 |     for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) { | 
 |       workers_.erase(*ix); | 
 |       idMap_.erase((*ix)->getId()); | 
 |     } | 
 |  | 
 |     deadWorkers_.clear(); | 
 |   } | 
 | } | 
 |  | 
 |   bool ThreadManager::Impl::canSleep() { | 
 |     const Thread::id_t id = threadFactory_->getCurrentThreadId(); | 
 |     return idMap_.find(id) == idMap_.end(); | 
 |   } | 
 |  | 
 |   void ThreadManager::Impl::add(shared_ptr<Runnable> value, | 
 |                                 int64_t timeout, | 
 |                                 int64_t expiration) { | 
 |     Guard g(mutex_, timeout); | 
 |  | 
 |     if (!g) { | 
 |       throw TimedOutException(); | 
 |     } | 
 |  | 
 |     if (state_ != ThreadManager::STARTED) { | 
 |       throw IllegalStateException("ThreadManager::Impl::add ThreadManager " | 
 |                                   "not started"); | 
 |     } | 
 |  | 
 |     removeExpiredTasks(); | 
 |     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { | 
 |       if (canSleep() && timeout >= 0) { | 
 |         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) { | 
 |           // This is thread safe because the mutex is shared between monitors. | 
 |           maxMonitor_.wait(timeout); | 
 |         } | 
 |       } else { | 
 |         throw TooManyPendingTasksException(); | 
 |       } | 
 |     } | 
 |  | 
 |     tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration))); | 
 |  | 
 |     // If idle thread is available notify it, otherwise all worker threads are | 
 |     // running and will get around to this task in time. | 
 |     if (idleCount_ > 0) { | 
 |       monitor_.notify(); | 
 |     } | 
 |   } | 
 |  | 
 | void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { | 
 |   (void) task; | 
 |   Synchronized s(monitor_); | 
 |   if (state_ != ThreadManager::STARTED) { | 
 |     throw IllegalStateException("ThreadManager::Impl::remove ThreadManager not " | 
 |                                 "started"); | 
 |   } | 
 | } | 
 |  | 
 | boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() { | 
 |   Guard g(mutex_); | 
 |   if (state_ != ThreadManager::STARTED) { | 
 |     throw IllegalStateException("ThreadManager::Impl::removeNextPending " | 
 |                                 "ThreadManager not started"); | 
 |   } | 
 |  | 
 |   if (tasks_.empty()) { | 
 |     return boost::shared_ptr<Runnable>(); | 
 |   } | 
 |  | 
 |   shared_ptr<ThreadManager::Task> task = tasks_.front(); | 
 |   tasks_.pop(); | 
 |    | 
 |   return task->getRunnable(); | 
 | } | 
 |  | 
 | void ThreadManager::Impl::removeExpiredTasks() { | 
 |   int64_t now = 0LL; // we won't ask for the time untile we need it | 
 |  | 
 |   // note that this loop breaks at the first non-expiring task | 
 |   while (!tasks_.empty()) { | 
 |     shared_ptr<ThreadManager::Task> task = tasks_.front(); | 
 |     if (task->getExpireTime() == 0LL) { | 
 |       break; | 
 |     } | 
 |     if (now == 0LL) { | 
 |       now = Util::currentTime(); | 
 |     } | 
 |     if (task->getExpireTime() > now) { | 
 |       break; | 
 |     } | 
 |     if (expireCallback_) { | 
 |       expireCallback_(task->getRunnable()); | 
 |     } | 
 |     tasks_.pop(); | 
 |     expiredCount_++; | 
 |   } | 
 | } | 
 |  | 
 |  | 
 | void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) { | 
 |   expireCallback_ = expireCallback; | 
 | } | 
 |  | 
 | class SimpleThreadManager : public ThreadManager::Impl { | 
 |  | 
 |  public: | 
 |   SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) : | 
 |     workerCount_(workerCount), | 
 |     pendingTaskCountMax_(pendingTaskCountMax), | 
 |     firstTime_(true) { | 
 |   } | 
 |  | 
 |   void start() { | 
 |     ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_); | 
 |     ThreadManager::Impl::start(); | 
 |     addWorker(workerCount_); | 
 |   } | 
 |  | 
 |  private: | 
 |   const size_t workerCount_; | 
 |   const size_t pendingTaskCountMax_; | 
 |   bool firstTime_; | 
 |   Monitor monitor_; | 
 | }; | 
 |  | 
 |  | 
 | shared_ptr<ThreadManager> ThreadManager::newThreadManager() { | 
 |   return shared_ptr<ThreadManager>(new ThreadManager::Impl()); | 
 | } | 
 |  | 
 | shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) { | 
 |   return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax)); | 
 | } | 
 |  | 
 | }}} // apache::thrift::concurrency | 
 |  |