From 6b1027183c95981c8834faf91ba3b161b29a6d76 Mon Sep 17 00:00:00 2001 From: jfarrell Date: Fri, 4 Apr 2014 11:34:42 -0400 Subject: [PATCH] THRIFT-1868:Make the TPC backlog configurable in the Java servers Client: java Patch: Jean-Daniel Cryans Makes TServerSocket backlog configurable. --- .../transport/TNonblockingServerSocket.java | 15 ++++-- .../transport/TSSLTransportFactory.java | 47 ++++++++++--------- .../thrift/transport/TServerSocket.java | 30 +++++++++--- .../thrift/transport/TServerTransport.java | 27 +++++++++++ .../thrift/async/TestTAsyncClientManager.java | 42 +++++++++-------- .../thrift/server/TestNonblockingServer.java | 2 +- .../thrift/test/TestNonblockingServer.java | 2 +- .../org/apache/thrift/test/TestServer.java | 28 +++++------ .../thrift/transport/TestTSaslTransports.java | 8 ++-- 9 files changed, 128 insertions(+), 73 deletions(-) diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java index 25b487e6..112d939a 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java @@ -54,6 +54,9 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { */ private int clientTimeout_ = 0; + public static class NonblockingAbstractServerSocketArgs extends + AbstractServerTransportArgs {} + /** * Creates just a port listening server socket */ @@ -65,7 +68,7 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { * Creates just a port listening server socket */ public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { - this(new InetSocketAddress(port), clientTimeout); + this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout)); } public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException { @@ -73,7 +76,11 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { } public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { - clientTimeout_ = clientTimeout; + this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout)); + } + + public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException { + clientTimeout_ = args.clientTimeout; try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -83,10 +90,10 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { // Prevent 2MSL delay problem on server restarts serverSocket_.setReuseAddress(true); // Bind to listening port - serverSocket_.bind(bindAddr); + serverSocket_.bind(args.bindAddr, args.backlog); } catch (IOException ioe) { serverSocket_ = null; - throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); + throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + "."); } } diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java index 044e06ab..ca27729a 100755 --- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java @@ -33,7 +33,7 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; /** - * A Factory for providing and setting up Client and Server SSL wrapped + * A Factory for providing and setting up Client and Server SSL wrapped * TSocket and TServerSocket */ public class TSSLTransportFactory { @@ -42,24 +42,24 @@ public class TSSLTransportFactory { * Get a SSL wrapped TServerSocket bound to the specified port. In this * configuration the default settings are used. Default settings are retrieved * from System properties that are set. - * + * * Example system properties: * -Djavax.net.ssl.trustStore= * -Djavax.net.ssl.trustStorePassword=password * -Djavax.net.ssl.keyStore= * -Djavax.net.ssl.keyStorePassword=password - * + * * @param port * @return A SSL wrapped TServerSocket * @throws TTransportException */ public static TServerSocket getServerSocket(int port) throws TTransportException { - return getServerSocket(port, 0); + return getServerSocket(port, 0); } /** * Get a default SSL wrapped TServerSocket bound to the specified port - * + * * @param port * @param clientTimeout * @return A SSL wrapped TServerSocket @@ -71,7 +71,7 @@ public class TSSLTransportFactory { /** * Get a default SSL wrapped TServerSocket bound to the specified port and interface - * + * * @param port * @param clientTimeout * @param ifAddress @@ -80,14 +80,14 @@ public class TSSLTransportFactory { */ public static TServerSocket getServerSocket(int port, int clientTimeout, boolean clientAuth, InetAddress ifAddress) throws TTransportException { SSLServerSocketFactory factory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault(); - return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null); + return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null); } /** - * Get a configured SSL wrapped TServerSocket bound to the specified port and interface. - * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore, + * Get a configured SSL wrapped TServerSocket bound to the specified port and interface. + * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore, * truststore and other settings - * + * * @param port * @param clientTimeout * @param ifAddress @@ -113,7 +113,8 @@ public class TSSLTransportFactory { if (params != null && params.cipherSuites != null) { serverSocket.setEnabledCipherSuites(params.cipherSuites); } - return new TServerSocket(serverSocket, timeout); + return new TServerSocket(new TServerSocket.ServerSocketTransportArgs(). + serverSocket(serverSocket).clientTimeout(timeout)); } catch (Exception e) { throw new TTransportException("Could not bind to port " + port, e); } @@ -121,9 +122,9 @@ public class TSSLTransportFactory { /** * Get a default SSL wrapped TSocket connected to the specified host and port. All - * the client methods return a bound connection. So there is no need to call open() on the + * the client methods return a bound connection. So there is no need to call open() on the * TTransport. - * + * * @param host * @param port * @param timeout @@ -137,7 +138,7 @@ public class TSSLTransportFactory { /** * Get a default SSL wrapped TSocket connected to the specified host and port. - * + * * @param host * @param port * @return A SSL wrapped TSocket @@ -148,9 +149,9 @@ public class TSSLTransportFactory { } /** - * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the + * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the * passed in TSSLTransportParameters. - * + * * @param host * @param port * @param timeout @@ -250,7 +251,7 @@ public class TSSLTransportFactory { /** * Create parameters specifying the protocol and cipher suites - * + * * @param protocol The specific protocol (TLS/SSL) can be specified with versions * @param cipherSuites */ @@ -261,7 +262,7 @@ public class TSSLTransportFactory { /** * Create parameters specifying the protocol, cipher suites and if client authentication * is required - * + * * @param protocol The specific protocol (TLS/SSL) can be specified with versions * @param cipherSuites * @param clientAuth @@ -276,7 +277,7 @@ public class TSSLTransportFactory { /** * Set the keystore, password, certificate type and the store type - * + * * @param keyStore Location of the Keystore on disk * @param keyPass Keystore password * @param keyManagerType The default is X509 @@ -296,7 +297,7 @@ public class TSSLTransportFactory { /** * Set the keystore and password - * + * * @param keyStore Location of the Keystore on disk * @param keyPass Keystore password */ @@ -306,7 +307,7 @@ public class TSSLTransportFactory { /** * Set the truststore, password, certificate type and the store type - * + * * @param trustStore Location of the Truststore on disk * @param trustPass Truststore password * @param trustManagerType The default is X509 @@ -326,7 +327,7 @@ public class TSSLTransportFactory { /** * Set the truststore and password - * + * * @param trustStore Location of the Truststore on disk * @param trustPass Truststore password */ @@ -336,7 +337,7 @@ public class TSSLTransportFactory { /** * Set if client authentication is required - * + * * @param clientAuth */ public void requireClientAuth(boolean clientAuth) { diff --git a/lib/java/src/org/apache/thrift/transport/TServerSocket.java b/lib/java/src/org/apache/thrift/transport/TServerSocket.java index 147074a4..8345d449 100644 --- a/lib/java/src/org/apache/thrift/transport/TServerSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TServerSocket.java @@ -46,19 +46,27 @@ public class TServerSocket extends TServerTransport { */ private int clientTimeout_ = 0; + public static class ServerSocketTransportArgs extends AbstractServerTransportArgs { + ServerSocket serverSocket; + + public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) { + this.serverSocket = serverSocket; + return this; + } + } + /** * Creates a server socket from underlying socket object */ - public TServerSocket(ServerSocket serverSocket) { + public TServerSocket(ServerSocket serverSocket) throws TTransportException { this(serverSocket, 0); } /** * Creates a server socket from underlying socket object */ - public TServerSocket(ServerSocket serverSocket, int clientTimeout) { - serverSocket_ = serverSocket; - clientTimeout_ = clientTimeout; + public TServerSocket(ServerSocket serverSocket, int clientTimeout) throws TTransportException { + this(new ServerSocketTransportArgs().serverSocket(serverSocket).clientTimeout(clientTimeout)); } /** @@ -80,17 +88,25 @@ public class TServerSocket extends TServerTransport { } public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { - clientTimeout_ = clientTimeout; + this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout)); + } + + public TServerSocket(ServerSocketTransportArgs args) throws TTransportException { + clientTimeout_ = args.clientTimeout; + if (args.serverSocket != null) { + this.serverSocket_ = args.serverSocket; + return; + } try { // Make server socket serverSocket_ = new ServerSocket(); // Prevent 2MSL delay problem on server restarts serverSocket_.setReuseAddress(true); // Bind to listening port - serverSocket_.bind(bindAddr); + serverSocket_.bind(args.bindAddr, args.backlog); } catch (IOException ioe) { serverSocket_ = null; - throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); + throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + "."); } } diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java index e03ec4ca..424e4faa 100644 --- a/lib/java/src/org/apache/thrift/transport/TServerTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java @@ -20,6 +20,7 @@ package org.apache.thrift.transport; import java.io.Closeable; +import java.net.InetSocketAddress; /** * Server transport. Object which provides client transports. @@ -27,6 +28,32 @@ import java.io.Closeable; */ public abstract class TServerTransport implements Closeable { + public static abstract class AbstractServerTransportArgs> { + int backlog = 0; // A value of 0 means the default value will be used (currently set at 50) + int clientTimeout = 0; + InetSocketAddress bindAddr; + + public T backlog(int backlog) { + this.backlog = backlog; + return (T) this; + } + + public T clientTimeout(int clientTimeout) { + this.clientTimeout = clientTimeout; + return (T) this; + } + + public T port(int port) { + this.bindAddr = new InetSocketAddress(port); + return (T) this; + } + + public T bindAddr(InetSocketAddress bindAddr) { + this.bindAddr = bindAddr; + return (T) this; + } + } + public abstract void listen() throws TTransportException; public final TTransport accept() throws TTransportException { diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index d88b8a50..12d0eaf3 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -53,7 +53,9 @@ public class TestTAsyncClientManager extends TestCase { private TAsyncClientManager clientManager_; public void setUp() throws Exception { - server_ = new THsHaServer(new Args(new TNonblockingServerSocket(ServerTestBase.PORT)).processor(new Srv.Processor(new SrvHandler()))); + server_ = new THsHaServer(new Args(new TNonblockingServerSocket( + new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(ServerTestBase.PORT))). + processor(new Srv.Processor(new SrvHandler()))); serverThread_ = new Thread(new Runnable() { public void run() { server_.serve(); @@ -79,8 +81,8 @@ public class TestTAsyncClientManager extends TestCase { Srv.AsyncClient client = getClient(); client.setTimeout(5000); basicCall(client); - } - + } + public void testTimeoutCall() throws Exception { final CountDownLatch latch = new CountDownLatch(1); Srv.AsyncClient client = getClient(); @@ -98,7 +100,7 @@ public class TestTAsyncClientManager extends TestCase { latch.countDown(); } } - + @Override public void onComplete(primitiveMethod_call response) { try { @@ -111,8 +113,8 @@ public class TestTAsyncClientManager extends TestCase { latch.await(2, TimeUnit.SECONDS); assertTrue(client.hasError()); assertTrue(client.getError() instanceof TimeoutException); - } - + } + public void testVoidCall() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); @@ -132,8 +134,8 @@ public class TestTAsyncClientManager extends TestCase { }); latch.await(1, TimeUnit.SECONDS); assertTrue(returned.get()); - } - + } + public void testOnewayCall() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); @@ -153,8 +155,8 @@ public class TestTAsyncClientManager extends TestCase { }); latch.await(1, TimeUnit.SECONDS); assertTrue(returned.get()); - } - + } + public void testParallelCalls() throws Exception { // make multiple calls with deserialization in the selector thread (repro Eric's issue) int numThreads = 50; @@ -176,13 +178,13 @@ public class TestTAsyncClientManager extends TestCase { numSuccesses += runnable.getNumSuccesses(); } assertEquals(numThreads * numCallsPerThread, numSuccesses); - } - + } + private Srv.AsyncClient getClient() throws IOException { TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT); return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket); } - + private void basicCall(Srv.AsyncClient client) throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); @@ -198,7 +200,7 @@ public class TestTAsyncClientManager extends TestCase { latch.countDown(); } } - + @Override public void onError(Exception exception) { try { @@ -213,7 +215,7 @@ public class TestTAsyncClientManager extends TestCase { latch.await(100, TimeUnit.SECONDS); assertTrue(returned.get()); } - + public class SrvHandler implements Iface { // Use this method for a standard call testing @Override @@ -232,7 +234,7 @@ public class TestTAsyncClientManager extends TestCase { } return 0; } - + @Override public void methodWithDefaultArgs(int something) throws TException { } @@ -249,20 +251,20 @@ public class TestTAsyncClientManager extends TestCase { public void onewayMethod() throws TException { } } - + private static abstract class FailureLessCallback implements AsyncMethodCallback { @Override public void onError(Exception exception) { fail(exception); } } - + private static void fail(Exception exception) { StringWriter sink = new StringWriter(); exception.printStackTrace(new PrintWriter(sink, true)); fail("unexpected error " + sink.toString()); } - + private class JankyRunnable implements Runnable { private int numCalls_; private int numSuccesses_ = 0; @@ -286,7 +288,7 @@ public class TestTAsyncClientManager extends TestCase { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean returned = new AtomicBoolean(false); client_.Janky(1, new AsyncMethodCallback() { - + @Override public void onComplete(Janky_call response) { try { diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java index 78376956..3df3bd82 100644 --- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java +++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java @@ -53,7 +53,7 @@ public class TestNonblockingServer extends ServerTestBase { try { // Transport TNonblockingServerSocket tServerSocket = - new TNonblockingServerSocket(PORT); + new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(PORT)); server = getServer(processor, tServerSocket, protoFactory, factory); diff --git a/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java index 18343b0e..41c4b650 100644 --- a/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java +++ b/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java @@ -52,7 +52,7 @@ public class TestNonblockingServer extends TestServer { // Transport TNonblockingServerSocket tServerSocket = - new TNonblockingServerSocket(port); + new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port)); TServer serverEngine; diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java index 125a7736..ee0866e0 100644 --- a/lib/java/test/org/apache/thrift/test/TestServer.java +++ b/lib/java/test/org/apache/thrift/test/TestServer.java @@ -124,12 +124,12 @@ public class TestServer { protocol_type = args[i].split("=")[1]; protocol_type.trim(); } else if (args[i].startsWith("--transport")) { - transport_type = args[i].split("=")[1]; + transport_type = args[i].split("=")[1]; transport_type.trim(); } else if (args[i].equals("--ssl")) { ssl = true; } else if (args[i].equals("--help")) { - System.out.println("Allowed options:"); + System.out.println("Allowed options:"); System.out.println(" --help\t\t\tProduce help message"); System.out.println(" --port=arg (=" + port + ")\tPort number to connect"); System.out.println(" --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed"); @@ -143,7 +143,7 @@ public class TestServer { System.err.println("Can not parse arguments! See --help"); System.exit(1); } - + try { if (server_type.equals("simple")) { } else if (server_type.equals("thread-pool")) { @@ -156,13 +156,13 @@ public class TestServer { throw new Exception("SSL is not supported over nonblocking servers!"); } } else { - throw new Exception("Unknown server type! " + server_type); + throw new Exception("Unknown server type! " + server_type); } if (protocol_type.equals("binary")) { } else if (protocol_type.equals("json")) { } else if (protocol_type.equals("compact")) { } else { - throw new Exception("Unknown protocol type! " + protocol_type); + throw new Exception("Unknown protocol type! " + protocol_type); } if (transport_type.equals("buffered")) { } else if (transport_type.equals("framed")) { @@ -171,7 +171,7 @@ public class TestServer { throw new Exception("Unknown transport type! " + transport_type); } } catch (Exception e) { - System.err.println("Error: " + e.getMessage()); + System.err.println("Error: " + e.getMessage()); System.exit(1); } @@ -204,15 +204,15 @@ public class TestServer { TServer serverEngine = null; - if (server_type.equals("nonblocking") || + if (server_type.equals("nonblocking") || server_type.equals("threaded-selector")) { // Nonblocking servers TNonblockingServerSocket tNonblockingServerSocket = - new TNonblockingServerSocket(port); - + new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port)); + if (server_type.equals("nonblocking")) { // Nonblocking Server - TNonblockingServer.Args tNonblockingServerArgs + TNonblockingServer.Args tNonblockingServerArgs = new TNonblockingServer.Args(tNonblockingServerSocket); tNonblockingServerArgs.processor(testProcessor); tNonblockingServerArgs.protocolFactory(tProtocolFactory); @@ -221,12 +221,12 @@ public class TestServer { serverEngine = new TNonblockingServer(tNonblockingServerArgs); } else { // server_type.equals("threaded-selector") // ThreadedSelector Server - TThreadedSelectorServer.Args tThreadedSelectorServerArgs + TThreadedSelectorServer.Args tThreadedSelectorServerArgs = new TThreadedSelectorServer.Args(tNonblockingServerSocket); tThreadedSelectorServerArgs.processor(testProcessor); tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory); tThreadedSelectorServerArgs.transportFactory(tTransportFactory); - + serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs); } } else { @@ -237,7 +237,7 @@ public class TestServer { if (ssl) { tServerSocket = TSSLTransportFactory.getServerSocket(port, 0); } else { - tServerSocket = new TServerSocket(port); + tServerSocket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(port)); } if (server_type.equals("simple")) { @@ -250,7 +250,7 @@ public class TestServer { serverEngine = new TSimpleServer(tServerArgs); } else { // server_type.equals("threadpool") // ThreadPool Server - TThreadPoolServer.Args tThreadPoolServerArgs + TThreadPoolServer.Args tThreadPoolServerArgs = new TThreadPoolServer.Args(tServerSocket); tThreadPoolServerArgs.processor(testProcessor); tThreadPoolServerArgs.protocolFactory(tProtocolFactory); diff --git a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java index 41d08f60..80e53b9f 100644 --- a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java +++ b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java @@ -73,7 +73,7 @@ public class TestTSaslTransports extends TestCase { + "score and seven years ago our fathers brought forth on this " + "continent a new nation, conceived in liberty, and dedicated to the " + "proposition that all men are created equal."; - + private static final String testMessage2 = "I have a dream that one day " + "this nation will rise up and live out the true meaning of its creed: " + "'We hold these truths to be self-evident, that all men are created equal.'"; @@ -123,7 +123,9 @@ public class TestTSaslTransports extends TestCase { } private void internalRun() throws Exception { - TServerSocket serverSocket = new TServerSocket(ServerTestBase.PORT); + TServerSocket serverSocket = new TServerSocket( + new TServerSocket.ServerSocketTransportArgs(). + port(ServerTestBase.PORT)); try { acceptAndWrite(serverSocket); } finally { @@ -280,7 +282,7 @@ public class TestTSaslTransports extends TestCase { public void run() { try { // Transport - TServerSocket socket = new TServerSocket(PORT); + TServerSocket socket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(PORT)); TTransportFactory factory = new TSaslServerTransport.Factory( WRAPPED_MECHANISM, SERVICE, HOST, WRAPPED_PROPS, -- 2.17.1