From e703ed2ce689fce7b57af1f7299ccbfdc99adaee Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Fri, 22 Oct 2010 20:23:57 +0000 Subject: [PATCH] THRIFT-745. java: Make it easier to instantiate servers This patch replaces the multitude of constructors with builder-esque Args objects for each server and single constructor. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1026482 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/thrift/server/THsHaServer.java | 219 ++++-------------- .../thrift/server/TNonblockingServer.java | 119 ++-------- .../src/org/apache/thrift/server/TServer.java | 118 ++++++---- .../apache/thrift/server/TSimpleServer.java | 51 +--- .../thrift/server/TThreadPoolServer.java | 130 +++-------- .../thrift/async/TestTAsyncClientManager.java | 13 +- .../apache/thrift/server/TestHsHaServer.java | 3 +- .../thrift/server/TestNonblockingServer.java | 3 +- .../transport/TestTSSLTransportFactory.java | 3 +- .../thrift/transport/TestTSaslTransports.java | 3 +- 10 files changed, 174 insertions(+), 488 deletions(-) diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java index 207d8e8e..f3dfd0a5 100644 --- a/lib/java/src/org/apache/thrift/server/THsHaServer.java +++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java @@ -26,11 +26,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.thrift.TProcessor; -import org.apache.thrift.TProcessorFactory; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,185 +38,65 @@ public class THsHaServer extends TNonblockingServer { private static final Logger LOGGER = LoggerFactory.getLogger(THsHaServer.class.getName()); - // This wraps all the functionality of queueing and thread pool management - // for the passing of Invocations from the Selector to workers. - private ExecutorService invoker; - - /** - * Create server with given processor, and server transport. Default server - * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on - * both input and output transports. A TProcessorFactory will be created that - * always returns the specified processor. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport) { - this(processor, serverTransport, new Options()); - } - - /** - * Create server with given processor, server transport, and server options - * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on - * both input and output transports. A TProcessorFactory will be created that - * always returns the specified processor. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport, - Options options) { - this(new TProcessorFactory(processor), serverTransport, options); - } + public static class Args extends AbstractNonblockingServerArgs { + private int workerThreads = 5; + private int stopTimeoutVal = 60; + private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + private ExecutorService executorService = null; - /** - * Create server with specified processor factory and server transport. Uses - * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is - * used on both input and output transports. - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport) { - this(processorFactory, serverTransport, new Options()); - } + public Args(TNonblockingServerTransport transport) { + super(transport); + } - /** - * Create server with specified processor factory, server transport, and server - * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on - * both input and output transports. - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - Options options) { - this(processorFactory, serverTransport, new TFramedTransport.Factory(), - new TBinaryProtocol.Factory(), options); - } + public Args workerThreads(int i) { + workerThreads = i; + return this; + } - /** - * Server with specified processor, server transport, and in/out protocol - * factory. Defaults will be used for in/out transport factory and server - * options. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport, - TProtocolFactory protocolFactory) { - this(processor, serverTransport, protocolFactory, new Options()); - } + public int getWorkerThreads() { + return workerThreads; + } - /** - * Server with specified processor, server transport, and in/out protocol - * factory. Defaults will be used for in/out transport factory and server - * options. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport, - TProtocolFactory protocolFactory, - Options options) { - this(new TProcessorFactory(processor), serverTransport, - new TFramedTransport.Factory(), - protocolFactory, protocolFactory, - options); - } + public int getStopTimeoutVal() { + return stopTimeoutVal; + } - /** - * Create server with specified processor, server transport, in/out - * transport factory, in/out protocol factory, and default server options. A - * processor factory will be created that always returns the specified - * processor. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory transportFactory, - TProtocolFactory protocolFactory) { - this(new TProcessorFactory(processor), serverTransport, - transportFactory, protocolFactory); - } + public Args stopTimeoutVal(int stopTimeoutVal) { + this.stopTimeoutVal = stopTimeoutVal; + return this; + } - /** - * Create server with specified processor factory, server transport, in/out - * transport factory, in/out protocol factory, and default server options. - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory transportFactory, - TProtocolFactory protocolFactory) { - this(processorFactory, serverTransport, - transportFactory, - protocolFactory, protocolFactory, new Options()); - } + public TimeUnit getStopTimeoutUnit() { + return stopTimeoutUnit; + } - /** - * Create server with specified processor factory, server transport, in/out - * transport factory, in/out protocol factory, and server options. - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory transportFactory, - TProtocolFactory protocolFactory, - Options options) { - this(processorFactory, serverTransport, - transportFactory, - protocolFactory, protocolFactory, - options); - } + public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { + this.stopTimeoutUnit = stopTimeoutUnit; + return this; + } - /** - * Create server with everything specified, except use default server options. - */ - public THsHaServer( TProcessor processor, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - this(new TProcessorFactory(processor), serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - } + public ExecutorService getExecutorService() { + return executorService; + } - /** - * Create server with everything specified, except use default server options. - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) - { - this(processorFactory, serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, new Options()); + public Args executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } } - /** - * Create server with every option fully specified, with an internally managed - * ExecutorService - */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - Options options) - { - this(processorFactory, serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, - createInvokerPool(options), - options); - } + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the Selector to workers. + private ExecutorService invoker; /** * Create server with every option fully specified, and with an injected * ExecutorService */ - public THsHaServer( TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - ExecutorService executor, - TNonblockingServer.Options options) { - super(processorFactory, serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, - options); - - invoker = executor; + public THsHaServer(Args args) { + super(args); + + invoker = args.executorService == null ? createInvokerPool(args) : args.executorService; } /** @inheritDoc */ @@ -255,7 +130,7 @@ public class THsHaServer extends TNonblockingServer { /** * Helper to create an invoker pool */ - protected static ExecutorService createInvokerPool(Options options) { + protected static ExecutorService createInvokerPool(Args options) { int workerThreads = options.workerThreads; int stopTimeoutVal = options.stopTimeoutVal; TimeUnit stopTimeoutUnit = options.stopTimeoutUnit; @@ -326,10 +201,4 @@ public class THsHaServer extends TNonblockingServer { frameBuffer.invoke(); } } - - public static class Options extends TNonblockingServer.Options { - public int workerThreads = 5; - public int stopTimeoutVal = 60; - public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; - } } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 0d60f711..3587bf7d 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -31,11 +31,7 @@ import java.util.Set; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; -import org.apache.thrift.TProcessor; -import org.apache.thrift.TProcessorFactory; -import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TMemoryInputTransport; @@ -61,6 +57,21 @@ public class TNonblockingServer extends TServer { private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServer.class.getName()); + public static class Args extends AbstractNonblockingServerArgs { + public Args(TNonblockingServerTransport transport) { + super(transport); + } + } + + public static abstract class AbstractNonblockingServerArgs> extends AbstractServerArgs { + public long maxReadBufferBytes = Long.MAX_VALUE; + + public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { + super(transport); + transportFactory(new TFramedTransport.Factory()); + } + } + // Flag for stopping the server private volatile boolean stopped_ = true; @@ -73,95 +84,14 @@ public class TNonblockingServer extends TServer { */ private final long MAX_READ_BUFFER_BYTES; - protected final Options options_; - /** * How many bytes are currently allocated to read buffers. */ private long readBufferBytesAllocated = 0; - /** - * Create server with given processor and server transport, using - * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input - * and output transports. A TProcessorFactory will be created that always - * returns the specified processor. - */ - public TNonblockingServer(TProcessor processor, - TNonblockingServerTransport serverTransport) { - this(new TProcessorFactory(processor), serverTransport); - } - - /** - * Create server with specified processor factory and server transport. - * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input - * and output transports. - */ - public TNonblockingServer(TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport) { - this(processorFactory, serverTransport, - new TFramedTransport.Factory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); - } - - public TNonblockingServer(TProcessor processor, - TNonblockingServerTransport serverTransport, - TProtocolFactory protocolFactory) { - this(processor, serverTransport, - new TFramedTransport.Factory(), - protocolFactory, protocolFactory); - } - - public TNonblockingServer(TProcessor processor, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory transportFactory, - TProtocolFactory protocolFactory) { - this(processor, serverTransport, - transportFactory, - protocolFactory, protocolFactory); - } - - public TNonblockingServer(TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory transportFactory, - TProtocolFactory protocolFactory) { - this(processorFactory, serverTransport, - transportFactory, - protocolFactory, protocolFactory); - } - - public TNonblockingServer(TProcessor processor, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - this(new TProcessorFactory(processor), serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - } - - public TNonblockingServer(TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - this(processorFactory, serverTransport, - outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, - new Options()); - } - - public TNonblockingServer(TProcessorFactory processorFactory, - TNonblockingServerTransport serverTransport, - TFramedTransport.Factory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - Options options) { - super(processorFactory, serverTransport, - null, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - options_ = options; - options_.validate(); - MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes; + public TNonblockingServer(AbstractNonblockingServerArgs args) { + super(args); + MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; } /** @@ -772,17 +702,4 @@ public class TNonblockingServer extends TServer { } } } // FrameBuffer - - - public static class Options { - public long maxReadBufferBytes = Long.MAX_VALUE; - - public Options() {} - - public void validate() { - if (maxReadBufferBytes <= 1024) { - throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer."); - } - } - } } diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java index 34093be4..0af66d39 100644 --- a/lib/java/src/org/apache/thrift/server/TServer.java +++ b/lib/java/src/org/apache/thrift/server/TServer.java @@ -19,6 +19,7 @@ package org.apache.thrift.server; +import org.apache.thrift.TProcessor; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; @@ -31,6 +32,67 @@ import org.apache.thrift.transport.TTransportFactory; */ public abstract class TServer { + public static class Args extends AbstractServerArgs { + public Args(TServerTransport transport) { + super(transport); + } + } + + public static abstract class AbstractServerArgs> { + final TServerTransport serverTransport; + TProcessorFactory processorFactory; + TTransportFactory inputTransportFactory = new TTransportFactory(); + TTransportFactory outputTransportFactory = new TTransportFactory(); + TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory(); + TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory(); + + public AbstractServerArgs(TServerTransport transport) { + serverTransport = transport; + } + + public T processorFactory(TProcessorFactory factory) { + this.processorFactory = factory; + return (T) this; + } + + public T processor(TProcessor processor) { + this.processorFactory = new TProcessorFactory(processor); + return (T) this; + } + + public T transportFactory(TTransportFactory factory) { + this.inputTransportFactory = factory; + this.outputTransportFactory = factory; + return (T) this; + } + + public T inputTransportFactory(TTransportFactory factory) { + this.inputTransportFactory = factory; + return (T) this; + } + + public T outputTransportFactory(TTransportFactory factory) { + this.outputTransportFactory = factory; + return (T) this; + } + + public T protocolFactory(TProtocolFactory factory) { + this.inputProtocolFactory = factory; + this.outputProtocolFactory = factory; + return (T) this; + } + + public T inputProtocolFactory(TProtocolFactory factory) { + this.inputProtocolFactory = factory; + return (T) this; + } + + public T outputProtocolFactory(TProtocolFactory factory) { + this.outputProtocolFactory = factory; + return (T) this; + } + } + /** * Core processor */ @@ -63,55 +125,13 @@ public abstract class TServer { private boolean isServing; - /** - * Default constructors. - */ - - protected TServer(TProcessorFactory processorFactory, - TServerTransport serverTransport) { - this(processorFactory, - serverTransport, - new TTransportFactory(), - new TTransportFactory(), - new TBinaryProtocol.Factory(), - new TBinaryProtocol.Factory()); - } - - protected TServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory transportFactory) { - this(processorFactory, - serverTransport, - transportFactory, - transportFactory, - new TBinaryProtocol.Factory(), - new TBinaryProtocol.Factory()); - } - - protected TServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - this(processorFactory, - serverTransport, - transportFactory, - transportFactory, - protocolFactory, - protocolFactory); - } - - protected TServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - processorFactory_ = processorFactory; - serverTransport_ = serverTransport; - inputTransportFactory_ = inputTransportFactory; - outputTransportFactory_ = outputTransportFactory; - inputProtocolFactory_ = inputProtocolFactory; - outputProtocolFactory_ = outputProtocolFactory; + protected TServer(AbstractServerArgs args) { + processorFactory_ = args.processorFactory; + serverTransport_ = args.serverTransport; + inputTransportFactory_ = args.inputTransportFactory; + outputTransportFactory_ = args.outputTransportFactory; + inputProtocolFactory_ = args.inputProtocolFactory; + outputProtocolFactory_ = args.outputProtocolFactory; } /** diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java index 97ba0ad7..ef1b10a5 100644 --- a/lib/java/src/org/apache/thrift/server/TSimpleServer.java +++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java @@ -21,13 +21,9 @@ package org.apache.thrift.server; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; -import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,53 +37,10 @@ public class TSimpleServer extends TServer { private boolean stopped_ = false; - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport) { - super(new TProcessorFactory(processor), serverTransport); + public TSimpleServer(AbstractServerArgs args) { + super(args); } - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - super(new TProcessorFactory(processor), serverTransport, transportFactory, protocolFactory); - } - - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - super(new TProcessorFactory(processor), serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - } - - public TSimpleServer(TProcessorFactory processorFactory, - TServerTransport serverTransport) { - super(processorFactory, serverTransport); - } - - public TSimpleServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - super(processorFactory, serverTransport, transportFactory, protocolFactory); - } - - public TSimpleServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - super(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - } - - public void serve() { stopped_ = false; try { diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java index 6af22084..85537bf2 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java @@ -20,21 +20,16 @@ package org.apache.thrift.server; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; -import org.apache.thrift.TProcessorFactory; -import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,126 +40,53 @@ import org.slf4j.LoggerFactory; * */ public class TThreadPoolServer extends TServer { - private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName()); - // Executor service for handling client connections - private ExecutorService executorService_; - - // Flag for stopping the server - private volatile boolean stopped_; - - // Server options - private Options options_; - - // Customizable server options - public static class Options { + public static class Args extends AbstractServerArgs { public int minWorkerThreads = 5; public int maxWorkerThreads = Integer.MAX_VALUE; public int stopTimeoutVal = 60; public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; - } - - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport) { - this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); - } - public TThreadPoolServer(TProcessorFactory processorFactory, - TServerTransport serverTransport) { - this(processorFactory, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); - } - - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TProtocolFactory protocolFactory) { - this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - protocolFactory, protocolFactory); - } + public Args(TServerTransport transport) { + super(transport); + } - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - this(processor, serverTransport, - transportFactory, transportFactory, - protocolFactory, protocolFactory); - } + public Args minWorkerThreads(int n) { + minWorkerThreads = n; + return this; + } - public TThreadPoolServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - this(processorFactory, serverTransport, - transportFactory, transportFactory, - protocolFactory, protocolFactory); + public Args maxWorkerThreads(int n) { + maxWorkerThreads = n; + return this; + } } - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - this(new TProcessorFactory(processor), serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - } + // Executor service for handling client connections + private ExecutorService executorService_; - public TThreadPoolServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory) { - super(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - options_ = new Options(); - executorService_ = Executors.newCachedThreadPool(); - } + // Flag for stopping the server + private volatile boolean stopped_; - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - Options options) { - this(new TProcessorFactory(processor), serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, - options); - } + private final TimeUnit stopTimeoutUnit; - public TThreadPoolServer(TProcessorFactory processorFactory, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - Options options) { - super(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); + private final long stopTimeoutVal; - executorService_ = null; + public TThreadPoolServer(Args args) { + super(args); SynchronousQueue executorQueue = new SynchronousQueue(); - executorService_ = new ThreadPoolExecutor(options.minWorkerThreads, - options.maxWorkerThreads, + stopTimeoutUnit = args.stopTimeoutUnit; + stopTimeoutVal = args.stopTimeoutVal; + + executorService_ = new ThreadPoolExecutor(args.minWorkerThreads, + args.maxWorkerThreads, 60, TimeUnit.SECONDS, executorQueue); - - options_ = options; } @@ -198,7 +120,7 @@ public class TThreadPoolServer extends TServer { // exception. If we don't do this, then we'll shut down prematurely. We want // to let the executorService clear it's task queue, closing client sockets // appropriately. - long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); + long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal); long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 72a57bce..d88b8a50 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -34,6 +34,7 @@ import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.ServerTestBase; import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.THsHaServer.Args; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingSocket; @@ -46,13 +47,13 @@ import thrift.test.Srv.AsyncClient.primitiveMethod_call; import thrift.test.Srv.AsyncClient.voidMethod_call; public class TestTAsyncClientManager extends TestCase { - + private THsHaServer server_; private Thread serverThread_; private TAsyncClientManager clientManager_; - + public void setUp() throws Exception { - server_ = new THsHaServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(ServerTestBase.PORT)); + server_ = new THsHaServer(new Args(new TNonblockingServerSocket(ServerTestBase.PORT)).processor(new Srv.Processor(new SrvHandler()))); serverThread_ = new Thread(new Runnable() { public void run() { server_.serve(); @@ -62,18 +63,18 @@ public class TestTAsyncClientManager extends TestCase { clientManager_ = new TAsyncClientManager(); Thread.sleep(500); } - + public void tearDown() throws Exception { server_.stop(); clientManager_.stop(); serverThread_.join(); } - + public void testBasicCall() throws Exception { Srv.AsyncClient client = getClient(); basicCall(client); } - + public void testBasicCallWithTimeout() throws Exception { Srv.AsyncClient client = getClient(); client.setTimeout(5000); diff --git a/lib/java/test/org/apache/thrift/server/TestHsHaServer.java b/lib/java/test/org/apache/thrift/server/TestHsHaServer.java index f80560bd..6638a333 100644 --- a/lib/java/test/org/apache/thrift/server/TestHsHaServer.java +++ b/lib/java/test/org/apache/thrift/server/TestHsHaServer.java @@ -20,10 +20,11 @@ package org.apache.thrift.server; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.THsHaServer.Args; import org.apache.thrift.transport.TNonblockingServerSocket; public class TestHsHaServer extends TestNonblockingServer { protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) { - return new THsHaServer(processor, socket, protoFactory); + return new THsHaServer(new Args(socket).processor(processor).protocolFactory(protoFactory)); } } diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java index e2024351..52b62c34 100644 --- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java +++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java @@ -21,6 +21,7 @@ package org.apache.thrift.server; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TNonblockingServer.Args; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TTransport; @@ -31,7 +32,7 @@ public class TestNonblockingServer extends ServerTestBase { private TServer server; protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) { - return new TNonblockingServer(processor, socket, protoFactory); + return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory)); } @Override diff --git a/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java b/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java index 6066f00d..4bba4511 100644 --- a/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java +++ b/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java @@ -28,6 +28,7 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.ServerTestBase; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; +import org.apache.thrift.server.TServer.Args; public class TestTSSLTransportFactory extends ServerTestBase { private Thread serverThread; @@ -52,7 +53,7 @@ public class TestTSSLTransportFactory extends ServerTestBase { public void run() { try { TServerTransport serverTransport = TSSLTransportFactory.getServerSocket(PORT); - server = new TSimpleServer(processor, serverTransport); + server = new TSimpleServer(new Args(serverTransport).processor(processor)); server.serve(); } catch (TTransportException e) { e.printStackTrace(); diff --git a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java index 10da6d10..dfd087f0 100644 --- a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java +++ b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java @@ -44,6 +44,7 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.ServerTestBase; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TSimpleServer; +import org.apache.thrift.server.TServer.Args; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -284,7 +285,7 @@ public class TestTSaslTransports extends TestCase { TTransportFactory factory = new TSaslServerTransport.Factory( WRAPPED_MECHANISM, SERVICE, HOST, WRAPPED_PROPS, new TestSaslCallbackHandler(PASSWORD)); - server = new TSimpleServer(processor, socket, factory, protoFactory); + server = new TSimpleServer(new Args(socket).processor(processor).transportFactory(factory).protocolFactory(protoFactory)); // Run it LOGGER.debug("Starting the server on port {}", PORT); -- 2.17.1