From: Roger Meier Date: Tue, 7 Jun 2011 17:59:07 +0000 (+0000) Subject: THRIFT-1198 C++ TestClient and Server Improvements (add Unix Domain Socket, HTTP... X-Git-Tag: 0.7.0~77 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=ca142b0bc812394d91d3a07e4e8b6d0b306d5d4b;p=common%2Fthrift.git THRIFT-1198 C++ TestClient and Server Improvements (add Unix Domain Socket, HTTP, JSON) git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1133116 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/test/cpp/Thrift-test.mk b/test/cpp/Thrift-test.mk index 6987e33f..e0a624cb 100644 --- a/test/cpp/Thrift-test.mk +++ b/test/cpp/Thrift-test.mk @@ -46,8 +46,8 @@ CC = g++ LD = g++ # Compiler flags -DCFL = -Wall -O3 -g -I. -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -LFL = -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +DCFL = -Wall -O3 -g -I. -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -lboost_program_options +LFL = -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -lboost_program_options CCFL = -Wall -O3 -I. -I./gen-cpp $(include_flags) CFL = $(CCFL) $(LFL) diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp index 417a7a14..23d7dcd8 100644 --- a/test/cpp/src/TestClient.cpp +++ b/test/cpp/src/TestClient.cpp @@ -17,15 +17,19 @@ * under the License. */ -#include +#include #include #include #include +#include +#include #include #include #include #include +#include + #include "ThriftTest.h" #define __STDC_FORMAT_MACROS @@ -56,30 +60,66 @@ int main(int argc, char** argv) { string host = "localhost"; int port = 9090; int numTests = 1; - bool framed = false; bool ssl = false; + string transport_type = "buffered"; + string protocol_type = "binary"; + string domain_socket = ""; + + program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("host", program_options::value(&host)->default_value(host), "Host to connect") + ("port", program_options::value(&port)->default_value(port), "Port number to connect") + ("domain-socket", program_options::value(&domain_socket)->default_value(domain_socket), "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port") + ("transport", program_options::value(&transport_type)->default_value(transport_type), "Transport: buffered, framed, http") + ("protocol", program_options::value(&protocol_type)->default_value(protocol_type), "Protocol: binary, json") + ("ssl", "Encrypted Transport using SSL") + ("testloops,n", program_options::value(&numTests)->default_value(numTests), "Number of Tests") + ; + + program_options::variables_map vm; + program_options::store(program_options::parse_command_line(argc, argv, desc), vm); + program_options::notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } - for (int i = 0; i < argc; ++i) { - if (strcmp(argv[i], "-h") == 0) { - char* pch = strtok(argv[++i], ":"); - if (pch != NULL) { - host = string(pch); + try { + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "json") { + } else { + throw invalid_argument("Unknown protocol type "+protocol_type); } - pch = strtok(NULL, ":"); - if (pch != NULL) { - port = atoi(pch); + } + + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { + } else { + throw invalid_argument("Unknown transport type "+transport_type); } - } else if (strcmp(argv[i], "-n") == 0) { - numTests = atoi(argv[++i]); - } else if (strcmp(argv[i], "-f") == 0) { - framed = true; - } else if (strcmp(argv[i], "--ssl") == 0) { - ssl = true; } + + } catch (std::exception& e) { + cerr << e.what() << endl; + cout << desc << "\n"; + return 1; } + if (vm.count("ssl")) { + ssl = true; + } + + shared_ptr transport; + shared_ptr protocol; + shared_ptr socket; shared_ptr factory; + if (ssl) { factory = shared_ptr(new TSSLSocketFactory()); factory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); @@ -87,22 +127,42 @@ int main(int argc, char** argv) { factory->authenticate(true); socket = factory->createSocket(host, port); } else { - socket = shared_ptr(new TSocket(host, port)); + if (domain_socket != "") { + socket = shared_ptr(new TSocket(domain_socket)); + port = 0; + } + else { + socket = shared_ptr(new TSocket(host, port)); + } } - shared_ptr transport; - - if (framed) { + if (transport_type.compare("http") == 0) { + shared_ptr httpSocket(new THttpClient(socket, host, "/service")); + transport = httpSocket; + } else if (transport_type.compare("framed") == 0){ shared_ptr framedSocket(new TFramedTransport(socket)); transport = framedSocket; - } else { + } else{ shared_ptr bufferedSocket(new TBufferedTransport(socket)); transport = bufferedSocket; } - shared_ptr< TBinaryProtocolT > protocol( - new TBinaryProtocolT(transport)); - ThriftTestClientT< TBinaryProtocolT > testClient(protocol); + if (protocol_type.compare("json") == 0) { + shared_ptr jsonProtocol(new TJSONProtocol(transport)); + protocol = jsonProtocol; + } else{ + shared_ptr binaryProtocol(new TBinaryProtocol(transport)); + protocol = binaryProtocol; + } + + // Connection info + cout << "Connecting (" << transport_type << "/" << protocol_type << ") to: " << domain_socket; + if (port != 0) { + cout << host << ":" << port; + } + cout << endl; + + ThriftTestClient testClient(protocol); uint64_t time_min = 0; uint64_t time_max = 0; diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 84016470..bb3cd434 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,8 @@ #include #include #include +#include +#include #include #include "ThriftTest.h" @@ -34,6 +37,8 @@ #include #include +#include + #define __STDC_FORMAT_MACROS #include #include @@ -324,97 +329,100 @@ class TestProcessorEventHandler : public TProcessorEventHandler { int main(int argc, char **argv) { - int port = 9090; - string serverType = "simple"; - string protocolType = "binary"; - size_t workerCount = 4; bool ssl = false; - - ostringstream usage; - - usage << - argv[0] << " [--port=] [--server-type=] [--protocol-type=] [--workers=] [--processor-events]" << endl << - - "\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl << - - "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << - - "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; - - map args; - - for (int ix = 1; ix < argc; ix++) { - string arg(argv[ix]); - if (arg.compare(0,2, "--") == 0) { - size_t end = arg.find_first_of("=", 2); - if (end != string::npos) { - args[string(arg, 2, end - 2)] = string(arg, end + 1); + string transport_type = "buffered"; + string protocol_type = "binary"; + string server_type = "simple"; + string domain_socket = ""; + size_t workers = 4; + + + program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port", program_options::value(&port)->default_value(port), "Port number to listen") + ("domain-socket", program_options::value(&domain_socket)->default_value(domain_socket), + "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)") + ("server-type", program_options::value(&server_type)->default_value(server_type), + "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"") + ("transport", program_options::value(&transport_type)->default_value(transport_type), + "transport: buffered, framed, http") + ("protocol", program_options::value(&protocol_type)->default_value(protocol_type), + "protocol: binary, json") + ("ssl", "Encrypted Transport using SSL") + ("processor-events", "processor-events") + ("workers,n", program_options::value(&workers)->default_value(workers), + "Number of thread pools workers. Only valid for thread-pool server type") + ; + + program_options::variables_map vm; + program_options::store(program_options::parse_command_line(argc, argv, desc), vm); + program_options::notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } + + try { + if (!server_type.empty()) { + if (server_type == "simple") { + } else if (server_type == "thread-pool") { + } else if (server_type == "threaded") { + } else if (server_type == "nonblocking") { } else { - args[string(arg, 2)] = "true"; + throw invalid_argument("Unknown server type "+server_type); } - } else { - throw invalid_argument("Unexcepted command line token: "+arg); - } - } - - try { - - if (!args["port"].empty()) { - port = atoi(args["port"].c_str()); } - - if (!args["server-type"].empty()) { - serverType = args["server-type"]; - if (serverType == "simple") { - } else if (serverType == "thread-pool") { - } else if (serverType == "threaded") { - } else if (serverType == "nonblocking") { + + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "json") { } else { - throw invalid_argument("Unknown server type "+serverType); + throw invalid_argument("Unknown protocol type "+protocol_type); } } - if (!args["protocol-type"].empty()) { - protocolType = args["protocol-type"]; - if (protocolType == "binary") { - } else if (protocolType == "ascii") { - throw invalid_argument("ASCII protocol not supported"); - } else if (protocolType == "xml") { - throw invalid_argument("XML protocol not supported"); + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { } else { - throw invalid_argument("Unknown protocol type "+protocolType); + throw invalid_argument("Unknown transport type "+transport_type); } } - if (!args["workers"].empty()) { - workerCount = atoi(args["workers"].c_str()); - } } catch (std::exception& e) { cerr << e.what() << endl; - cerr << usage; + cout << desc << "\n"; + return 1; } - if (args["ssl"] == "true") { + if (vm.count("ssl")) { ssl = true; signal(SIGPIPE, SIG_IGN); } // Dispatcher - shared_ptr protocolFactory( - new TBinaryProtocolFactoryT()); + shared_ptr protocolFactory; + if (protocol_type == "json") { + shared_ptr jsonProtocolFactory(new TJSONProtocolFactory()); + protocolFactory = jsonProtocolFactory; + } else { + shared_ptr binaryProtocolFactory(new TBinaryProtocolFactoryT()); + protocolFactory = binaryProtocolFactory; + } + // Processor shared_ptr testHandler(new TestHandler()); - - shared_ptr testProcessor( - new ThriftTestProcessorT< TBinaryProtocolT >(testHandler)); - - - if (!args["processor-events"].empty()) { + shared_ptr testProcessor(new ThriftTestProcessor(testHandler)); + + if (vm.count("processor-events")) { testProcessor->setEventHandler(shared_ptr( new TestProcessorEventHandler())); } - + // Transport shared_ptr sslSocketFactory; shared_ptr serverSocket; @@ -426,26 +434,51 @@ int main(int argc, char **argv) { sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); serverSocket = shared_ptr(new TSSLServerSocket(port, sslSocketFactory)); } else { - serverSocket = shared_ptr(new TServerSocket(port)); + if (domain_socket != "") { + unlink(domain_socket.c_str()); + serverSocket = shared_ptr(new TServerSocket(domain_socket)); + port = 0; + } + else { + serverSocket = shared_ptr(new TServerSocket(port)); + } } + // Factory - shared_ptr transportFactory(new TBufferedTransportFactory()); + shared_ptr transportFactory; + + if (transport_type == "http") { + shared_ptr httpTransportFactory(new THttpServerTransportFactory()); + transportFactory = httpTransportFactory; + } else if (transport_type == "framed") { + shared_ptr framedTransportFactory(new TFramedTransportFactory()); + transportFactory = framedTransportFactory; + } else { + shared_ptr bufferedTransportFactory(new TBufferedTransportFactory()); + transportFactory = bufferedTransportFactory; + } - if (serverType == "simple") { + // Server Info + cout << "Starting \"" << server_type << "\" server (" + << transport_type << "/" << protocol_type << ") listen on: " << domain_socket; + if (port != 0) { + cout << port; + } + cout << endl; - // Server + // Server + if (server_type == "simple") { TSimpleServer simpleServer(testProcessor, - serverSocket, + serverSocket, transportFactory, protocolFactory); - printf("Starting the server on port %d...\n", port); simpleServer.serve(); - } else if (serverType == "thread-pool") { + } else if (server_type == "thread-pool") { shared_ptr threadManager = - ThreadManager::newSimpleThreadManager(workerCount); + ThreadManager::newSimpleThreadManager(workers); shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); @@ -455,30 +488,27 @@ int main(int argc, char **argv) { threadManager->start(); TThreadPoolServer threadPoolServer(testProcessor, - serverSocket, + serverSocket, transportFactory, protocolFactory, - threadManager); + threadManager); - printf("Starting the server on port %d...\n", port); threadPoolServer.serve(); - } else if (serverType == "threaded") { + } else if (server_type == "threaded") { TThreadedServer threadedServer(testProcessor, serverSocket, transportFactory, protocolFactory); - printf("Starting the server on port %d...\n", port); threadedServer.serve(); - } else if (serverType == "nonblocking") { + } else if (server_type == "nonblocking") { TNonblockingServer nonblockingServer(testProcessor, port); - printf("Starting the nonblocking server on port %d...\n", port); nonblockingServer.serve(); } - printf("done.\n"); + cout << "done." << endl; return 0; }