From ffcddd688aefb42191999d72726ef15de23fd4e3 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Wed, 6 Sep 2006 20:37:03 +0000 Subject: [PATCH] Thrift multithreaded Java server Summary: Ported the Pillar multithreaded Java server to Thrift git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664791 13f79535-47bb-0310-9956-ffa450edef68 --- lib/java/src/server/TServer.java | 7 ++ lib/java/src/server/TSimpleServer.java | 6 ++ lib/java/src/server/TThreadPoolServer.java | 106 +++++++++++++++++++++ lib/java/src/transport/TServerSocket.java | 19 +++- test/java/src/TestServer.java | 22 ++--- 5 files changed, 147 insertions(+), 13 deletions(-) create mode 100644 lib/java/src/server/TThreadPoolServer.java diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java index 38ef81fa..702ba694 100644 --- a/lib/java/src/server/TServer.java +++ b/lib/java/src/server/TServer.java @@ -23,6 +23,13 @@ public abstract class TServer { /** Server options */ protected Options options_; + /** + * Default options constructor + */ + protected TServer(TProcessor processor) { + this(processor, new Options()); + } + /** * Default constructor, all servers take a processor and some options. */ diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java index 352a6de9..94b739e2 100644 --- a/lib/java/src/server/TSimpleServer.java +++ b/lib/java/src/server/TSimpleServer.java @@ -15,6 +15,12 @@ public class TSimpleServer extends TServer { private TServerTransport serverTransport_; + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport) { + this(processor, new TServer.Options(), serverTransport); + } + + public TSimpleServer(TProcessor processor, TServer.Options options, TServerTransport serverTransport) { diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java new file mode 100644 index 00000000..d19275ee --- /dev/null +++ b/lib/java/src/server/TThreadPoolServer.java @@ -0,0 +1,106 @@ +package com.facebook.thrift.server; + +import com.facebook.thrift.TException; +import com.facebook.thrift.TProcessor; +import com.facebook.thrift.transport.TServerTransport; +import com.facebook.thrift.transport.TTransport; +import com.facebook.thrift.transport.TTransportException; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * Server which uses Java's built in ThreadPool management to spawn off + * a worker pool that + * + * @author Mark Slee + */ +public class TThreadPoolServer extends TServer { + + // Server transport + private TServerTransport serverTransport_; + + // Executor service for handling client connections + private ExecutorService executorService_; + + // Customizable server options + public static class Options extends TServer.Options { + public int port = 9190; + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport) { + this(processor, new Options(), serverTransport); + } + + public TThreadPoolServer(TProcessor processor, + Options options, + TServerTransport serverTransport) { + super(processor, options); + serverTransport_ = serverTransport; + executorService_ = null; + + LinkedBlockingQueue executorQueue = + new LinkedBlockingQueue(); + + executorService_ = new ThreadPoolExecutor(options.minWorkerThreads, + options.maxWorkerThreads, + 60, + TimeUnit.SECONDS, + executorQueue); + } + + + public void run() { + try { + serverTransport_.listen(); + } catch (TTransportException ttx) { + ttx.printStackTrace(); + return; + } + + while (true) { + try { + TTransport client = serverTransport_.accept(); + WorkerProcess wp = new WorkerProcess(client); + executorService_.execute(wp); + } catch (TTransportException ttx) { + ttx.printStackTrace(); + } + } + } + + private class WorkerProcess implements Runnable { + + /** + * Client that this services. + */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) { + client_ = client; + } + + /** + * Loops on processing a client forever + */ + public void run() { + try { + while (processor_.process(client_, client_)) {} + } catch (TException tx) { + tx.printStackTrace(); + } + client_.close(); + } + } +} diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java index a885fa14..8a8421d2 100644 --- a/lib/java/src/transport/TServerSocket.java +++ b/lib/java/src/transport/TServerSocket.java @@ -1,6 +1,7 @@ package com.facebook.thrift.transport; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; @@ -11,12 +12,26 @@ import java.net.Socket; */ public class TServerSocket extends TServerTransport { - private ServerSocket serverSocket_; - + private ServerSocket serverSocket_ = null; + private int port_ = 0; + public TServerSocket(ServerSocket serverSocket) { serverSocket_ = serverSocket; } + public TServerSocket(int port) throws TTransportException { + port_ = port; + try { + serverSocket_ = new ServerSocket(); + serverSocket_.setReuseAddress(true); + serverSocket_.setSoTimeout(0); + serverSocket_.bind(new InetSocketAddress(port_)); + } catch (IOException ioe) { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + public void listen() throws TTransportException {} protected TSocket acceptImpl() throws TTransportException { diff --git a/test/java/src/TestServer.java b/test/java/src/TestServer.java index 8e3e4ed3..b5277770 100644 --- a/test/java/src/TestServer.java +++ b/test/java/src/TestServer.java @@ -5,6 +5,7 @@ import com.facebook.thrift.protocol.TBinaryProtocol; import com.facebook.thrift.protocol.TProtocol; import com.facebook.thrift.server.TServer; import com.facebook.thrift.server.TSimpleServer; +import com.facebook.thrift.server.TThreadPoolServer; import com.facebook.thrift.transport.TServerSocket; import com.facebook.thrift.transport.TServerTransport; @@ -243,23 +244,22 @@ public class TestServer { ThriftTest.Server testServer = new ThriftTest.Server(testHandler, binaryProtocol); - // Options - TServer.Options serverOptions = - new TServer.Options(); - // Transport - ServerSocket serverSocket = - new ServerSocket(port); TServerSocket tServerSocket = - new TServerSocket(serverSocket); + new TServerSocket(port); + + TServer serverEngine; - // Server - TSimpleServer simpleServer = - new TSimpleServer(testServer, serverOptions, tServerSocket); + // Simple Server + // serverEngine = new TSimpleServer(testServer, tServerSocket); + + // ThreadPool Server + serverEngine = new TThreadPoolServer(testServer, tServerSocket); // Run it System.out.println("Starting the server on port " + port + "..."); - simpleServer.run(); + serverEngine.run(); + } catch (Exception x) { x.printStackTrace(); } -- 2.17.1