From e02385b0b4e84aed0e456cb697a471c211705eba Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Sat, 9 Jun 2007 01:21:16 +0000 Subject: [PATCH] Add thread pool option to NonblockingServer Summary: If you want requests processed outside of the I/O thread Reviewed By: jake luciani, aditya Test Plan: nb-main.cpp, in the test folder git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665132 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 132 ++++-- lib/cpp/src/server/TNonblockingServer.h | 57 ++- lib/cpp/src/transport/TServerSocket.cpp | 4 +- test/cpp/Makefile.stress | 22 +- test/cpp/src/nb-main.cpp | 484 ++++++++++++++++++++++ 5 files changed, 659 insertions(+), 40 deletions(-) create mode 100644 test/cpp/src/nb-main.cpp diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 63378061..5c4dc8c9 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -6,6 +6,7 @@ #include "TNonblockingServer.h" +#include #include #include #include @@ -17,6 +18,50 @@ namespace facebook { namespace thrift { namespace server { using namespace facebook::thrift::protocol; using namespace facebook::thrift::transport; +using namespace std; + +class TConnection::Task: public Runnable { + public: + Task(boost::shared_ptr processor, + boost::shared_ptr input, + boost::shared_ptr output, + int taskHandle) : + processor_(processor), + input_(input), + output_(output), + taskHandle_(taskHandle) {} + + void run() { + try { + while (processor_->process(input_, output_)) { + if (!input_->getTransport()->peek()) { + break; + } + } + } catch (TTransportException& ttx) { + cerr << "TThreadedServer client died: " << ttx.what() << endl; + } catch (TException& x) { + cerr << "TThreadedServer exception: " << x.what() << endl; + } catch (...) { + cerr << "TThreadedServer uncaught exception." << endl; + } + + // Signal completion back to the libevent thread via a socketpair + int8_t b = 0; + if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) { + GlobalOutput("TNonblockingServer::Task: send"); + } + if (-1 == ::close(taskHandle_)) { + GlobalOutput("TNonblockingServer::Task: close, possible resource leak"); + } + } + + private: + boost::shared_ptr processor_; + boost::shared_ptr input_; + boost::shared_ptr output_; + int taskHandle_; +}; void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { socket_ = socket; @@ -34,6 +79,8 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { socketState_ = SOCKET_RECV; appState_ = APP_INIT; + taskHandle_ = -1; + // Set flags, which also registers the event setFlags(eventFlags); @@ -168,24 +215,59 @@ void TConnection::transition() { // and get back some data from the dispatch function inputTransport_->resetBuffer(readBuffer_, readBufferPos_); outputTransport_->resetBuffer(); - - try { - // Invoke the processor - server_->getProcessor()->process(inputProtocol_, outputProtocol_); - } catch (TTransportException &ttx) { - fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what()); - close(); - return; - } catch (TException &x) { - fprintf(stderr, "TException: Server::process() %s\n", x.what()); - close(); - return; - } catch (...) { - fprintf(stderr, "Server::process() unknown exception\n"); - close(); - return; + + if (server_->isThreadPoolProcessing()) { + // We are setting up a Task to do this work and we will wait on it + int sv[2]; + if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) { + GlobalOutput("TConnection::socketpair() failed"); + // Now we will fall through to the APP_WAIT_TASK block with no response + } else { + // Create task and dispatch to the thread manager + boost::shared_ptr task = + boost::shared_ptr(new Task(server_->getProcessor(), + inputProtocol_, + outputProtocol_, + sv[1])); + appState_ = APP_WAIT_TASK; + event_set(&taskEvent_, + taskHandle_ = sv[0], + EV_READ, + TConnection::taskHandler, + this); + + // Add the event and start up the server + if (-1 == event_add(&taskEvent_, 0)) { + GlobalOutput("TNonblockingServer::serve(): coult not event_add"); + return; + } + server_->addTask(task); + return; + } + } else { + try { + // Invoke the processor + server_->getProcessor()->process(inputProtocol_, outputProtocol_); + } catch (TTransportException &ttx) { + fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what()); + close(); + return; + } catch (TException &x) { + fprintf(stderr, "TException: Server::process() %s\n", x.what()); + close(); + return; + } catch (...) { + fprintf(stderr, "Server::process() unknown exception\n"); + close(); + return; + } } + case APP_WAIT_TASK: + // We have now finished processing a task and the result has been written + // into the outputTransport_, so we grab its contents and place them into + // the writeBuffer_ for actual writing by the libevent thread + // Get the result of the operation outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_); @@ -212,7 +294,7 @@ void TConnection::transition() { setWrite(); // Try to work the socket immediately - workSocket(); + // workSocket(); return; } @@ -232,7 +314,7 @@ void TConnection::transition() { appState_ = APP_SEND_RESULT; // Go to work on the socket right away, probably still writeable - workSocket(); + // workSocket(); return; @@ -260,7 +342,7 @@ void TConnection::transition() { setRead(); // Try to work the socket right away - workSocket(); + // workSocket(); return; @@ -283,7 +365,7 @@ void TConnection::transition() { appState_ = APP_READ_REQUEST; // Work the socket right away - workSocket(); + // workSocket(); return; @@ -315,13 +397,13 @@ void TConnection::setFlags(short eventFlags) { * * Prepares the event structure &event to be used in future calls to * event_add() and event_del(). The event will be prepared to call the - * event_handler using the 'sock' file descriptor to monitor events. + * eventHandler using the 'sock' file descriptor to monitor events. * * The events can be either EV_READ, EV_WRITE, or both, indicating * that an application can read or write from the file respectively without * blocking. * - * The event_handler will be called with the file descriptor that triggered + * The eventHandler will be called with the file descriptor that triggered * the event and the type of event which will be one of: EV_TIMEOUT, * EV_SIGNAL, EV_READ, EV_WRITE. * @@ -330,7 +412,7 @@ void TConnection::setFlags(short eventFlags) { * * Once initialized, the &event struct can be used repeatedly with * event_add() and event_del() and does not need to be reinitialized unless - * the event_handler and/or the argument to it are to be changed. However, + * the eventHandler and/or the argument to it are to be changed. However, * when an ev structure has been added to libevent using event_add() the * structure must persist until the event occurs (assuming EV_PERSIST * is not set) or is removed using event_del(). You may not reuse the same @@ -516,12 +598,12 @@ void TNonblockingServer::serve() { this); // Add the event and start up the server - if (event_add(&serverEvent, 0) == -1) { + if (-1 == event_add(&serverEvent, 0)) { GlobalOutput("TNonblockingServer::serve(): coult not event_add"); return; } - // Run libevent engine, never returns, invokes calls to event_handler + // Run libevent engine, never returns, invokes calls to eventHandler event_loop(0); } diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index e4f83468..0ce7ccb8 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,8 @@ namespace facebook { namespace thrift { namespace server { using facebook::thrift::transport::TMemoryBuffer; using facebook::thrift::protocol::TProtocol; +using facebook::thrift::concurrency::Runnable; +using facebook::thrift::concurrency::ThreadManager; // Forward declaration of class class TConnection; @@ -46,6 +49,12 @@ class TNonblockingServer : public TServer { // Whether to frame responses bool frameResponses_; + // For processing via thread pool, may be NULL + boost::shared_ptr threadManager_; + + // Is thread pool processing? + bool threadPoolProcessing_; + /** * This is a stack of all the objects that have been created but that * are NOT currently in use. When we close a connection, we place it on this @@ -66,15 +75,18 @@ class TNonblockingServer : public TServer { TNonblockingServer(boost::shared_ptr processor, boost::shared_ptr protocolFactory, - int port) : + int port, + boost::shared_ptr threadManager = boost::shared_ptr()) : TServer(processor), serverSocket_(0), port_(port), - frameResponses_(true) { + frameResponses_(true), + threadManager_(threadManager) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); setInputProtocolFactory(protocolFactory); setOutputProtocolFactory(protocolFactory); + setThreadManager(threadManager); } TNonblockingServer(boost::shared_ptr processor, @@ -82,19 +94,35 @@ class TNonblockingServer : public TServer { boost::shared_ptr outputTransportFactory, boost::shared_ptr inputProtocolFactory, boost::shared_ptr outputProtocolFactory, - int port) : + int port, + boost::shared_ptr threadManager = boost::shared_ptr()) : TServer(processor), serverSocket_(0), port_(port), - frameResponses_(true) { + frameResponses_(true), + threadManager_(threadManager) { setInputTransportFactory(inputTransportFactory); setOutputTransportFactory(outputTransportFactory); setInputProtocolFactory(inputProtocolFactory); setOutputProtocolFactory(outputProtocolFactory); + setThreadManager(threadManager); } ~TNonblockingServer() {} + void setThreadManager(boost::shared_ptr threadManager) { + threadManager_ = threadManager; + threadPoolProcessing_ = (threadManager != NULL); + } + + bool isThreadPoolProcessing() { + return threadPoolProcessing_; + } + + void addTask(boost::shared_ptr task) { + threadManager_->add(task); + } + void setFrameResponses(bool frameResponses) { frameResponses_ = frameResponses; } @@ -133,6 +161,7 @@ enum TAppState { APP_INIT, APP_READ_FRAME_SIZE, APP_READ_REQUEST, + APP_WAIT_TASK, APP_SEND_FRAME_SIZE, APP_SEND_RESULT }; @@ -144,6 +173,8 @@ enum TAppState { class TConnection { private: + class Task; + // Server handle TNonblockingServer* server_; @@ -186,6 +217,12 @@ class TConnection { // Frame size int32_t frameSize_; + // Task handle + int taskHandle_; + + // Task event + struct event taskEvent_; + // Transport to read from boost::shared_ptr inputTransport_; @@ -248,9 +285,19 @@ class TConnection { // Handler wrapper static void eventHandler(int fd, short which, void* v) { - assert(fd = ((TConnection*)v)->socket_); + assert(fd == ((TConnection*)v)->socket_); ((TConnection*)v)->workSocket(); } + + // Handler wrapper for task block + static void taskHandler(int fd, short which, void* v) { + assert(fd == ((TConnection*)v)->taskHandle_); + if (-1 == ::close(((TConnection*)v)->taskHandle_)) { + GlobalOutput("TConnection::taskHandler close handle failed, resource leak"); + } + ((TConnection*)v)->transition(); + } + }; }}} // facebook::thrift::server diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp index 0d25bf02..2f6ce5d2 100644 --- a/lib/cpp/src/transport/TServerSocket.cpp +++ b/lib/cpp/src/transport/TServerSocket.cpp @@ -68,8 +68,8 @@ void TServerSocket::listen() { intSock1_ = -1; intSock2_ = -1; } else { - intSock1_ = sv[0]; - intSock2_ = sv[1]; + intSock1_ = sv[1]; + intSock2_ = sv[0]; } serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); diff --git a/test/cpp/Makefile.stress b/test/cpp/Makefile.stress index a0fe4cfc..9d79541c 100644 --- a/test/cpp/Makefile.stress +++ b/test/cpp/Makefile.stress @@ -1,5 +1,5 @@ # Makefile for Thrift test project. -# +# # Author: # Marc Kwiatkowski # Aditya Agarwal @@ -18,8 +18,8 @@ endif #boost_home target: all include_paths = $(thrift_home)/lib/cpp/src \ - $(thrift_home)/lib/cpp \ - $(boost_home) + $(thrift_home)/lib/cpp \ + $(boost_home) include_flags = $(patsubst %,-I%, $(include_paths)) @@ -32,16 +32,22 @@ CC = g++ LD = g++ # Compiler flags -DCFL = -Wall -O3 -g -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -levent -CFL = -Wall -O3 -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -levent +DCFL = -Wall -O3 -g -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +CFL = -Wall -O3 -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -all: stress-test +all: stress-test stress-test-nb -debug: stress-test-debug +debug: stress-test-debug stress-test-debug-nb stubs: ../StressTest.thrift $(THRIFT) --cpp --php ../StressTest.thrift +stress-test-debug-nb: stubs + g++ -o stress-test-nb $(DCFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + +stress-test-nb: stubs + g++ -o stress-test-nb $(CFL) src/nb-main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp + stress-test-debug: stubs g++ -o stress-test $(DCFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp @@ -49,4 +55,4 @@ stress-test: stubs g++ -o stress-test $(CFL) src/main.cpp ./gen-cpp/Service.cpp gen-cpp/StressTest_types.cpp clean: - rm -fr stress-test gen-cpp + rm -fr stress-test stress-test-nb gen-cpp diff --git a/test/cpp/src/nb-main.cpp b/test/cpp/src/nb-main.cpp new file mode 100644 index 00000000..7e69524d --- /dev/null +++ b/test/cpp/src/nb-main.cpp @@ -0,0 +1,484 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Service.h" + +#include + +#include +#include +#include +#include + +#include +#include +using __gnu_cxx::hash_map; +using __gnu_cxx::hash; + +using namespace std; +using namespace boost; + +using namespace facebook::thrift; +using namespace facebook::thrift::protocol; +using namespace facebook::thrift::transport; +using namespace facebook::thrift::server; +using namespace facebook::thrift::concurrency; + +using namespace test::stress; + +struct eqstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) == 0; + } +}; + +struct ltstr { + bool operator()(const char* s1, const char* s2) const { + return strcmp(s1, s2) < 0; + } +}; + + +// typedef hash_map, eqstr> count_map; +typedef map count_map; + +class Server : public ServiceIf { + public: + Server() {} + + void count(const char* method) { + MutexMonitor m(lock_); + int ct = counts_[method]; + counts_[method] = ++ct; + } + + void echoVoid() { + count("echoVoid"); + + //Sleep to simulate work + struct timeval time_struct; + time_struct.tv_sec = 0; + time_struct.tv_usec = 5000; + + select( (int) NULL, (fd_set *)NULL, (fd_set *)NULL,(fd_set *)NULL, &time_struct ); + + + + return; + } + + count_map getCount() { + MutexMonitor m(lock_); + return counts_; + } + + int8_t echoByte(const int8_t arg) {return arg;} + int32_t echoI32(const int32_t arg) {return arg;} + int64_t echoI64(const int64_t arg) {return arg;} + void echoString(string& out, const string &arg) { + if (arg != "hello") { + T_ERROR_ABORT("WRONG STRING!!!!"); + } + out = arg; + } + void echoList(vector &out, const vector &arg) { out = arg; } + void echoSet(set &out, const set &arg) { out = arg; } + void echoMap(map &out, const map &arg) { out = arg; } + +private: + count_map counts_; + Mutex lock_; + +}; + +class ClientThread: public Runnable { +public: + + ClientThread(shared_ptrtransport, shared_ptr client, Monitor& monitor, size_t& workerCount, size_t loopCount, TType loopType) : + _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount), + _loopType(loopType) + {} + + void run() { + + // Wait for all worker threads to start + + {Synchronized s(_monitor); + while(_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = Util::currentTime(); + + _transport->open(); + + switch(_loopType) { + case T_VOID: loopEchoVoid(); break; + case T_BYTE: loopEchoByte(); break; + case T_I32: loopEchoI32(); break; + case T_I64: loopEchoI64(); break; + case T_STRING: loopEchoString(); break; + default: cerr << "Unexpected loop type" << _loopType << endl; break; + } + + _endTime = Util::currentTime(); + + _transport->close(); + + _done = true; + + {Synchronized s(_monitor); + + _workerCount--; + + if(_workerCount == 0) { + + _monitor.notify(); + } + } + } + + void loopEchoVoid() { + for(size_t ix = 0; ix < _loopCount; ix++) { + _client->echoVoid(); + } + } + + void loopEchoByte() { + for(size_t ix = 0; ix < _loopCount; ix++) { + int8_t arg = 1; + int8_t result; + result =_client->echoByte(arg); + assert(result == arg); + } + } + + void loopEchoI32() { + for(size_t ix = 0; ix < _loopCount; ix++) { + int32_t arg = 1; + int32_t result; + result =_client->echoI32(arg); + assert(result == arg); + } + } + + void loopEchoI64() { + for(size_t ix = 0; ix < _loopCount; ix++) { + int64_t arg = 1; + int64_t result; + result =_client->echoI64(arg); + assert(result == arg); + } + } + + void loopEchoString() { + for(size_t ix = 0; ix < _loopCount; ix++) { + string arg = "hello"; + string result; + _client->echoString(result, arg); + assert(result == arg); + } + } + + shared_ptr _transport; + shared_ptr _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + TType _loopType; + long long _startTime; + long long _endTime; + bool _done; + Monitor _sleep; +}; + + +int main(int argc, char **argv) { + + int port = 9091; + string serverType = "simple"; + string protocolType = "binary"; + size_t workerCount = 4; + size_t clientCount = 20; + size_t loopCount = 50000; + TType loopType = T_VOID; + string callName = "echoVoid"; + bool runServer = true; + bool logRequests = false; + string requestLogPath = "./requestlog.tlog"; + bool replayRequests = false; + + ostringstream usage; + + usage << + argv[0] << " [--port=] [--server] [--server-type=] [--protocol-type=] [--workers=] [--clients=] [--loop=]" << endl << + "\tclients Number of client threads to create - 0 implies no clients, i.e. server only. Default is " << clientCount << endl << + "\thelp Prints this help text." << endl << + "\tcall Service method to call. Default is " << callName << endl << + "\tloop The number of remote thrift calls each client makes. Default is " << loopCount << endl << + "\tport The port the server and clients should bind to for thrift network connections. Default is " << port << endl << + "\tserver Run the Thrift server in this process. Default is " << runServer << endl << + "\tserver-type Type of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl << + "\tprotocol-type Type of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << + "\tlog-request Log all request to ./requestlog.tlog. Default is " << logRequests << endl << + "\treplay-request Replay requests from log file (./requestlog.tlog) Default is " << replayRequests << endl << + "\tworkers Number of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; + + + map args; + + for(int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if(arg.compare(0,2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + string key = string(arg, 2, end - 2); + + if(end != string::npos) { + args[key] = string(arg, end + 1); + } else { + args[key] = "true"; + } + } else { + throw invalid_argument("Unexcepted command line token: "+arg); + } + } + + try { + + if(!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if(!args["help"].empty()) { + cerr << usage.str(); + return 0; + } + + if(!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + + if(!args["call"].empty()) { + callName = args["call"]; + } + + if(!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if(!args["server"].empty()) { + runServer = args["server"] == "true"; + } + + if(!args["log-request"].empty()) { + logRequests = args["log-request"] == "true"; + } + + if(!args["replay-request"].empty()) { + replayRequests = args["replay-request"] == "true"; + } + + if(!args["server-type"].empty()) { + serverType = args["server-type"]; + } + + if(!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + } catch(exception& e) { + cerr << e.what() << endl; + cerr << usage; + } + + shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + + // Dispatcher + shared_ptr serviceHandler(new Server()); + + if (replayRequests) { + shared_ptr serviceHandler(new Server()); + shared_ptr serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Transports + shared_ptr fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + fileTransport->seekToEnd(); + + // Protocol Factory + shared_ptr protocolFactory(new TBinaryProtocolFactory()); + + TFileProcessor fileProcessor(serviceProcessor, + protocolFactory, + fileTransport); + + fileProcessor.process(0, true); + exit(0); + } + + + if(runServer) { + + shared_ptr serviceProcessor(new ServiceProcessor(serviceHandler)); + + // Protocol Factory + shared_ptr protocolFactory(new TBinaryProtocolFactory()); + + // Transport Factory + shared_ptr transportFactory; + + if (logRequests) { + // initialize the log file + shared_ptr fileTransport(new TFileTransport(requestLogPath)); + fileTransport->setChunkSize(2 * 1024 * 1024); + fileTransport->setMaxEventSize(1024 * 16); + + transportFactory = + shared_ptr(new TPipedTransportFactory(fileTransport)); + } + + shared_ptr serverThread; + + if(serverType == "simple") { + + serverThread = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory,port))); + + } else if(serverType == "thread-pool") { + + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + threadManager->start(); + serverThread = threadFactory->newThread(shared_ptr(new TNonblockingServer(serviceProcessor, protocolFactory, port, threadManager))); + } + + cerr << "Starting the server on port " << port << endl; + serverThread->start(); + + // If we aren't running clients, just wait forever for external clients + + if (clientCount == 0) { + serverThread->join(); + } + } + + if (clientCount > 0) { + + Monitor monitor; + + size_t threadCount = 0; + + set > clientThreads; + + if(callName == "echoVoid") { loopType = T_VOID;} + else if(callName == "echoByte") { loopType = T_BYTE;} + else if(callName == "echoI32") { loopType = T_I32;} + else if(callName == "echoI64") { loopType = T_I64;} + else if(callName == "echoString") { loopType = T_STRING;} + else {throw invalid_argument("Unknown service call "+callName);} + + for(size_t ix = 0; ix < clientCount; ix++) { + + shared_ptr socket(new TSocket("127.0.0.1", port)); + shared_ptr framedSocket(new TFramedTransport(socket)); + shared_ptr protocol(new TBinaryProtocol(framedSocket)); + shared_ptr serviceClient(new ServiceClient(protocol)); + + clientThreads.insert(threadFactory->newThread(shared_ptr(new ClientThread(socket, serviceClient, monitor, threadCount, loopCount, loopType)))); + } + + for(std::set >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) { + (*thread)->start(); + } + + long long time00; + long long time01; + + {Synchronized s(monitor); + threadCount = clientCount; + + cerr << "Launch "<< clientCount << " client threads" << endl; + + time00 = Util::currentTime(); + + monitor.notifyAll(); + + while(threadCount > 0) { + monitor.wait(); + } + + time01 = Util::currentTime(); + } + + long long firstTime = 9223372036854775807LL; + long long lastTime = 0; + + double averageTime = 0; + long long minTime = 9223372036854775807LL; + long long maxTime = 0; + + for(set >::iterator ix = clientThreads.begin(); ix != clientThreads.end(); ix++) { + + shared_ptr client = dynamic_pointer_cast((*ix)->runnable()); + + long long delta = client->_endTime - client->_startTime; + + assert(delta > 0); + + if(client->_startTime < firstTime) { + firstTime = client->_startTime; + } + + if(client->_endTime > lastTime) { + lastTime = client->_endTime; + } + + if(delta < minTime) { + minTime = delta; + } + + if(delta > maxTime) { + maxTime = delta; + } + + averageTime+= delta; + } + + averageTime /= clientCount; + + + cout << "workers :" << workerCount << ", client : " << clientCount << ", loops : " << loopCount << ", rate : " << (clientCount * loopCount * 1000) / ((double)(time01 - time00)) << endl; + + count_map count = serviceHandler->getCount(); + count_map::iterator iter; + for (iter = count.begin(); iter != count.end(); ++iter) { + printf("%s => %d\n", iter->first, iter->second); + } + cerr << "done." << endl; + } + + return 0; +} -- 2.17.1