blob: 51ac38a9a5cb8b26854133dddd748bed898d91c1 [file] [log] [blame]
Marc Slemko3ea00332006-08-17 01:11:13 +00001#include <concurrency/ThreadManager.h>
2#include <concurrency/PosixThreadFactory.h>
3#include <concurrency/Monitor.h>
4#include <concurrency/Util.h>
5#include <protocol/TBinaryProtocol.h>
6#include <server/TSimpleServer.h>
7#include <server/TThreadPoolServer.h>
8#include <transport/TServerSocket.h>
9#include <transport/TSocket.h>
10#include <transport/TBufferedTransport.h>
11#include "StressTest.h"
12
13#include <iostream>
14#include <set>
15#include <stdexcept>
16#include <sstream>
17
18using namespace std;
19
20using namespace facebook::thrift;
21using namespace facebook::thrift::protocol;
22using namespace facebook::thrift::transport;
23using namespace facebook::thrift::server;
24
25using namespace test::stress;
26
27class Server : public ServiceServerIf {
28 public:
29 Server(shared_ptr<TProtocol> protocol) :
30 ServiceServerIf(protocol) {}
31
32 void echoVoid() {return;}
33 uint8_t echoByte(uint8_t arg) {return arg;}
34 uint16_t echoU16(uint16_t arg) {return arg;}
35 uint32_t echoU32(uint32_t arg) {return arg;}
36 uint64_t echoU64(uint64_t arg) {return arg;}
37};
38
39class ClientThread: public Runnable {
40public:
41
42 ClientThread(shared_ptr<TTransport>transport, shared_ptr<ServiceClient> client, Monitor& monitor, size_t& workerCount, size_t loopCount) :
43 _transport(transport),
44 _client(client),
45 _monitor(monitor),
46 _workerCount(workerCount),
47 _loopCount(loopCount)
48 {}
49
50 void run() {
51
52 // Wait for all worker threads to start
53
54 {Synchronized s(_monitor);
55 while(_workerCount == 0) {
56 _monitor.wait();
57 }
58 }
59
60 _startTime = Util::currentTime();
61
62 _transport->open();
63
64 //uint64_t arg = 0;
65 //uint64_t result = 0;
66
67 for(size_t ix = 0; ix < _loopCount; ix++) {
68 // result = _client->echoU64(arg);
69 // assert(result == arg);
70 _client->echoVoid();
71 //arg++;
72 }
73
74 _endTime = Util::currentTime();
75
76 _transport->close();
77
78 _done = true;
79
80 {Synchronized s(_monitor);
81
82 _workerCount--;
Marc Slemko056f9ba2006-08-17 02:59:05 +000083
Marc Slemko3ea00332006-08-17 01:11:13 +000084 if(_workerCount == 0) {
85
86 _monitor.notify();
87 }
88 }
89 }
90
Marc Slemko3ea00332006-08-17 01:11:13 +000091 shared_ptr<TTransport> _transport;
92 shared_ptr<ServiceClient> _client;
93 Monitor& _monitor;
94 size_t& _workerCount;
95 size_t _loopCount;
96 long long _startTime;
97 long long _endTime;
98 bool _done;
99 Monitor _sleep;
100};
101
Marc Slemko3ea00332006-08-17 01:11:13 +0000102int main(int argc, char **argv) {
103
104 int port = 9090;
105 string serverType = "thread-pool";
106 string protocolType = "binary";
107 size_t workerCount = 4;
108 size_t clientCount = 10;
109 size_t loopCount = 10000;
110
Marc Slemko056f9ba2006-08-17 02:59:05 +0000111 ostringstream usage;
Marc Slemko3ea00332006-08-17 01:11:13 +0000112
113 usage <<
114 argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>]" << endl <<
115
116 "\t\tserver-type\t\ttype of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl <<
117
118 "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl <<
119
120 "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl;
121
122 map<string, string> args;
123
124 for(int ix = 1; ix < argc; ix++) {
125
126 string arg(argv[ix]);
127
128 if(arg.compare(0,2, "--") == 0) {
129
130 size_t end = arg.find_first_of("=", 2);
131
Marc Slemko056f9ba2006-08-17 02:59:05 +0000132 string key = string(arg, 2, end - 2);
133
Marc Slemko3ea00332006-08-17 01:11:13 +0000134 if(end != string::npos) {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000135 args[key] = string(arg, end + 1);
Marc Slemko3ea00332006-08-17 01:11:13 +0000136 } else {
Marc Slemko056f9ba2006-08-17 02:59:05 +0000137 args[key] = "true";
Marc Slemko3ea00332006-08-17 01:11:13 +0000138 }
Marc Slemko3ea00332006-08-17 01:11:13 +0000139 } else {
140 throw invalid_argument("Unexcepted command line token: "+arg);
141 }
142 }
143
144 try {
145
146 if(!args["port"].empty()) {
147 port = atoi(args["port"].c_str());
148 }
149
150 if(!args["server-type"].empty()) {
151 serverType = args["server-type"];
152
153 if(serverType == "simple") {
154
155 } else if(serverType == "thread-pool") {
156
157 } else {
158
159 throw invalid_argument("Unknown server type "+serverType);
160 }
161 }
162
163 if(!args["workers"].empty()) {
164 workerCount = atoi(args["workers"].c_str());
165 }
166
167 if(!args["clients"].empty()) {
168 clientCount = atoi(args["clients"].c_str());
169 }
170
171 if(!args["loop"].empty()) {
172 loopCount = atoi(args["loop"].c_str());
173 }
174 } catch(exception& e) {
175 cerr << e.what() << endl;
176 cerr << usage;
177 }
178
179 // Dispatcher
180 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol);
181
182 shared_ptr<Server> server(new Server(binaryProtocol));
183
184 // Options
185 shared_ptr<TServerOptions> serverOptions(new TServerOptions());
186
187 // Transport
188 shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
189
190 // ThreadFactory
191
192 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
193
194 shared_ptr<Thread> serverThread;
195
196 if(serverType == "simple") {
197
198 serverThread = threadFactory->newThread(shared_ptr<Runnable>(new TSimpleServer(server, serverOptions, serverSocket)));
199
200 } else if(serverType == "thread-pool") {
201
202 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
203
204 threadManager->threadFactory(threadFactory);
205
206 threadManager->start();
207
208 serverThread = threadFactory->newThread(shared_ptr<TServer>(new TThreadPoolServer(server,
209 serverOptions,
210 serverSocket,
211 threadManager)));
212 }
213
Marc Slemko056f9ba2006-08-17 02:59:05 +0000214 cerr << "Starting the server on port " << port << endl;
Marc Slemko3ea00332006-08-17 01:11:13 +0000215
216 serverThread->start();
217
218 Monitor monitor;
219
220 size_t threadCount = 0;
221
222 set<shared_ptr<Thread> > clientThreads;
223
224 for(size_t ix = 0; ix < clientCount; ix++) {
225
226 shared_ptr<TSocket> socket(new TSocket("127.0.01", port));
227 shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket, 2048));
228 shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol());
229 shared_ptr<ServiceClient> serviceClient(new ServiceClient(bufferedSocket, binaryProtocol));
230
231 clientThreads.insert(threadFactory->newThread(shared_ptr<ClientThread>(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount))));
232 }
233
234 for(std::set<shared_ptr<Thread> >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) {
235 (*thread)->start();
236 }
237
Marc Slemko056f9ba2006-08-17 02:59:05 +0000238 cerr << endl;
239
240 long long time00;
241 long long time01;
Marc Slemko3ea00332006-08-17 01:11:13 +0000242
243 {Synchronized s(monitor);
244 threadCount = clientCount;
Marc Slemko056f9ba2006-08-17 02:59:05 +0000245
246 cerr << "Launch "<< clientCount << " client threads" << endl;
247
248 time00 = Util::currentTime();
249
Marc Slemko3ea00332006-08-17 01:11:13 +0000250 monitor.notifyAll();
251
252 while(threadCount > 0) {
253 monitor.wait();
254 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000255
256 time01 = Util::currentTime();
Marc Slemko3ea00332006-08-17 01:11:13 +0000257 }
Marc Slemko056f9ba2006-08-17 02:59:05 +0000258
259 long long firstTime = 9223372036854775807LL;
260 long long lastTime = 0;
Marc Slemko3ea00332006-08-17 01:11:13 +0000261
Marc Slemko056f9ba2006-08-17 02:59:05 +0000262 double averageTime = 0;
263 long long minTime = 9223372036854775807LL;
264 long long maxTime = 0;
265
266 for(set<shared_ptr<Thread> >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) {
267
268 shared_ptr<ClientThread> client = dynamic_pointer_cast<ClientThread>((*ix)->runnable());
269
270 long long delta = client->_endTime - client->_startTime;
271
272 assert(delta > 0);
273
274 if(client->_startTime < firstTime) {
275 firstTime = client->_startTime;
276 }
277
278 if(client->_endTime > lastTime) {
279 lastTime = client->_endTime;
280 }
281
282 if(delta < minTime) {
283 minTime = delta;
284 }
285
286 if(delta > maxTime) {
287 maxTime = delta;
288 }
289
290 averageTime+= delta;
291 }
292
293 averageTime /= clientCount;
294
295
296 cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl;
297
298 cerr << "done." << endl;
299
Marc Slemko3ea00332006-08-17 01:11:13 +0000300 return 0;
301}