From: Mark Slee Date: Sat, 3 Nov 2007 05:30:32 +0000 (+0000) Subject: Add programatic shutdown option to Java Thrift servers X-Git-Tag: 0.2.0~1150 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=0502e61fb99d4aab9717e2ba42ac6d9987ea5e2d;p=common%2Fthrift.git Add programatic shutdown option to Java Thrift servers Summary: Same paradigm as in C++ model. Allow ServerTransport to be interrupted to block an accept loop and cleanly stop serving client requests. Reviewed By: dreiss Test Plan: Invoke shutdown() method on a TServer git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665322 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java index d5f95f62..2ac322ae 100644 --- a/lib/java/src/server/TServer.java +++ b/lib/java/src/server/TServer.java @@ -105,4 +105,10 @@ public abstract class TServer { */ public abstract void serve(); + /** + * Stop the server. This is optional on a per-implementation basis. Not + * all servers are required to be cleanly stoppable. + */ + public void stop() {} + } diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java index 181d8e4f..cb127c72 100644 --- a/lib/java/src/server/TSimpleServer.java +++ b/lib/java/src/server/TSimpleServer.java @@ -23,6 +23,8 @@ import com.facebook.thrift.transport.TTransportException; */ public class TSimpleServer extends TServer { + private boolean stopped_ = false; + public TSimpleServer(TProcessor processor, TServerTransport serverTransport) { super(new TProcessorFactory(processor), serverTransport); @@ -71,6 +73,7 @@ public class TSimpleServer extends TServer { public void serve() { + stopped_ = false; try { serverTransport_.listen(); } catch (TTransportException ttx) { @@ -78,7 +81,7 @@ public class TSimpleServer extends TServer { return; } - while (true) { + while (!stopped_) { TTransport client = null; TProcessor processor = null; TTransport inputTransport = null; @@ -98,9 +101,13 @@ public class TSimpleServer extends TServer { } catch (TTransportException ttx) { // Client died, just move on } catch (TException tx) { - tx.printStackTrace(); + if (!stopped_) { + tx.printStackTrace(); + } } catch (Exception x) { - x.printStackTrace(); + if (!stopped_) { + x.printStackTrace(); + } } if (inputTransport != null) { @@ -113,4 +120,9 @@ public class TSimpleServer extends TServer { } } + + public void stop() { + stopped_ = true; + serverTransport_.interrupt(); + } } diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java index 0945fbe3..94eb5a6b 100644 --- a/lib/java/src/server/TThreadPoolServer.java +++ b/lib/java/src/server/TThreadPoolServer.java @@ -34,10 +34,18 @@ public class TThreadPoolServer extends TServer { // Executor service for handling client connections private ExecutorService executorService_; + // Flag for stopping the server + private volatile boolean stopped_; + + // Server options + private Options options_; + // Customizable server options public static class Options { public int minWorkerThreads = 5; public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; } public TThreadPoolServer(TProcessor processor, @@ -120,6 +128,8 @@ public class TThreadPoolServer extends TServer { 60, TimeUnit.SECONDS, executorQueue); + + options_ = options; } @@ -131,17 +141,33 @@ public class TThreadPoolServer extends TServer { return; } - while (true) { + stopped_ = false; + while (!stopped_) { int failureCount = 0; try { TTransport client = serverTransport_.accept(); WorkerProcess wp = new WorkerProcess(client); executorService_.execute(wp); } catch (TTransportException ttx) { - ++failureCount; - ttx.printStackTrace(); + if (!stopped_) { + ++failureCount; + ttx.printStackTrace(); + } } } + + executorService_.shutdown(); + try { + executorService_.awaitTermination(options_.stopTimeoutVal, + options_.stopTimeoutUnit); + } catch (InterruptedException ix) { + // Ignore and more on + } + } + + public void stop() { + stopped_ = true; + serverTransport_.interrupt(); } private class WorkerProcess implements Runnable { diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java index 9badf1ac..cc6d9004 100644 --- a/lib/java/src/transport/TServerSocket.java +++ b/lib/java/src/transport/TServerSocket.java @@ -112,4 +112,10 @@ public class TServerSocket extends TServerTransport { } } + public void interrupt() { + // The thread-safeness of this is dubious, but Java documentation suggests + // that it is safe to do this from a different thread context + close(); + } + } diff --git a/lib/java/src/transport/TServerTransport.java b/lib/java/src/transport/TServerTransport.java index 2ffc8df8..872ac3cf 100644 --- a/lib/java/src/transport/TServerTransport.java +++ b/lib/java/src/transport/TServerTransport.java @@ -26,4 +26,14 @@ public abstract class TServerTransport { public abstract void close(); protected abstract TTransport acceptImpl() throws TTransportException; + + /** + * Optional method implementation. This signals to the server transport + * that it should break out of any accept() or listen() that it is currently + * blocked on. This method, if implemented, MUST be thread safe, as it may + * be called from a different thread context than the other TServerTransport + * methods. + */ + public void interrupt() {} + }