// for the passing of Invocations from the Selector to workers.
private ExecutorService invoker;
- protected final int worker_threads;
- protected final int stop_timeout_val;
- protected final TimeUnit stop_timeout_unit;
-
/**
* Create server with given processor, and server transport. Default server
* options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
}
/**
- * Create server with every option fully specified.
+ * Create server with every option fully specified, with an internally managed
+ * ExecutorService
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
TProtocolFactory outputProtocolFactory,
Options options)
{
+ this(processorFactory, serverTransport,
+ outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory,
+ createInvokerPool(options),
+ options);
+ }
+
+ /**
+ * Create server with every option fully specified, and with an injected
+ * ExecutorService
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ ExecutorService executor,
+ TNonblockingServer.Options options) {
super(processorFactory, serverTransport,
outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
options);
- worker_threads = options.workerThreads;
- stop_timeout_val = options.stopTimeoutVal;
- stop_timeout_unit = options.stopTimeoutUnit;
+ invoker = executor;
}
/** @inheritDoc */
@Override
public void serve() {
- if (!startInvokerPool()) {
- return;
- }
-
// start listening, or exit
if (!startListening()) {
return;
// ungracefully shut down the invoker pool?
}
- protected boolean startInvokerPool() {
- // start the invoker pool
+ /**
+ * Helper to create an invoker pool
+ */
+ protected static ExecutorService createInvokerPool(Options options) {
+ int workerThreads = options.workerThreads;
+ int stopTimeoutVal = options.stopTimeoutVal;
+ TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- invoker = new ThreadPoolExecutor(worker_threads, worker_threads,
- stop_timeout_val, stop_timeout_unit, queue);
+ ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads,
+ stopTimeoutVal, stopTimeoutUnit, queue);
- return true;
+ return invoker;
}
protected void gracefullyShutdownInvokerPool() {