blob: 52473c7e201f18fb2b81b739284c27af6b80b2e7 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000023
Marc Slemko6f038a72006-08-03 18:58:09 +000024#include <boost/shared_ptr.hpp>
25
Marc Slemko66949872006-07-15 01:52:39 +000026#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000027#include <queue>
28#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000029
Marc Slemko6f038a72006-08-03 18:58:09 +000030#if defined(DEBUG)
31#include <iostream>
32#endif //defined(DEBUG)
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
40 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * This class manages a pool of threads. It uses a ThreadFactory to create
43 * threads. It never actually creates or destroys worker threads, rather
44 * it maintains statistics on number of idle threads, number of active threads,
45 * task backlog, and average wait and service times.
46 *
Mark Sleef5f2be42006-09-05 21:05:31 +000047 * @version $Id:$
48 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049class ThreadManager::Impl : public ThreadManager {
50
51 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000053 workerCount_(0),
54 workerMaxCount_(0),
55 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000056 pendingTaskCountMax_(0),
David Reissa0dbfef2010-03-09 05:19:32 +000057 state_(ThreadManager::UNINITIALIZED),
58 monitor_(&mutex_),
59 maxMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000062
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000063 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000064
Mark Slee7c10eaf2007-03-01 02:45:10 +000065 void stop() { stopImpl(false); }
66
67 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000068
Mark Slee2f6404d2006-10-10 01:37:40 +000069 const ThreadManager::STATE state() const {
70 return state_;
71 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000072
Marc Slemko6f038a72006-08-03 18:58:09 +000073 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000074 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000075 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000077
78 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000079 Synchronized s(monitor_);
80 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000081 }
82
Marc Slemkod466b212006-07-20 00:04:18 +000083 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084
Marc Slemkod466b212006-07-20 00:04:18 +000085 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000086
Mark Slee2f6404d2006-10-10 01:37:40 +000087 size_t idleWorkerCount() const {
88 return idleCount_;
89 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000090
91 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000092 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000093 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000094 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000095
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000098 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000099 }
100
101 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000102 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000103 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000105
106 size_t pendingTaskCountMax() const {
107 Synchronized s(monitor_);
108 return pendingTaskCountMax_;
109 }
110
111 void pendingTaskCountMax(const size_t value) {
112 Synchronized s(monitor_);
113 pendingTaskCountMax_ = value;
114 }
115
116 bool canSleep();
117
Mark Slee9b82d272007-05-23 05:16:07 +0000118 void add(shared_ptr<Runnable> value, int64_t timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000119
Marc Slemko6f038a72006-08-03 18:58:09 +0000120 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000121
David Reiss01fe1532010-03-09 05:19:25 +0000122 shared_ptr<Runnable> removeNextPending();
123
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000125 void stopImpl(bool join);
126
Mark Slee2f6404d2006-10-10 01:37:40 +0000127 size_t workerCount_;
128 size_t workerMaxCount_;
129 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000130 size_t pendingTaskCountMax_;
131
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 ThreadManager::STATE state_;
133 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000134
Mark Sleef5f2be42006-09-05 21:05:31 +0000135
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000136 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000137 std::queue<shared_ptr<Task> > tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000138 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000139 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000140 Monitor maxMonitor_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000141 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000142
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000143 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000144 std::set<shared_ptr<Thread> > workers_;
145 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000147};
Marc Slemko66949872006-07-15 01:52:39 +0000148
149class ThreadManager::Task : public Runnable {
150
Mark Sleef5f2be42006-09-05 21:05:31 +0000151 public:
Marc Slemko66949872006-07-15 01:52:39 +0000152 enum STATE {
153 WAITING,
154 EXECUTING,
155 CANCELLED,
156 COMPLETE
157 };
158
Marc Slemko6f038a72006-08-03 18:58:09 +0000159 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 runnable_(runnable),
161 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000162
Mark Sleef5f2be42006-09-05 21:05:31 +0000163 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000164
165 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 if (state_ == EXECUTING) {
167 runnable_->run();
168 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000169 }
170 }
171
David Reiss01fe1532010-03-09 05:19:25 +0000172 shared_ptr<Runnable> getRunnable() {
173 return runnable_;
174 }
175
Marc Slemko66949872006-07-15 01:52:39 +0000176 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000178 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000180};
181
182class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000183 enum STATE {
184 UNINITIALIZED,
185 STARTING,
186 STARTED,
187 STOPPING,
188 STOPPED
189 };
190
191 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000192 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000193 manager_(manager),
194 state_(UNINITIALIZED),
195 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000196
197 ~Worker() {}
198
Mark Slee7c10eaf2007-03-01 02:45:10 +0000199 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000200 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000201 return
202 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
203 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000205
Mark Slee7c10eaf2007-03-01 02:45:10 +0000206 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000207 /**
208 * Worker entry point
209 *
210 * As long as worker thread is running, pull tasks off the task queue and
211 * execute.
212 */
Marc Slemko66949872006-07-15 01:52:39 +0000213 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000214 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000215 bool notifyManager = false;
216
Mark Sleef5f2be42006-09-05 21:05:31 +0000217 /**
218 * Increment worker semaphore and notify manager if worker count reached
219 * desired max
220 *
221 * Note: We have to release the monitor and acquire the workerMonitor
222 * since that is what the manager blocks on for worker add/remove
223 */
224 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 Synchronized s(manager_->monitor_);
226 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000227 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000228 manager_->workerCount_++;
229 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000230 }
231 }
232
Mark Sleef5f2be42006-09-05 21:05:31 +0000233 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 Synchronized s(manager_->workerMonitor_);
235 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000236 notifyManager = false;
237 }
238
Mark Sleef5f2be42006-09-05 21:05:31 +0000239 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000240 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000241
Mark Sleef5f2be42006-09-05 21:05:31 +0000242 /**
243 * While holding manager monitor block for non-empty task queue (Also
244 * check that the thread hasn't been requested to stop). Once the queue
245 * is non-empty, dequeue a task, release monitor, and execute. If the
246 * worker max count has been decremented such that we exceed it, mark
247 * ourself inactive, decrement the worker count and notify the manager
248 * (technically we're notifying the next blocked thread but eventually
249 * the manager will see it.
250 */
251 {
David Reissa0dbfef2010-03-09 05:19:32 +0000252 Guard g(manager_->mutex_);
David Reiss96d23882007-07-26 21:10:32 +0000253 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000254
David Reiss96d23882007-07-26 21:10:32 +0000255 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000256 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000257 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000258 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000259 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000260 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000261 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000262 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000263
David Reiss96d23882007-07-26 21:10:32 +0000264 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000265 if (!manager_->tasks_.empty()) {
266 task = manager_->tasks_.front();
267 manager_->tasks_.pop();
268 if (task->state_ == ThreadManager::Task::WAITING) {
269 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000270 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000271
272 /* If we have a pending task max and we just dropped below it, wakeup any
273 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000274 if (manager_->pendingTaskCountMax_ != 0 &&
275 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
David Reissa0dbfef2010-03-09 05:19:32 +0000276 manager_->maxMonitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000277 }
David Reiss96d23882007-07-26 21:10:32 +0000278 }
279 } else {
280 idle_ = true;
281 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000282 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000283 }
Marc Slemko66949872006-07-15 01:52:39 +0000284 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000285
Mark Sleef5f2be42006-09-05 21:05:31 +0000286 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000287 if (task->state_ == ThreadManager::Task::EXECUTING) {
288 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000289 task->run();
290 } catch(...) {
291 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000292 }
293 }
Marc Slemko66949872006-07-15 01:52:39 +0000294 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000295 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000296
Mark Sleef5f2be42006-09-05 21:05:31 +0000297 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000298 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000299 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000300 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000301 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000302 }
303 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000304
Marc Slemko66949872006-07-15 01:52:39 +0000305 return;
306 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000307
Mark Sleef5f2be42006-09-05 21:05:31 +0000308 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000309 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000310 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000311 STATE state_;
312 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000313};
314
Mark Sleef5f2be42006-09-05 21:05:31 +0000315
316 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000317 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000318 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000319 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000320 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000321 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000322 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000323
Mark Sleef5f2be42006-09-05 21:05:31 +0000324 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000325 Synchronized s(monitor_);
326 workerMaxCount_ += value;
327 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000328 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000329
Mark Sleef5f2be42006-09-05 21:05:31 +0000330 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000331 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000332 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000333 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000334 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000335 }
336
Mark Sleef5f2be42006-09-05 21:05:31 +0000337 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000338 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000339 while (workerCount_ != workerMaxCount_) {
340 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000341 }
342 }
343}
Marc Slemkod466b212006-07-20 00:04:18 +0000344
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000345void ThreadManager::Impl::start() {
346
Mark Slee2f6404d2006-10-10 01:37:40 +0000347 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000348 return;
349 }
350
Mark Sleef5f2be42006-09-05 21:05:31 +0000351 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000352 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000353 if (state_ == ThreadManager::UNINITIALIZED) {
354 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000355 throw InvalidArgumentException();
356 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000357 state_ = ThreadManager::STARTED;
358 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000359 }
360
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 while (state_ == STARTING) {
362 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000363 }
364 }
365}
366
Mark Slee7c10eaf2007-03-01 02:45:10 +0000367void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000368 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000369 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000370 return;
371 }
372
Mark Sleef5f2be42006-09-05 21:05:31 +0000373 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000374 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000375 if (state_ != ThreadManager::STOPPING &&
376 state_ != ThreadManager::JOINING &&
377 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000378 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000379 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000380 }
381 }
382
Mark Sleef5f2be42006-09-05 21:05:31 +0000383 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000385 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000386
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000387 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000388 // should be able to block here for transition to STOPPED since we're no
389 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000390
391 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000392 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000393 state_ = ThreadManager::STOPPED;
394 }
395
Marc Slemkod466b212006-07-20 00:04:18 +0000396}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000397
Marc Slemkod466b212006-07-20 00:04:18 +0000398void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000399 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000400 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000401 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000403 throw InvalidArgumentException();
404 }
405
Mark Slee2f6404d2006-10-10 01:37:40 +0000406 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000407
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 if (idleCount_ < value) {
409 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000410 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000411 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000412 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000413 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000414 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000415 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000416
Mark Sleef5f2be42006-09-05 21:05:31 +0000417 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000418 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000419
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 while (workerCount_ != workerMaxCount_) {
421 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000422 }
423
Mark Slee2f6404d2006-10-10 01:37:40 +0000424 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
425 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000426 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000427 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000428
Mark Slee2f6404d2006-10-10 01:37:40 +0000429 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000430 }
431}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000432
433 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000434 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000435 return idMap_.find(id) == idMap_.end();
436 }
437
Mark Slee9b82d272007-05-23 05:16:07 +0000438 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
David Reissa0dbfef2010-03-09 05:19:32 +0000439 Guard g(mutex_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000440
Mark Slee2f6404d2006-10-10 01:37:40 +0000441 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000442 throw IllegalStateException();
443 }
Marc Slemkod466b212006-07-20 00:04:18 +0000444
Mark Slee2782d6d2007-05-23 04:55:30 +0000445 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000446 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000447 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
David Reissa0dbfef2010-03-09 05:19:32 +0000448 // This is thread safe because the mutex is shared between monitors.
449 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000450 }
451 } else {
452 throw TooManyPendingTasksException();
453 }
454 }
455
Mark Slee2f6404d2006-10-10 01:37:40 +0000456 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000457
Mark Sleef5f2be42006-09-05 21:05:31 +0000458 // If idle thread is available notify it, otherwise all worker threads are
459 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 if (idleCount_ > 0) {
461 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000462 }
Marc Slemko66949872006-07-15 01:52:39 +0000463 }
464
Marc Slemko6f038a72006-08-03 18:58:09 +0000465void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000466 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000467 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000468 throw IllegalStateException();
469 }
Marc Slemko66949872006-07-15 01:52:39 +0000470}
471
David Reiss01fe1532010-03-09 05:19:25 +0000472boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
473 Guard g(mutex_);
474 if (state_ != ThreadManager::STARTED) {
475 throw IllegalStateException();
476 }
477
478 if (tasks_.empty()) {
479 return boost::shared_ptr<Runnable>();
480 }
481
482 shared_ptr<ThreadManager::Task> task = tasks_.front();
483 tasks_.pop();
484
485 return task->getRunnable();
486}
487
Marc Slemkod466b212006-07-20 00:04:18 +0000488class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000489
Mark Slee2782d6d2007-05-23 04:55:30 +0000490 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000491 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000492 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000493 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000495 }
Marc Slemko66949872006-07-15 01:52:39 +0000496
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000497 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000498 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000499 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000500 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000501 }
502
Mark Slee2782d6d2007-05-23 04:55:30 +0000503 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000504 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000505 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000506 bool firstTime_;
507 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000508};
Marc Slemko66949872006-07-15 01:52:39 +0000509
Marc Slemko66949872006-07-15 01:52:39 +0000510
Marc Slemko6f038a72006-08-03 18:58:09 +0000511shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
512 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000513}
Marc Slemko66949872006-07-15 01:52:39 +0000514
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000515shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
516 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000517}
Marc Slemko66949872006-07-15 01:52:39 +0000518
T Jake Lucianib5e62212009-01-31 22:36:20 +0000519}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000520