/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include "ThreadManager.h"
21#include "Exception.h"
22#include "Monitor.h"
23
24#include <boost/shared_ptr.hpp>
25
26#include <assert.h>
27#include <queue>
28#include <set>
29
30#if defined(DEBUG)
31#include <iostream>
32#endif //defined(DEBUG)
33
34namespace apache { namespace thrift { namespace concurrency {
35
36using boost::shared_ptr;
37using boost::dynamic_pointer_cast;
38
39/**
40 * ThreadManager class
41 *
42 * This class manages a pool of threads. It uses a ThreadFactory to create
43 * threads. It never actually creates or destroys worker threads, rather
44 * it maintains statistics on number of idle threads, number of active threads,
45 * task backlog, and average wait and service times.
46 *
47 * @version $Id:$
48 */
49class ThreadManager::Impl : public ThreadManager {
50
51 public:
52 Impl() :
53 workerCount_(0),
54 workerMaxCount_(0),
55 idleCount_(0),
56 pendingTaskCountMax_(0),
57 state_(ThreadManager::UNINITIALIZED) {}
58
59 ~Impl() { stop(); }
60
61 void start();
62
63 void stop() { stopImpl(false); }
64
65 void join() { stopImpl(true); }
66
67 const ThreadManager::STATE state() const {
68 return state_;
69 }
70
71 shared_ptr<ThreadFactory> threadFactory() const {
72 Synchronized s(monitor_);
73 return threadFactory_;
74 }
75
76 void threadFactory(shared_ptr<ThreadFactory> value) {
77 Synchronized s(monitor_);
78 threadFactory_ = value;
79 }
80
81 void addWorker(size_t value);
82
83 void removeWorker(size_t value);
84
85 size_t idleWorkerCount() const {
86 return idleCount_;
87 }
88
89 size_t workerCount() const {
90 Synchronized s(monitor_);
91 return workerCount_;
92 }
93
94 size_t pendingTaskCount() const {
95 Synchronized s(monitor_);
96 return tasks_.size();
97 }
98
99 size_t totalTaskCount() const {
100 Synchronized s(monitor_);
101 return tasks_.size() + workerCount_ - idleCount_;
102 }
103
104 size_t pendingTaskCountMax() const {
105 Synchronized s(monitor_);
106 return pendingTaskCountMax_;
107 }
108
109 void pendingTaskCountMax(const size_t value) {
110 Synchronized s(monitor_);
111 pendingTaskCountMax_ = value;
112 }
113
114 bool canSleep();
115
116 void add(shared_ptr<Runnable> value, int64_t timeout);
117
118 void remove(shared_ptr<Runnable> task);
119
120private:
121 void stopImpl(bool join);
122
123 size_t workerCount_;
124 size_t workerMaxCount_;
125 size_t idleCount_;
126 size_t pendingTaskCountMax_;
127
128 ThreadManager::STATE state_;
129 shared_ptr<ThreadFactory> threadFactory_;
130
131
132 friend class ThreadManager::Task;
133 std::queue<shared_ptr<Task> > tasks_;
134 Monitor monitor_;
135 Monitor workerMonitor_;
136
137 friend class ThreadManager::Worker;
138 std::set<shared_ptr<Thread> > workers_;
139 std::set<shared_ptr<Thread> > deadWorkers_;
140 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
141};
142
143class ThreadManager::Task : public Runnable {
144
145 public:
146 enum STATE {
147 WAITING,
148 EXECUTING,
149 CANCELLED,
150 COMPLETE
151 };
152
153 Task(shared_ptr<Runnable> runnable) :
154 runnable_(runnable),
155 state_(WAITING) {}
156
157 ~Task() {}
158
159 void run() {
160 if (state_ == EXECUTING) {
161 runnable_->run();
162 state_ = COMPLETE;
163 }
164 }
165
166 private:
167 shared_ptr<Runnable> runnable_;
168 friend class ThreadManager::Worker;
169 STATE state_;
170};
171
172class ThreadManager::Worker: public Runnable {
173 enum STATE {
174 UNINITIALIZED,
175 STARTING,
176 STARTED,
177 STOPPING,
178 STOPPED
179 };
180
181 public:
182 Worker(ThreadManager::Impl* manager) :
183 manager_(manager),
184 state_(UNINITIALIZED),
185 idle_(false) {}
186
187 ~Worker() {}
188
189 private:
190 bool isActive() const {
191 return
192 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
193 (manager_->state_ == JOINING && !manager_->tasks_.empty());
194 }
195
196 public:
197 /**
198 * Worker entry point
199 *
200 * As long as worker thread is running, pull tasks off the task queue and
201 * execute.
202 */
203 void run() {
204 bool active = false;
205 bool notifyManager = false;
206
207 /**
208 * Increment worker semaphore and notify manager if worker count reached
209 * desired max
210 *
211 * Note: We have to release the monitor and acquire the workerMonitor
212 * since that is what the manager blocks on for worker add/remove
213 */
214 {
215 Synchronized s(manager_->monitor_);
216 active = manager_->workerCount_ < manager_->workerMaxCount_;
217 if (active) {
218 manager_->workerCount_++;
219 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
220 }
221 }
222
223 if (notifyManager) {
224 Synchronized s(manager_->workerMonitor_);
225 manager_->workerMonitor_.notify();
226 notifyManager = false;
227 }
228
229 while (active) {
230 shared_ptr<ThreadManager::Task> task;
231
232 /**
233 * While holding manager monitor block for non-empty task queue (Also
234 * check that the thread hasn't been requested to stop). Once the queue
235 * is non-empty, dequeue a task, release monitor, and execute. If the
236 * worker max count has been decremented such that we exceed it, mark
237 * ourself inactive, decrement the worker count and notify the manager
238 * (technically we're notifying the next blocked thread but eventually
239 * the manager will see it.
240 */
241 {
242 Synchronized s(manager_->monitor_);
243 active = isActive();
244
245 while (active && manager_->tasks_.empty()) {
246 manager_->idleCount_++;
247 idle_ = true;
248 manager_->monitor_.wait();
249 active = isActive();
250 idle_ = false;
251 manager_->idleCount_--;
252 }
253
254 if (active) {
255 if (!manager_->tasks_.empty()) {
256 task = manager_->tasks_.front();
257 manager_->tasks_.pop();
258 if (task->state_ == ThreadManager::Task::WAITING) {
259 task->state_ = ThreadManager::Task::EXECUTING;
260 }
261
262 /* If we have a pending task max and we just dropped below it, wakeup any
263 thread that might be blocked on add. */
264 if (manager_->pendingTaskCountMax_ != 0 &&
265 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
266 manager_->monitor_.notify();
267 }
268 }
269 } else {
270 idle_ = true;
271 manager_->workerCount_--;
272 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
273 }
274 }
275
276 if (task != NULL) {
277 if (task->state_ == ThreadManager::Task::EXECUTING) {
278 try {
279 task->run();
280 } catch(...) {
281 // XXX need to log this
282 }
283 }
284 }
285 }
286
287 {
288 Synchronized s(manager_->workerMonitor_);
289 manager_->deadWorkers_.insert(this->thread());
290 if (notifyManager) {
291 manager_->workerMonitor_.notify();
292 }
293 }
294
295 return;
296 }
297
298 private:
299 ThreadManager::Impl* manager_;
300 friend class ThreadManager::Impl;
301 STATE state_;
302 bool idle_;
303};
304
305
306 void ThreadManager::Impl::addWorker(size_t value) {
307 std::set<shared_ptr<Thread> > newThreads;
308 for (size_t ix = 0; ix < value; ix++) {
309 class ThreadManager::Worker;
310 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
311 newThreads.insert(threadFactory_->newThread(worker));
312 }
313
314 {
315 Synchronized s(monitor_);
316 workerMaxCount_ += value;
317 workers_.insert(newThreads.begin(), newThreads.end());
318 }
319
320 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
321 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
322 worker->state_ = ThreadManager::Worker::STARTING;
323 (*ix)->start();
324 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
325 }
326
327 {
328 Synchronized s(workerMonitor_);
329 while (workerCount_ != workerMaxCount_) {
330 workerMonitor_.wait();
331 }
332 }
333}
334
335void ThreadManager::Impl::start() {
336
337 if (state_ == ThreadManager::STOPPED) {
338 return;
339 }
340
341 {
342 Synchronized s(monitor_);
343 if (state_ == ThreadManager::UNINITIALIZED) {
344 if (threadFactory_ == NULL) {
345 throw InvalidArgumentException();
346 }
347 state_ = ThreadManager::STARTED;
348 monitor_.notifyAll();
349 }
350
351 while (state_ == STARTING) {
352 monitor_.wait();
353 }
354 }
355}
356
357void ThreadManager::Impl::stopImpl(bool join) {
358 bool doStop = false;
359 if (state_ == ThreadManager::STOPPED) {
360 return;
361 }
362
363 {
364 Synchronized s(monitor_);
365 if (state_ != ThreadManager::STOPPING &&
366 state_ != ThreadManager::JOINING &&
367 state_ != ThreadManager::STOPPED) {
368 doStop = true;
369 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
370 }
371 }
372
373 if (doStop) {
374 removeWorker(workerCount_);
375 }
376
377 // XXX
378 // should be able to block here for transition to STOPPED since we're no
379 // using shared_ptrs
380
381 {
382 Synchronized s(monitor_);
383 state_ = ThreadManager::STOPPED;
384 }
385
386}
387
388void ThreadManager::Impl::removeWorker(size_t value) {
389 std::set<shared_ptr<Thread> > removedThreads;
390 {
391 Synchronized s(monitor_);
392 if (value > workerMaxCount_) {
393 throw InvalidArgumentException();
394 }
395
396 workerMaxCount_ -= value;
397
398 if (idleCount_ < value) {
399 for (size_t ix = 0; ix < idleCount_; ix++) {
400 monitor_.notify();
401 }
402 } else {
403 monitor_.notifyAll();
404 }
405 }
406
407 {
408 Synchronized s(workerMonitor_);
409
410 while (workerCount_ != workerMaxCount_) {
411 workerMonitor_.wait();
412 }
413
414 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
415 workers_.erase(*ix);
416 idMap_.erase((*ix)->getId());
417 }
418
419 deadWorkers_.clear();
420 }
421}
422
423 bool ThreadManager::Impl::canSleep() {
424 const Thread::id_t id = threadFactory_->getCurrentThreadId();
425 return idMap_.find(id) == idMap_.end();
426 }
427
428 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
429 Synchronized s(monitor_);
430
431 if (state_ != ThreadManager::STARTED) {
432 throw IllegalStateException();
433 }
434
435 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
436 if (canSleep() && timeout >= 0) {
437 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
438 monitor_.wait(timeout);
439 }
440 } else {
441 throw TooManyPendingTasksException();
442 }
443 }
444
445 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
446
447 // If idle thread is available notify it, otherwise all worker threads are
448 // running and will get around to this task in time.
449 if (idleCount_ > 0) {
450 monitor_.notify();
451 }
452 }
453
454void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
455 Synchronized s(monitor_);
456 if (state_ != ThreadManager::STARTED) {
457 throw IllegalStateException();
458 }
459}
460
461class SimpleThreadManager : public ThreadManager::Impl {
462
463 public:
464 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
465 workerCount_(workerCount),
466 pendingTaskCountMax_(pendingTaskCountMax),
467 firstTime_(true) {
468 }
469
470 void start() {
471 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
472 ThreadManager::Impl::start();
473 addWorker(workerCount_);
474 }
475
476 private:
477 const size_t workerCount_;
478 const size_t pendingTaskCountMax_;
479 bool firstTime_;
480 Monitor monitor_;
481};
482
483
484shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
485 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
486}
487
488shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
489 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
490}
491
492}}} // apache::thrift::concurrency
493