From 3ea003377001947319cf04c0eb6351a6e5facc2e Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Thu, 17 Aug 2006 01:11:13 +0000 Subject: [PATCH] Added stress test for thrift benchmarks Modified TServer - made it a subclass of concurrency::Runnable to allow servers to be handed directly to concurreny::ThreadFactory when creating server threads. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664759 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TServer.h | 3 +- lib/cpp/src/test/Makefile | 50 ++++++ lib/cpp/src/test/StressTest.thrift | 11 ++ lib/cpp/src/test/main.cc | 254 +++++++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 lib/cpp/src/test/Makefile create mode 100644 lib/cpp/src/test/StressTest.thrift create mode 100644 lib/cpp/src/test/main.cc diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 68728f2d..d53223fa 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -2,6 +2,7 @@ #define T_SERVER_H #include +#include #include @@ -17,7 +18,7 @@ class TServerOptions; * * @author Mark Slee */ -class TServer { +class TServer : public concurrency::Runnable { public: virtual ~TServer() {} virtual void run() = 0; diff --git a/lib/cpp/src/test/Makefile b/lib/cpp/src/test/Makefile new file mode 100644 index 00000000..73206cd7 --- /dev/null +++ b/lib/cpp/src/test/Makefile @@ -0,0 +1,50 @@ +# Makefile for Thrift test project. +# +# Author: +# Marc Kwiatkowski + + +ifndef thrift_home +thrift_home=../../../../build +endif #thrift_home + +target: all + +ifndef thirdparty +thirdparty = ../../../../../../../thirdparty +endif #thirdparty + +ifndef boost_home +boost_home = /usr/local/include/boost-1_33_1 +endif #thrift_home +target: all + +include_paths = $(thrift_home)/include/thrift \ + $(boost_home) + +include_flags = $(patsubst %,-I%, $(include_paths)) + +# Tools +THRIFT = thrift +CC = g++ +LD = g++ + +# Compiler flags +DCFL = -Wall -O3 -g -I cpp-gen $(include_flags) -L$(thrift_home)/lib -lthrift +CFL = -Wall -O3 -I cpp-gen $(include_flags) -L$(thrift_home)/lib -lthrift + +all: stress-test + +debug: stress-test-debug + +stubs: StressTest.thrift + $(THRIFT) --cpp StressTest.thrift + +stress-test-debug: stubs + g++ -o stress-test $(DCFL) main.cc cpp-gen/StressTest.cc + +stress-test: stubs + g++ -o stress-test $(CFL) main.cc cpp-gen/StressTest.cc + +clean: + rm -fr stress-test cpp-gen diff --git a/lib/cpp/src/test/StressTest.thrift b/lib/cpp/src/test/StressTest.thrift new file mode 100644 index 00000000..89e2f9b2 --- /dev/null +++ b/lib/cpp/src/test/StressTest.thrift @@ -0,0 +1,11 @@ +namespace test.stress + +service Service { + + void echoVoid(), + byte echoByte(byte arg), + u16 echoU16(u16 arg), + u32 echoU32(u32 arg), + u64 echoU64(u64 arg), +} + diff --git a/lib/cpp/src/test/main.cc b/lib/cpp/src/test/main.cc new file mode 100644 index 00000000..8cee758d --- /dev/null +++ b/lib/cpp/src/test/main.cc @@ -0,0 +1,254 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "StressTest.h" + +#include +#include +#include +#include + +using namespace std; + +using namespace facebook::thrift; +using namespace facebook::thrift::protocol; +using namespace facebook::thrift::transport; +using namespace facebook::thrift::server; + +using namespace test::stress; + +class Server : public ServiceServerIf { + public: + Server(shared_ptr protocol) : + ServiceServerIf(protocol) {} + + void echoVoid() {return;} + uint8_t echoByte(uint8_t arg) {return arg;} + uint16_t echoU16(uint16_t arg) {return arg;} + uint32_t echoU32(uint32_t arg) {return arg;} + uint64_t echoU64(uint64_t arg) {return arg;} +}; + +class ClientThread: public Runnable { +public: + + ClientThread(shared_ptrtransport, shared_ptr client, Monitor& monitor, size_t& workerCount, size_t loopCount) : + _transport(transport), + _client(client), + _monitor(monitor), + _workerCount(workerCount), + _loopCount(loopCount) + {} + + void run() { + + // Wait for all worker threads to start + + {Synchronized s(_monitor); + while(_workerCount == 0) { + _monitor.wait(); + } + } + + _startTime = Util::currentTime(); + + _transport->open(); + + //uint64_t arg = 0; + //uint64_t result = 0; + + for(size_t ix = 0; ix < _loopCount; ix++) { + // result = _client->echoU64(arg); + // assert(result == arg); + _client->echoVoid(); + //arg++; + } + + _endTime = Util::currentTime(); + + _transport->close(); + + _done = true; + + {Synchronized s(_monitor); + + _workerCount--; + + if(_workerCount == 0) { + + _monitor.notify(); + } + } + } + +private: + shared_ptr _transport; + shared_ptr _client; + Monitor& _monitor; + size_t& _workerCount; + size_t _loopCount; + long long _startTime; + long long _endTime; + bool _done; + Monitor _sleep; +}; + + +int main(int argc, char **argv) { + + int port = 9090; + string serverType = "thread-pool"; + string protocolType = "binary"; + size_t workerCount = 4; + size_t clientCount = 10; + size_t loopCount = 10000; + + ostringstream usage; + + usage << + argv[0] << " [--port=] [--server-type=] [--protocol-type=] [--workers=]" << endl << + + "\t\tserver-type\t\ttype of server, \"simple\" or \"thread-pool\". Default is " << serverType << endl << + + "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << + + "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; + + map args; + + for(int ix = 1; ix < argc; ix++) { + + string arg(argv[ix]); + + if(arg.compare(0,2, "--") == 0) { + + size_t end = arg.find_first_of("=", 2); + + if(end != string::npos) { + args[string(arg, 2, end - 2)] = string(arg, end + 1); + } else { + args[string(arg, 2, end - 2)] = "true"; + } + ix++; + } else { + throw invalid_argument("Unexcepted command line token: "+arg); + } + } + + try { + + if(!args["port"].empty()) { + port = atoi(args["port"].c_str()); + } + + if(!args["server-type"].empty()) { + serverType = args["server-type"]; + + if(serverType == "simple") { + + } else if(serverType == "thread-pool") { + + } else { + + throw invalid_argument("Unknown server type "+serverType); + } + } + + if(!args["workers"].empty()) { + workerCount = atoi(args["workers"].c_str()); + } + + if(!args["clients"].empty()) { + clientCount = atoi(args["clients"].c_str()); + } + + if(!args["loop"].empty()) { + loopCount = atoi(args["loop"].c_str()); + } + } catch(exception& e) { + cerr << e.what() << endl; + cerr << usage; + } + + // Dispatcher + shared_ptr binaryProtocol(new TBinaryProtocol); + + shared_ptr server(new Server(binaryProtocol)); + + // Options + shared_ptr serverOptions(new TServerOptions()); + + // Transport + shared_ptr serverSocket(new TServerSocket(port)); + + // ThreadFactory + + shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + + shared_ptr serverThread; + + if(serverType == "simple") { + + serverThread = threadFactory->newThread(shared_ptr(new TSimpleServer(server, serverOptions, serverSocket))); + + } else if(serverType == "thread-pool") { + + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + serverThread = threadFactory->newThread(shared_ptr(new TThreadPoolServer(server, + serverOptions, + serverSocket, + threadManager))); + } + + cout << "Starting the server on port " << port << endl; + + serverThread->start(); + + Monitor monitor; + + size_t threadCount = 0; + + set > clientThreads; + + for(size_t ix = 0; ix < clientCount; ix++) { + + shared_ptr socket(new TSocket("127.0.01", port)); + shared_ptr bufferedSocket(new TBufferedTransport(socket, 2048)); + shared_ptr binaryProtocol(new TBinaryProtocol()); + shared_ptr serviceClient(new ServiceClient(bufferedSocket, binaryProtocol)); + + clientThreads.insert(threadFactory->newThread(shared_ptr(new ClientThread(bufferedSocket, serviceClient, monitor, threadCount, loopCount)))); + } + + for(std::set >::const_iterator thread = clientThreads.begin(); thread != clientThreads.end(); thread++) { + (*thread)->start(); + } + + cout << endl; + + {Synchronized s(monitor); + threadCount = clientCount; + + cout << "Launch "<< clientCount << " client threads" << endl; + monitor.notifyAll(); + + while(threadCount > 0) { + monitor.wait(); + } + } + + printf("done.\n"); + return 0; +} -- 2.17.1