blob: e7b5174310336cc60d4b0d47b773b015cc02c8d2 [file] [log] [blame]
Gavin McDonald0b75e1a2010-10-28 02:12:01 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <config.h>
21#include <concurrency/ThreadManager.h>
22#include <concurrency/PosixThreadFactory.h>
23#include <concurrency/Monitor.h>
24#include <concurrency/Util.h>
25
26#include <assert.h>
27#include <set>
28#include <iostream>
29#include <set>
30#include <stdint.h>
31
32namespace apache { namespace thrift { namespace concurrency { namespace test {
33
34using namespace apache::thrift::concurrency;
35
36/**
37 * ThreadManagerTests class
38 *
39 * @version $Id:$
40 */
41class ThreadManagerTests {
42
43public:
44
45 static const double ERROR;
46
47 class Task: public Runnable {
48
49 public:
50
51 Task(Monitor& monitor, size_t& count, int64_t timeout) :
52 _monitor(monitor),
53 _count(count),
54 _timeout(timeout),
55 _done(false) {}
56
57 void run() {
58
59 _startTime = Util::currentTime();
60
61 {
62 Synchronized s(_sleep);
63
64 try {
65 _sleep.wait(_timeout);
66 } catch(TimedOutException& e) {
67 ;
68 }catch(...) {
69 assert(0);
70 }
71 }
72
73 _endTime = Util::currentTime();
74
75 _done = true;
76
77 {
78 Synchronized s(_monitor);
79
80 // std::cout << "Thread " << _count << " completed " << std::endl;
81
82 _count--;
83
84 if (_count == 0) {
85
86 _monitor.notify();
87 }
88 }
89 }
90
91 Monitor& _monitor;
92 size_t& _count;
93 int64_t _timeout;
94 int64_t _startTime;
95 int64_t _endTime;
96 bool _done;
97 Monitor _sleep;
98 };
99
100 /**
101 * Dispatch count tasks, each of which blocks for timeout milliseconds then
102 * completes. Verify that all tasks completed and that thread manager cleans
103 * up properly on delete.
104 */
105 bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
106
107 Monitor monitor;
108
109 size_t activeCount = count;
110
111 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
112
113 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
114
115 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
116
117 threadManager->threadFactory(threadFactory);
118
119 threadManager->start();
120
121 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
122
123 for (size_t ix = 0; ix < count; ix++) {
124
125 tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
126 }
127
128 int64_t time00 = Util::currentTime();
129
130 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
131
132 threadManager->add(*ix);
133 }
134
135 {
136 Synchronized s(monitor);
137
138 while(activeCount > 0) {
139
140 monitor.wait();
141 }
142 }
143
144 int64_t time01 = Util::currentTime();
145
146 int64_t firstTime = 9223372036854775807LL;
147 int64_t lastTime = 0;
148
149 double averageTime = 0;
150 int64_t minTime = 9223372036854775807LL;
151 int64_t maxTime = 0;
152
153 for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
154
155 shared_ptr<ThreadManagerTests::Task> task = *ix;
156
157 int64_t delta = task->_endTime - task->_startTime;
158
159 assert(delta > 0);
160
161 if (task->_startTime < firstTime) {
162 firstTime = task->_startTime;
163 }
164
165 if (task->_endTime > lastTime) {
166 lastTime = task->_endTime;
167 }
168
169 if (delta < minTime) {
170 minTime = delta;
171 }
172
173 if (delta > maxTime) {
174 maxTime = delta;
175 }
176
177 averageTime+= delta;
178 }
179
180 averageTime /= count;
181
182 std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
183
184 double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
185
186 double error = ((time01 - time00) - expectedTime) / expectedTime;
187
188 if (error < 0) {
189 error*= -1.0;
190 }
191
192 bool success = error < ERROR;
193
194 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
195
196 return success;
197 }
198
199 class BlockTask: public Runnable {
200
201 public:
202
203 BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
204 _monitor(monitor),
205 _bmonitor(bmonitor),
206 _count(count) {}
207
208 void run() {
209 {
210 Synchronized s(_bmonitor);
211
212 _bmonitor.wait();
213
214 }
215
216 {
217 Synchronized s(_monitor);
218
219 _count--;
220
221 if (_count == 0) {
222
223 _monitor.notify();
224 }
225 }
226 }
227
228 Monitor& _monitor;
229 Monitor& _bmonitor;
230 size_t& _count;
231 };
232
233 /**
234 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
235 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
236
237 bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
238
239 bool success = false;
240
241 try {
242
243 Monitor bmonitor;
244 Monitor monitor;
245
246 size_t pendingTaskMaxCount = workerCount;
247
248 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
249
250 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
251
252 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
253
254 threadFactory->setPriority(PosixThreadFactory::HIGHEST);
255
256 threadManager->threadFactory(threadFactory);
257
258 threadManager->start();
259
260 std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
261
262 for (size_t ix = 0; ix < workerCount; ix++) {
263
264 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
265 }
266
267 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
268
269 tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
270 }
271
272 for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
273 threadManager->add(*ix);
274 }
275
276 if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
277 throw TException("Unexpected pending task count");
278 }
279
280 shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
281
282 try {
283 threadManager->add(extraTask, 1);
284 throw TException("Unexpected success adding task in excess of pending task count");
285 } catch(TimedOutException& e) {
286 }
287
288 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
289
290 {
291 Synchronized s(bmonitor);
292
293 bmonitor.notifyAll();
294 }
295
296 {
297 Synchronized s(monitor);
298
299 while(activeCounts[0] != 0) {
300 monitor.wait();
301 }
302 }
303
304 std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
305
306 try {
307 threadManager->add(extraTask, 1);
308 } catch(TimedOutException& e) {
309 std::cout << "\t\t\t" << "add timed out unexpectedly" << std::endl;
310 throw TException("Unexpected timeout adding task");
311
312 } catch(TooManyPendingTasksException& e) {
313 std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
314 throw TException("Unexpected timeout adding task");
315 }
316
317 // Wake up tasks that were pending before and wait for them to complete
318
319 {
320 Synchronized s(bmonitor);
321
322 bmonitor.notifyAll();
323 }
324
325 {
326 Synchronized s(monitor);
327
328 while(activeCounts[1] != 0) {
329 monitor.wait();
330 }
331 }
332
333 // Wake up the extra task and wait for it to complete
334
335 {
336 Synchronized s(bmonitor);
337
338 bmonitor.notifyAll();
339 }
340
341 {
342 Synchronized s(monitor);
343
344 while(activeCounts[2] != 0) {
345 monitor.wait();
346 }
347 }
348
349 if(!(success = (threadManager->totalTaskCount() == 0))) {
350 throw TException("Unexpected pending task count");
351 }
352
353 } catch(TException& e) {
354 }
355
356 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
357 return success;
358 }
359};
360
361const double ThreadManagerTests::ERROR = .20;
362
363}}}} // apache::thrift::concurrency
364
365using namespace apache::thrift::concurrency::test;
366