blob: c4ca2b1810622440a7f1cc1633ea4c538c3c2ade [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "ThreadManager.h"
2
3#include <assert.h>
4
5namespace facebook { namespace thrift { namespace concurrency {
6
7/** ThreadManager class
8
9 This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
10 it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
11 PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
12 size needs to be adjusted and call this object addThread and removeThread methods to make changes.
13
14 This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
15 policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
16
17 @author marc
18 @version $Id */
19
20class ThreadManager::Task : public Runnable {
21
22public:
23 enum STATE {
24 WAITING,
25 EXECUTING,
26 CANCELLED,
27 COMPLETE
28 };
29
30 Task(Runnable* runnable) :
31 _runnable(runnable),
32 _state(WAITING)
33 {}
34
35 ~Task() {};
36
37 void run() {
38 if(_state == EXECUTING) {
39 _runnable->run();
40 _state = COMPLETE;
41 }
42 }
43
44 private:
45
46 Runnable* _runnable;
47
48 STATE _state;
49};
50
51class ThreadManager::Worker: public Runnable {
52
53 enum STATE {
54 UNINITIALIZED,
55 STARTING,
56 STARTED,
57 STOPPING,
58 STOPPED
59 };
60
61 public:
62 Worker(ThreadManager* manager) :
63 _manager(manager),
64 _state(UNINITIALIZED),
65 _idle(false)
66 {}
67
68 ~Worker() {}
69
70 /** Worker entry point
71
72 As long as worker thread is running, pull tasks off the task queue and execute. */
73
74 void run() {
75
76 {Synchronized(_manager->_monitor);
77
78 if(_state == STARTING) {
79 _state = STARTED;
80 }
81 }
82
83 do {
84
85 ThreadManager::Task* task = NULL;
86
87 /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
88
89 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
90
91 {Synchronized(_manager->_monitor);
92
93 while(_state == STARTED && _manager->_tasks.empty()) {
94
95 _manager->_idleCount++;
96
97 _idle = true;
98
99 _manager->_monitor.wait();
100
101 _idle = false;
102
103 _manager->_idleCount--;
104 }
105
106 if(_state == STARTED) {
107
108 task = _manager->_tasks.front();
109 }
110 }
111
112 if(task != NULL) {
113
114 task->run();
115
116 delete task;
117 }
118
119 } while(_state == STARTED);
120
121 {Synchronized(_manager->_monitor);
122
123 if(_state == STOPPING) {
124
125 _state = STOPPED;
126
127 _manager->_monitor.notify();
128
129 }
130 }
131
132 return;
133 }
134
135 private:
136
137 ThreadManager* _manager;
138
139 friend class ThreadManager;
140
141 STATE _state;
142
143 bool _idle;
144};
145
146ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) :
147 _hiwat(highWatermark),
148 _lowat(lowWatermark) {
149}
150
151ThreadManager::~ThreadManager() {}
152
153size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
154
155void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
156
157size_t ThreadManager::lowWatermark() const {return _lowat;}
158
159void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
160
161const PoolPolicy* ThreadManager::poolPolicy() const {
162
163 Synchronized s(_monitor);
164
165 return _poolPolicy;
166}
167
168void ThreadManager::poolPolicy(const PoolPolicy* value) {
169
170 Synchronized s(_monitor);
171
172 _poolPolicy = value;
173}
174
175const ThreadFactory* ThreadManager::threadFactory() const {
176
177 Synchronized s(_monitor);
178
179 return _threadFactory;
180}
181
182void ThreadManager::threadFactory(const ThreadFactory* value) {
183
184 Synchronized s(_monitor);
185
186 _threadFactory = value;
187}
188
189void ThreadManager::addThread(size_t value) {
190
191 std::set<Thread*> newThreads;
192
193 for(size_t ix = 0; ix < value; ix++) {
194
195 ThreadManager::Worker* worker = new ThreadManager::Worker(this);
196
197 newThreads.insert(_threadFactory->newThread(worker));
198 }
199
200 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
201
202 (*ix)->start();
203 }
204 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
205
206 (*ix)->start();
207 }
208
209 {Synchronized s(_monitor);
210
211 _workers.insert(newThreads.begin(), newThreads.end());
212 }
213}
214
215void ThreadManager::removeThread(size_t value) {
216
217 std::set<Thread*> removedThreads;
218
219 {Synchronized s(_monitor);
220
221 /* Overly clever loop
222
223 First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
224 and remove a sufficient number of busy threads. */
225
226 for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
227
228 for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
229
230 Worker* worker = (Worker*)(*workerThread)->runnable();
231
232 if(worker->_idle || !idleOnly) {
233
234 removedThreads.insert(*workerThread);
235
236 _workers.erase(workerThread);
237 }
238 }
239 }
240
241 _monitor.notifyAll();
242 }
243
244
245 // Join removed threads and free worker
246
247 for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
248
249 Worker* worker = (Worker*)(*workerThread)->runnable();
250
251 (*workerThread)->join();
252
253 delete worker;
254 }
255}
256
257size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
258
259size_t ThreadManager::workerCount() const {
260
261 Synchronized s(_monitor);
262
263 return _workers.size();
264}
265
266size_t ThreadManager::pendingTaskCount() const {
267
268 Synchronized s(_monitor);
269
270 return _tasks.size();
271}
272
273size_t ThreadManager::totalTaskCount() const {
274
275 Synchronized s(_monitor);
276
277 return _tasks.size() + _workers.size() - _idleCount;
278}
279
280void ThreadManager::add(Runnable* value) {
281
282 Synchronized s(_monitor);
283
284 _tasks.push(new ThreadManager::Task(value));
285
286 /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
287 task in time. */
288
289 if(_tasks.size() == 1) {
290
291 assert(_idleCount == _workers.size());
292
293 _monitor.notify();
294 }
295}
296
297void ThreadManager::remove(Runnable* task) {
298
299 Synchronized s(_monitor);
300}
301
302}}} // facebook::thrift::concurrency
303