Summary: Same paradigm as in C++ model. Allow ServerTransport to be interrupted to block an accept loop and cleanly stop serving client requests.
Reviewed By: dreiss
Test Plan: Invoke shutdown() method on a TServer
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665322
13f79535-47bb-0310-9956-
ffa450edef68
*/
public abstract void serve();
+ /**
+ * Stop the server. This is optional on a per-implementation basis. Not
+ * all servers are required to be cleanly stoppable.
+ */
+ public void stop() {}
+
}
*/
public class TSimpleServer extends TServer {
+ private boolean stopped_ = false;
+
public TSimpleServer(TProcessor processor,
TServerTransport serverTransport) {
super(new TProcessorFactory(processor), serverTransport);
public void serve() {
+ stopped_ = false;
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
return;
}
- while (true) {
+ while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
} catch (TTransportException ttx) {
// Client died, just move on
} catch (TException tx) {
- tx.printStackTrace();
+ if (!stopped_) {
+ tx.printStackTrace();
+ }
} catch (Exception x) {
- x.printStackTrace();
+ if (!stopped_) {
+ x.printStackTrace();
+ }
}
if (inputTransport != null) {
}
}
+
+ public void stop() {
+ stopped_ = true;
+ serverTransport_.interrupt();
+ }
}
// Executor service for handling client connections
private ExecutorService executorService_;
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ // Server options
+ private Options options_;
+
// Customizable server options
public static class Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
}
public TThreadPoolServer(TProcessor processor,
60,
TimeUnit.SECONDS,
executorQueue);
+
+ options_ = options;
}
return;
}
- while (true) {
+ stopped_ = false;
+ while (!stopped_) {
int failureCount = 0;
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
} catch (TTransportException ttx) {
- ++failureCount;
- ttx.printStackTrace();
+ if (!stopped_) {
+ ++failureCount;
+ ttx.printStackTrace();
+ }
}
}
+
+ executorService_.shutdown();
+ try {
+ executorService_.awaitTermination(options_.stopTimeoutVal,
+ options_.stopTimeoutUnit);
+ } catch (InterruptedException ix) {
+ // Ignore and more on
+ }
+ }
+
+ public void stop() {
+ stopped_ = true;
+ serverTransport_.interrupt();
}
private class WorkerProcess implements Runnable {
}
}
+ public void interrupt() {
+ // The thread-safeness of this is dubious, but Java documentation suggests
+ // that it is safe to do this from a different thread context
+ close();
+ }
+
}
public abstract void close();
protected abstract TTransport acceptImpl() throws TTransportException;
+
+ /**
+ * Optional method implementation. This signals to the server transport
+ * that it should break out of any accept() or listen() that it is currently
+ * blocked on. This method, if implemented, MUST be thread safe, as it may
+ * be called from a different thread context than the other TServerTransport
+ * methods.
+ */
+ public void interrupt() {}
+
}