From d788b2e046355f7b08f821e68c6b5147b496599a Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Thu, 7 Sep 2006 01:26:35 +0000 Subject: [PATCH] Thrift TTransportFactory model for servers Summary: Servers need to create bufferedtransports etc. around the transports they get in a user-definable way. So use a factory pattern to allow the user to supply an object to the server that defines this behavior. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 8 ++- lib/cpp/src/TProcessor.h | 3 +- lib/cpp/src/server/TServer.h | 29 ++++++--- lib/cpp/src/server/TSimpleServer.cc | 29 ++++----- lib/cpp/src/server/TSimpleServer.h | 15 +++-- lib/cpp/src/server/TThreadPoolServer.cc | 60 +++++++++---------- lib/cpp/src/server/TThreadPoolServer.h | 12 ++-- .../src/transport/TBufferedTransportFactory.h | 33 ++++++++++ lib/cpp/src/transport/TTransport.h | 2 +- lib/cpp/src/transport/TTransportFactory.h | 33 ++++++++++ lib/java/src/server/TServer.java | 59 +++++++++++++++--- lib/java/src/server/TSimpleServer.java | 26 ++++---- lib/java/src/server/TThreadPoolServer.java | 34 +++++++---- .../src/transport/TBaseTransportFactory.java | 23 +++++++ lib/java/src/transport/TTransportFactory.java | 21 +++++++ lib/py/src/server/TServer.py | 25 +++++--- lib/py/src/transport/TSocket.py | 5 +- lib/py/src/transport/TTransport.py | 16 +++++ test/cpp/src/TestClient.cc | 34 +++++++++-- test/cpp/src/TestServer.cc | 41 ++++++------- test/java/src/TestClient.java | 56 +++++++++-------- test/py/TestServer.py | 4 +- 22 files changed, 392 insertions(+), 176 deletions(-) create mode 100644 lib/cpp/src/transport/TBufferedTransportFactory.h create mode 100644 lib/cpp/src/transport/TTransportFactory.h create mode 100644 lib/java/src/transport/TBaseTransportFactory.java create mode 100644 lib/java/src/transport/TTransportFactory.java diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index b06e06fe..554cb90f 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -1,7 +1,7 @@ lib_LTLIBRARIES = libthrift.la -common_cxxflags = -Isrc $(BOOST_CPPFLAGS) -common_ldflags = $(BOOST_LDFLAGS) +common_cxxflags = -Wall -Isrc $(BOOST_CPPFLAGS) +common_ldflags = -Wall $(BOOST_LDFLAGS) # Define the source file for the module @@ -54,7 +54,9 @@ include_transport_HEADERS = \ src/transport/TServerTransport.h \ src/transport/TSocket.h \ src/transport/TTransport.h \ - src/transport/TTransportException.h + src/transport/TTransportException.h \ + src/transport/TTransportFactory.h \ + src/transport/TBufferedTransportFactory.h include_serverdir = $(include_thriftdir)/server include_server_HEADERS = \ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 4cbcd650..f905b1d7 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -23,7 +23,8 @@ class TProcessor { public: virtual ~TProcessor() {} virtual bool process(shared_ptr in, shared_ptr out) = 0; - virtual bool process(shared_ptr io) { return process(io, io); } + bool process(shared_ptr io) { return process(io, io); } + protected: TProcessor() {} }; diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index d53223fa..c19302fe 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -1,7 +1,9 @@ -#ifndef T_SERVER_H -#define T_SERVER_H +#ifndef _THRIFT_SERVER_TSERVER_H_ +#define _THRIFT_SERVER_TSERVER_H_ 1 #include +#include +#include #include #include @@ -9,6 +11,7 @@ namespace facebook { namespace thrift { namespace server { using namespace facebook::thrift; +using namespace facebook::thrift::transport; using namespace boost; class TServerOptions; @@ -24,10 +27,22 @@ public: virtual void run() = 0; protected: - TServer(shared_ptr processor, shared_ptr options) : + TServer(shared_ptr processor, + shared_ptr serverTransport, + shared_ptr transportFactory, + shared_ptr options) : + processor_(processor), + serverTransport_(serverTransport), + transportFactory_(transportFactory), + options_(options) {} + + TServer(shared_ptr processor, + shared_ptr options) : processor_(processor), options_(options) {} - + shared_ptr processor_; + shared_ptr serverTransport_; + shared_ptr transportFactory_; shared_ptr options_; }; @@ -35,12 +50,12 @@ protected: * Class to encapsulate all generic server options. */ class TServerOptions { -public: + public: // TODO(mcslee): Fill in getters/setters here -protected: + protected: // TODO(mcslee): Fill data members in here }; }}} // facebook::thrift::server -#endif +#endif // #ifndef _THRIFT_SERVER_TSERVER_H_ diff --git a/lib/cpp/src/server/TSimpleServer.cc b/lib/cpp/src/server/TSimpleServer.cc index 2ad5145e..041a52fe 100644 --- a/lib/cpp/src/server/TSimpleServer.cc +++ b/lib/cpp/src/server/TSimpleServer.cc @@ -1,5 +1,4 @@ #include "server/TSimpleServer.h" -#include "transport/TBufferedTransport.h" #include "transport/TTransportException.h" #include #include @@ -15,6 +14,7 @@ namespace facebook { namespace thrift { namespace server { void TSimpleServer::run() { shared_ptr client; + pair,shared_ptr > io; try { // Start the server listening @@ -25,26 +25,21 @@ void TSimpleServer::run() { } // Fetch client from server - while (true) { - try { + try { + while (true) { client = serverTransport_->accept(); - if (client != NULL) { - // Process for as long as we can keep the processor happy! - shared_ptr bufferedClient(new TBufferedTransport(client)); - while (processor_->process(bufferedClient)) {} - } - } catch (TTransportException& ttx) { - if (client != NULL) { + io = transportFactory_->getIOTransports(client); + try { + while (processor_->process(io.first, io.second)) {} + } catch (TTransportException& ttx) { cerr << "TSimpleServer client died: " << ttx.getMessage() << endl; } - } - - // Clean up the client - if (client != NULL) { - - // Ensure no resource leaks + io.first->close(); + io.second->close(); client->close(); - } + } + } catch (TTransportException& ttx) { + cerr << "TServerTransport died on accept: " << ttx.getMessage() << endl; } // TODO(mcslee): Could this be a timeout case? Or always the real thing? diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h index a8242d4a..973ba30b 100644 --- a/lib/cpp/src/server/TSimpleServer.h +++ b/lib/cpp/src/server/TSimpleServer.h @@ -1,5 +1,5 @@ -#ifndef T_SIMPLE_SERVER_H -#define T_SIMPLE_SERVER_H +#ifndef _THRIFT_SERVER_TSIMPLESERVER_H_ +#define _THRIFT_SERVER_TSIMPLESERVER_H_ 1 #include "server/TServer.h" #include "transport/TServerTransport.h" @@ -17,18 +17,17 @@ namespace facebook { namespace thrift { namespace server { class TSimpleServer : public TServer { public: TSimpleServer(shared_ptr processor, - shared_ptr options, - shared_ptr serverTransport) : - TServer(processor, options), serverTransport_(serverTransport) {} + shared_ptr serverTransport, + shared_ptr transportFactory, + shared_ptr options) : + TServer(processor, serverTransport, transportFactory, options) {} ~TSimpleServer() {} void run(); - protected: - shared_ptr serverTransport_; }; }}} // facebook::thrift::server -#endif +#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_ diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc index d53d174c..1eab53d7 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cc +++ b/lib/cpp/src/server/TThreadPoolServer.cc @@ -1,5 +1,4 @@ #include "server/TThreadPoolServer.h" -#include "transport/TBufferedTransport.h" #include "transport/TTransportException.h" #include "concurrency/Thread.h" #include "concurrency/ThreadManager.h" @@ -15,54 +14,52 @@ using namespace facebook::thrift::transport; class TThreadPoolServer::Task: public Runnable { shared_ptr _processor; - shared_ptr _transport; - shared_ptr _bufferedTransport; + shared_ptr _input; + shared_ptr _output; public: Task(shared_ptr processor, - shared_ptr transport) : + shared_ptr input, + shared_ptr output) : _processor(processor), - _transport(transport), - _bufferedTransport(new TBufferedTransport(transport)) { + _input(input), + _output(output) { } ~Task() {} - void run() { - + void run() { while(true) { - try { - _processor->process(_bufferedTransport); - + _processor->process(_input, _output); } catch (TTransportException& ttx) { - - break; - + break; } catch(...) { - - break; + break; } } - - _bufferedTransport->close(); + _input->close(); + _output->close(); } }; TThreadPoolServer::TThreadPoolServer(shared_ptr processor, - shared_ptr options, - shared_ptr serverTransport, - shared_ptr threadManager) : - TServer(processor, options), - serverTransport_(serverTransport), + shared_ptr serverTransport, + shared_ptr transportFactory, + shared_ptr threadManager, + shared_ptr options) : + TServer(processor, serverTransport, transportFactory, options), threadManager_(threadManager) { } - + TThreadPoolServer::~TThreadPoolServer() {} void TThreadPoolServer::run() { + shared_ptr client; + pair,shared_ptr > io; + try { // Start the server listening serverTransport_->listen(); @@ -71,15 +68,14 @@ void TThreadPoolServer::run() { return; } - // Fetch client from server - - while (true) { - + while (true) { try { - - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, - shared_ptr(serverTransport_->accept())))); - + // Fetch client from server + client = serverTransport_->accept(); + // Make IO transports + io = transportFactory_->getIOTransports(client); + // Add to threadmanager pool + threadManager_->add(shared_ptr(new TThreadPoolServer::Task(processor_, io.first, io.second))); } catch (TTransportException& ttx) { break; } diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index 827491db..34b216cc 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -1,5 +1,5 @@ -#ifndef T_THREADPOOL_SERVER_H -#define T_THREADPOOL_SERVER_H +#ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_ +#define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1 #include #include @@ -19,9 +19,10 @@ public: class Task; TThreadPoolServer(shared_ptr processor, - shared_ptr options, shared_ptr serverTransport, - shared_ptr threadManager); + shared_ptr transportFactory, + shared_ptr threadManager, + shared_ptr options); virtual ~TThreadPoolServer(); @@ -29,11 +30,10 @@ public: protected: - shared_ptr serverTransport_; shared_ptr threadManager_; }; }}} // facebook::thrift::server -#endif +#endif // #ifndef _THRIFT_SERVER_TTHREADPOOLSERVER_H_ diff --git a/lib/cpp/src/transport/TBufferedTransportFactory.h b/lib/cpp/src/transport/TBufferedTransportFactory.h new file mode 100644 index 00000000..c6e87b18 --- /dev/null +++ b/lib/cpp/src/transport/TBufferedTransportFactory.h @@ -0,0 +1,33 @@ +#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ +#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1 + +#include +#include +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * Wraps a transport into a buffered one. + * + * @author Mark Slee + */ +class TBufferedTransportFactory : public TTransportFactory { + public: + TBufferedTransportFactory() {} + + virtual ~TBufferedTransportFactory() {} + + /** + * Wraps the transport into a buffered one. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + boost::shared_ptr buffered(new TBufferedTransport(trans)); + return std::make_pair(buffered, buffered); + } + +}; + +}}} + +#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index 19a2cb6c..d65d25bf 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -1,7 +1,7 @@ #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_ #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1 -#include "TTransportException.h" +#include #include namespace facebook { namespace thrift { namespace transport { diff --git a/lib/cpp/src/transport/TTransportFactory.h b/lib/cpp/src/transport/TTransportFactory.h new file mode 100644 index 00000000..abd1048d --- /dev/null +++ b/lib/cpp/src/transport/TTransportFactory.h @@ -0,0 +1,33 @@ +#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ +#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1 + +#include +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * Generic factory class to make an input and output transport out of a + * source transport. Commonly used inside servers to make input and output + * streams out of raw clients. + * + * @author Mark Slee + */ +class TTransportFactory { + public: + TTransportFactory() {} + + virtual ~TTransportFactory() {} + + /** + * Default implementation does nothing, just returns the transport given. + */ + virtual std::pair, boost::shared_ptr > getIOTransports(boost::shared_ptr trans) { + return std::make_pair(trans, trans); + } + +}; + +}}} + +#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java index 702ba694..dd83098e 100644 --- a/lib/java/src/server/TServer.java +++ b/lib/java/src/server/TServer.java @@ -1,6 +1,9 @@ package com.facebook.thrift.server; import com.facebook.thrift.TProcessor; +import com.facebook.thrift.transport.TServerTransport; +import com.facebook.thrift.transport.TTransportFactory; +import com.facebook.thrift.transport.TBaseTransportFactory; /** * Generic interface for a Thrift server. @@ -17,24 +20,64 @@ public abstract class TServer { public Options() {} } - /** Core processor */ + /** + * Core processor + */ protected TProcessor processor_; - /** Server options */ + /** + * Server options + */ protected Options options_; /** - * Default options constructor + * Server transport */ - protected TServer(TProcessor processor) { - this(processor, new Options()); - } + protected TServerTransport serverTransport_; + + /** + * Transport Factory + */ + protected TTransportFactory transportFactory_; /** - * Default constructor, all servers take a processor and some options. + * Default constructors. */ - protected TServer(TProcessor processor, Options options) { + + protected TServer(TProcessor processor, + TServerTransport serverTransport) { + this(processor, + serverTransport, + new TBaseTransportFactory(), + new Options()); + } + + protected TServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory) { + this(processor, + serverTransport, + transportFactory, + new Options()); + } + + + protected TServer(TProcessor processor, + TServerTransport serverTransport, + Options options) { + this(processor, + serverTransport, + new TBaseTransportFactory(), + options); + } + + protected TServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + Options options) { processor_ = processor; + serverTransport_ = serverTransport; + transportFactory_ = transportFactory; options_ = options; } diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java index 94b739e2..76a57628 100644 --- a/lib/java/src/server/TSimpleServer.java +++ b/lib/java/src/server/TSimpleServer.java @@ -13,19 +13,9 @@ import com.facebook.thrift.transport.TTransportException; */ public class TSimpleServer extends TServer { - private TServerTransport serverTransport_; - - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport) { - this(processor, new TServer.Options(), serverTransport); - } - - public TSimpleServer(TProcessor processor, - TServer.Options options, TServerTransport serverTransport) { - super(processor, options); - serverTransport_ = serverTransport; + super(processor, serverTransport); } public void run() { @@ -38,18 +28,24 @@ public class TSimpleServer extends TServer { while (true) { TTransport client = null; + TTransport[] io = null; try { client = serverTransport_.accept(); if (client != null) { - while (processor_.process(client, client)); + io = transportFactory_.getIOTransports(client); + while (processor_.process(io[0], io[1])); } } catch (TException tx) { tx.printStackTrace(); } - if (client != null) { - client.close(); - client = null; + if (io != null) { + if (io[0] != null) { + io[0].close(); + } + if (io[1] != null) { + io[1].close(); + } } } } diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java index d19275ee..2f5be8d6 100644 --- a/lib/java/src/server/TThreadPoolServer.java +++ b/lib/java/src/server/TThreadPoolServer.java @@ -5,6 +5,8 @@ import com.facebook.thrift.TProcessor; import com.facebook.thrift.transport.TServerTransport; import com.facebook.thrift.transport.TTransport; import com.facebook.thrift.transport.TTransportException; +import com.facebook.thrift.transport.TTransportFactory; +import com.facebook.thrift.transport.TBaseTransportFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -20,28 +22,28 @@ import java.util.concurrent.TimeUnit; */ public class TThreadPoolServer extends TServer { - // Server transport - private TServerTransport serverTransport_; - // Executor service for handling client connections private ExecutorService executorService_; // Customizable server options public static class Options extends TServer.Options { - public int port = 9190; public int minWorkerThreads = 5; public int maxWorkerThreads = Integer.MAX_VALUE; } public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport) { - this(processor, new Options(), serverTransport); + this(processor, + serverTransport, + new TBaseTransportFactory(), + new Options()); } - + public TThreadPoolServer(TProcessor processor, - Options options, - TServerTransport serverTransport) { - super(processor, options); + TServerTransport serverTransport, + TTransportFactory transportFactory, + Options options) { + super(processor, serverTransport, transportFactory, options); serverTransport_ = serverTransport; executorService_ = null; @@ -95,12 +97,22 @@ public class TThreadPoolServer extends TServer { * Loops on processing a client forever */ public void run() { + TTransport[] io = null; try { - while (processor_.process(client_, client_)) {} + io = transportFactory_.getIOTransports(client_); + while (processor_.process(io[0], io[1])) {} } catch (TException tx) { tx.printStackTrace(); } - client_.close(); + + if (io != null) { + if (io[0] != null) { + io[0].close(); + } + if (io[1] != null) { + io[1].close(); + } + } } } } diff --git a/lib/java/src/transport/TBaseTransportFactory.java b/lib/java/src/transport/TBaseTransportFactory.java new file mode 100644 index 00000000..90bbbe11 --- /dev/null +++ b/lib/java/src/transport/TBaseTransportFactory.java @@ -0,0 +1,23 @@ +package com.facebook.thrift.transport; + +/** + * Base transport factory just returns the arg transport. + * + * @author Mark Slee + */ +public class TBaseTransportFactory implements TTransportFactory { + + /** + * Returns a list of two transports (input, output) from a simple + * Transport. + * + * @param in The base transport + * @returns Array of two transports, first for input, second for output + */ + public TTransport[] getIOTransports(TTransport in) { + TTransport[] out = new TTransport[2]; + out[0] = out[1] = in; + return out; + } + +} diff --git a/lib/java/src/transport/TTransportFactory.java b/lib/java/src/transport/TTransportFactory.java new file mode 100644 index 00000000..8c7a0936 --- /dev/null +++ b/lib/java/src/transport/TTransportFactory.java @@ -0,0 +1,21 @@ +package com.facebook.thrift.transport; + +/** + * Factory class used to create an input and output transport out of a simple + * transport. This is used primarily in servers, which get Transports from + * a ServerTransport and then may want to mutate them. + * + * @author Mark Slee + */ +public interface TTransportFactory { + + /** + * Returns a list of two transports (input, output) from a simple + * Transport. + * + * @param in The base transport + * @returns Array of two transports, first for input, second for output + */ + public TTransport[] getIOTransports(TTransport in); + +} diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py index a5d5621b..53f6846d 100644 --- a/lib/py/src/server/TServer.py +++ b/lib/py/src/server/TServer.py @@ -8,8 +8,13 @@ class TServer: """Base interface for a server, which must have a run method.""" - def __init__(self, proc): - self.processor = proc + def __init__(self, processor, serverTransport, transportFactory=None): + self.processor = processor + self.serverTransport = serverTransport + if transportFactory == None: + self.transportFactory = TTransport.TTransportFactoryBase() + else: + self.transportFactory = transportFactory def run(self): pass @@ -18,18 +23,20 @@ class TSimpleServer(TServer): """Simple single-threaded server that just pumps around one transport.""" - def __init__(self, proc, trans): - TServer.__init__(self, proc) - self.transport = trans + def __init__(self, processor, serverTransport, transportFactory=None): + TServer.__init__(self, processor, serverTransport, transportFactory) def run(self): - self.transport.listen() + self.serverTransport.listen() while True: - client = TTransport.TBufferedTransport(self.transport.accept()) + client = self.serverTransport.accept() + (input, output) = self.transportFactory.getIOTransports(client) try: while True: - self.processor.process(client, client) + self.processor.process(input, output) except Exception, x: print '%s, %s, %s' % (type(x), x, traceback.format_exc()) print 'Client died.' - client.close() + + input.close() + output.close() diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py index 61f1cff8..2c7dd3ee 100644 --- a/lib/py/src/transport/TSocket.py +++ b/lib/py/src/transport/TSocket.py @@ -21,8 +21,9 @@ class TSocket(TTransportBase): self.handle.connect((self.host, self.port)) def close(self): - self.handle.close() - self.handle = None + if self.handle != None: + self.handle.close() + self.handle = None def readAll(self, sz): buff = '' diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py index 1e8b6c60..a7eb3b02 100644 --- a/lib/py/src/transport/TTransport.py +++ b/lib/py/src/transport/TTransport.py @@ -36,6 +36,22 @@ class TServerTransportBase: def close(self): pass +class TTransportFactoryBase: + + """Base class for a Transport Factory""" + + def getIOTransports(self, trans): + return (trans, trans) + +class TBufferedTransportFactory: + + """Factory transport that builds buffered transports""" + + def getIOTransports(self, trans): + buffered = TBufferedTransport(trans) + return (buffered, buffered) + + class TBufferedTransport(TTransportBase): """Class that wraps another transport and buffers its I/O.""" diff --git a/test/cpp/src/TestClient.cc b/test/cpp/src/TestClient.cc index e12b65b7..6334f080 100644 --- a/test/cpp/src/TestClient.cc +++ b/test/cpp/src/TestClient.cc @@ -45,23 +45,28 @@ int main(int argc, char** argv) { } shared_ptr socket(new TSocket(host, port)); - shared_ptr bufferedSocket(new TBufferedTransport(socket, 2048)); + shared_ptr bufferedSocket(new TBufferedTransport(socket)); shared_ptr binaryProtocol(new TBinaryProtocol()); ThriftTestClient testClient(bufferedSocket, binaryProtocol); - + + uint64_t time_min = 0; + uint64_t time_max = 0; + uint64_t time_tot = 0; + int test = 0; for (test = 0; test < numTests; ++test) { - /** - * CONNECT TEST - */ - printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port); try { bufferedSocket->open(); } catch (TTransportException& ttx) { printf("Connect failed: %s\n", ttx.getMessage().c_str()); continue; } + + /** + * CONNECT TEST + */ + printf("Test #%d, connect %s:%d\n", test+1, host.c_str(), port); uint64_t start = now(); @@ -379,12 +384,29 @@ int main(int argc, char** argv) { } uint64_t stop = now(); + uint64_t tot = stop-start; + printf("Total time: %lu us\n", stop-start); + time_tot += tot; + if (time_min == 0 || tot < time_min) { + time_min = tot; + } + if (tot > time_max) { + time_max = tot; + } + bufferedSocket->close(); } // printf("\nSocket syscalls: %u", g_socket_syscalls); printf("\nAll tests done.\n"); + + uint64_t time_avg = time_tot / numTests; + + printf("Min time: %lu us\n", time_min); + printf("Max time: %lu us\n", time_max); + printf("Avg time: %lu us\n", time_avg); + return 0; } diff --git a/test/cpp/src/TestServer.cc b/test/cpp/src/TestServer.cc index 97d34401..f743aea0 100644 --- a/test/cpp/src/TestServer.cc +++ b/test/cpp/src/TestServer.cc @@ -4,6 +4,7 @@ #include #include #include +#include #include "ThriftTest.h" #include @@ -53,23 +54,13 @@ class TestHandler : public ThriftTestIf { } Xtruct testStruct(Xtruct thing) { - printf("testStruct({\"%s\", %d, %d, %ld})\n", - thing.string_thing.c_str(), - (int)thing.byte_thing, - thing.i32_thing, - thing.i64_thing); + printf("testStruct({\"%s\", %d, %d, %ld})\n", thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing); return thing; } Xtruct2 testNest(Xtruct2 nest) { Xtruct thing = nest.struct_thing; - printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n", - (int)nest.byte_thing, - thing.string_thing.c_str(), - (int)thing.byte_thing, - thing.i32_thing, - thing.i64_thing, - nest.i32_thing); + printf("testNest({%d, {\"%s\", %d, %d, %ld}, %d})\n", (int)nest.byte_thing, thing.string_thing.c_str(), (int)thing.byte_thing, thing.i32_thing, thing.i64_thing, nest.i32_thing); return nest; } @@ -205,11 +196,7 @@ class TestHandler : public ThriftTestIf { list::const_iterator x; printf("{"); for (x = xtructs.begin(); x != xtructs.end(); ++x) { - printf("{\"%s\", %d, %d, %ld}, ", - x->string_thing.c_str(), - (int)x->byte_thing, - x->i32_thing, - x->i64_thing); + printf("{\"%s\", %d, %d, %ld}, ", x->string_thing.c_str(), (int)x->byte_thing, x->i32_thing, x->i64_thing); } printf("}"); @@ -347,18 +334,23 @@ int main(int argc, char **argv) { shared_ptr testServer(new ThriftTestServer(testHandler, binaryProtocol)); - // Options - shared_ptr serverOptions(new TServerOptions()); - // Transport shared_ptr serverSocket(new TServerSocket(port)); + // Factory + shared_ptr transportFactory(new TBufferedTransportFactory()); + + // Options + shared_ptr serverOptions(new TServerOptions()); + if (serverType == "simple") { // Server TSimpleServer simpleServer(testServer, - serverOptions, - serverSocket); + serverSocket, + transportFactory, + serverOptions + ); printf("Starting the server on port %d...\n", port); simpleServer.run(); @@ -376,9 +368,10 @@ int main(int argc, char **argv) { threadManager->start(); TThreadPoolServer threadPoolServer(testServer, - serverOptions, serverSocket, - threadManager); + transportFactory, + threadManager, + serverOptions); printf("Starting the server on port %d...\n", port); threadPoolServer.run(); diff --git a/test/java/src/TestClient.java b/test/java/src/TestClient.java index 74fbfefc..fda37de8 100644 --- a/test/java/src/TestClient.java +++ b/test/java/src/TestClient.java @@ -42,13 +42,16 @@ public class TestClient { ThriftTest.Client testClient = new ThriftTest.Client(tSocket, binaryProtocol); + long timeMin = 0; + long timeMax = 0; + long timeTot = 0; + for (int test = 0; test < numTests; ++test) { /** * CONNECT TEST */ - System.out.println("Test #" + (test+1) + ", " + - "connect " + host + ":" + port); + System.out.println("Test #" + (test+1) + ", " + "connect " + host + ":" + port); try { tSocket.open(); } catch (TTransportException ttx) { @@ -56,7 +59,7 @@ public class TestClient { continue; } - long start = System.currentTimeMillis(); + long start = System.nanoTime(); /** * VOID TEST @@ -110,11 +113,7 @@ public class TestClient { out.i32_thing = -3; out.i64_thing = -5; Xtruct in = testClient.testStruct(out); - System.out.print(" = {" + - "\"" + in.string_thing + "\", " + - in.byte_thing + ", " + - in.i32_thing + ", " + - in.i64_thing + "}\n"); + System.out.print(" = {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}\n"); /** * NESTED STRUCT TEST @@ -126,13 +125,7 @@ public class TestClient { out2.i32_thing = 5; Xtruct2 in2 = testClient.testNest(out2); in = in2.struct_thing; - System.out.print(" = {" + - in2.byte_thing + ", {" + - "\"" + in.string_thing + "\", " + - in.byte_thing + ", " + - in.i32_thing + ", " + - in.i64_thing + "}, " + - in2.i32_thing + "}\n"); + System.out.print(" = {" + in2.byte_thing + ", {" + "\"" + in.string_thing + "\", " + in.byte_thing + ", " + in.i32_thing + ", " + in.i64_thing + "}, " + in2.i32_thing + "}\n"); /** * MAP TEST @@ -299,19 +292,14 @@ public class TestClient { HashMap userMap = v2.userMap; System.out.print("{"); for (int k3 : userMap.keySet()) { - System.out.print(k3 + " => " + - userMap.get(k3) + ", "); + System.out.print(k3 + " => " + userMap.get(k3) + ", "); } System.out.print("}, "); ArrayList xtructs = v2.xtructs; System.out.print("{"); for (Xtruct x : xtructs) { - System.out.print("{" + - "\"" + x.string_thing + "\", " + - x.byte_thing + ", " + - x.i32_thing + ", "+ - x.i64_thing + "}, "); + System.out.print("{" + "\"" + x.string_thing + "\", " + x.byte_thing + ", " + x.i32_thing + ", "+ x.i64_thing + "}, "); } System.out.print("}"); @@ -321,14 +309,32 @@ public class TestClient { } System.out.print("}\n"); - long stop = System.currentTimeMillis(); - System.out.println("Total time: " + (stop-start) + "ms"); + long stop = System.nanoTime(); + long tot = stop-start; + + System.out.println("Total time: " + tot/1000 + "us"); + + if (timeMin == 0 || tot < timeMin) { + timeMin = tot; + } + if (tot > timeMax) { + timeMax = tot; + } + timeTot += tot; tSocket.close(); } + + long timeAvg = timeTot / numTests; + System.out.println("Min time: " + timeMin/1000 + "us"); + System.out.println("Max time: " + timeMax/1000 + "us"); + System.out.println("Avg time: " + timeAvg/1000 + "us"); + } catch (Exception x) { x.printStackTrace(); - } + } + } + } diff --git a/test/py/TestServer.py b/test/py/TestServer.py index 525ffee3..db2ad81b 100755 --- a/test/py/TestServer.py +++ b/test/py/TestServer.py @@ -5,6 +5,7 @@ sys.path.append('./gen-py') import ThriftTest from ThriftTest_types import * +from thrift.transport import TTransport from thrift.transport import TSocket from thrift.protocol import TBinaryProtocol from thrift.server import TServer @@ -54,5 +55,6 @@ transport = TSocket.TServerSocket(9090) protocol = TBinaryProtocol.TBinaryProtocol() handler = TestHandler() iface = ThriftTest.Server(handler, protocol) -server = TServer.TSimpleServer(iface, transport) +factory = TTransport.TBufferedTransportFactory() +server = TServer.TSimpleServer(iface, transport, factory) server.run() -- 2.17.1