From 5fc71fd56a522dc2e1b0be5012848276bc105752 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Thu, 18 Aug 2011 00:02:50 +0000 Subject: [PATCH] THRIFT-1167. java: Java nonblocking server with more than one thread for select and handling IO This patch refactors the nonblocking server hierarchy and adds in a new server that has a threaded selector pool as well as a threaded invoker pool. Patch: Steve Jiang git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1158977 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/AbstractNonblockingServer.java | 552 +++++++++++++++ .../org/apache/thrift/server/Invocation.java | 20 + .../org/apache/thrift/server/THsHaServer.java | 70 +- .../thrift/server/TNonblockingServer.java | 532 +-------------- .../server/TThreadedSelectorServer.java | 646 ++++++++++++++++++ .../server/TestThreadedSelectorServer.java | 30 + 6 files changed, 1294 insertions(+), 556 deletions(-) create mode 100644 lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java create mode 100644 lib/java/src/org/apache/thrift/server/Invocation.java create mode 100644 lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java create mode 100644 lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java new file mode 100644 index 00000000..2bd74fa8 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides common methods and classes used by nonblocking TServer + * implementations. + */ +public abstract class AbstractNonblockingServer extends TServer { + protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); + + public static abstract class AbstractNonblockingServerArgs> extends AbstractServerArgs { + public long maxReadBufferBytes = Long.MAX_VALUE; + + public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { + super(transport); + transportFactory(new TFramedTransport.Factory()); + } + } + + /** + * The maximum amount of memory we will allocate to client IO buffers at a + * time. Without this limit, the server will gladly allocate client buffers + * right into an out of memory exception, rather than waiting. + */ + private final long MAX_READ_BUFFER_BYTES; + + /** + * How many bytes are currently allocated to read buffers. + */ + private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); + + public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { + super(args); + MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; + } + + /** + * Begin accepting connections and processing invocations. + */ + public void serve() { + // start any IO threads + if (!startThreads()) { + return; + } + + // start listening, or exit + if (!startListening()) { + return; + } + + setServing(true); + + // this will block while we serve + waitForShutdown(); + + setServing(false); + + // do a little cleanup + stopListening(); + } + + /** + * Starts any threads required for serving. + * + * @return true if everything went ok, false if threads could not be started. + */ + protected abstract boolean startThreads(); + + /** + * A method that will block until when threads handling the serving have been + * shut down. + */ + protected abstract void waitForShutdown(); + + /** + * Have the server transport start accepting connections. + * + * @return true if we started listening successfully, false if something went + * wrong. + */ + protected boolean startListening() { + try { + serverTransport_.listen(); + return true; + } catch (TTransportException ttx) { + LOGGER.error("Failed to start listening on server socket!", ttx); + return false; + } + } + + /** + * Stop listening for connections. + */ + protected void stopListening() { + serverTransport_.close(); + } + + /** + * Perform an invocation. This method could behave several different ways - + * invoke immediately inline, queue for separate execution, etc. + * + * @return true if invocation was successfully requested, which is not a + * guarantee that invocation has completed. False if the request + * failed. + */ + protected abstract boolean requestInvoke(FrameBuffer frameBuffer); + + /** + * An abstract thread that handles selecting on a set of transports and + * {@link FrameBuffer FrameBuffers} associated with selected keys + * corresponding to requests. + */ + protected abstract class AbstractSelectThread extends Thread { + protected final Selector selector; + + // List of FrameBuffers that want to change their selection interests. + protected final Set selectInterestChanges = new HashSet(); + + public AbstractSelectThread() throws IOException { + this.selector = SelectorProvider.provider().openSelector(); + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + selector.wakeup(); + } + + /** + * Add FrameBuffer to the list of select interest changes and wake up the + * selector if it's blocked. When the select() call exits, it'll give the + * FrameBuffer a chance to change its interests. + */ + public void requestSelectInterestChange(FrameBuffer frameBuffer) { + synchronized (selectInterestChanges) { + selectInterestChanges.add(frameBuffer); + } + // wakeup the selector, if it's currently blocked. + selector.wakeup(); + } + + /** + * Check to see if there are any FrameBuffers that have switched their + * interest type from read to write or vice versa. + */ + protected void processInterestChanges() { + synchronized (selectInterestChanges) { + for (FrameBuffer fb : selectInterestChanges) { + fb.changeSelectInterests(); + } + selectInterestChanges.clear(); + } + } + + /** + * Do the work required to read from a readable client. If the frame is + * fully read, then invoke the method call. + */ + protected void handleRead(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.read()) { + cleanupSelectionKey(key); + return; + } + + // if the buffer's frame read is complete, invoke the method. + if (buffer.isFrameFullyRead()) { + if (!requestInvoke(buffer)) { + cleanupSelectionKey(key); + } + } + } + + /** + * Let a writable client get written, if there's data to be written. + */ + protected void handleWrite(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.write()) { + cleanupSelectionKey(key); + } + } + + /** + * Do connection-close cleanup on a given SelectionKey. + */ + protected void cleanupSelectionKey(SelectionKey key) { + // remove the records from the two maps + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (buffer != null) { + // close the buffer + buffer.close(); + } + // cancel the selection key + key.cancel(); + } + } // SelectThread + + /** + * Possible states for the FrameBuffer state machine. + */ + private enum FrameBufferState { + // in the midst of reading the frame size off the wire + READING_FRAME_SIZE, + // reading the actual frame data now, but not all the way done yet + READING_FRAME, + // completely read the frame, so an invocation can now happen + READ_FRAME_COMPLETE, + // waiting to get switched to listening for write events + AWAITING_REGISTER_WRITE, + // started writing response data, not fully complete yet + WRITING, + // another thread wants this framebuffer to go back to reading + AWAITING_REGISTER_READ, + // we want our transport and selection key invalidated in the selector + // thread + AWAITING_CLOSE + } + + /** + * Class that implements a sort of state machine around the interaction with a + * client and an invoker. It manages reading the frame size and frame data, + * getting it handed off as wrapped transports, and then the writing of + * response data back to the client. In the process it manages flipping the + * read and write bits on the selection key for its client. + */ + protected class FrameBuffer { + // the actual transport hooked up to the client. + private final TNonblockingTransport trans_; + + // the SelectionKey that corresponds to our transport + private final SelectionKey selectionKey_; + + // the SelectThread that owns the registration of our transport + private final AbstractSelectThread selectThread_; + + // where in the process of reading/writing are we? + private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; + + // the ByteBuffer we'll be using to write and read, depending on the state + private ByteBuffer buffer_; + + private TByteArrayOutputStream response_; + + public FrameBuffer(final TNonblockingTransport trans, + final SelectionKey selectionKey, + final AbstractSelectThread selectThread) { + trans_ = trans; + selectionKey_ = selectionKey; + selectThread_ = selectThread; + buffer_ = ByteBuffer.allocate(4); + } + + /** + * Give this FrameBuffer a chance to read. The selector loop should have + * received a read event for this FrameBuffer. + * + * @return true if the connection should live on, false if it should be + * closed + */ + public boolean read() { + if (state_ == FrameBufferState.READING_FRAME_SIZE) { + // try to read the frame size completely + if (!internalRead()) { + return false; + } + + // if the frame size has been read completely, then prepare to read the + // actual frame. + if (buffer_.remaining() == 0) { + // pull out the frame size as an integer. + int frameSize = buffer_.getInt(0); + if (frameSize <= 0) { + LOGGER.error("Read an invalid frame size of " + frameSize + + ". Are you using TFramedTransport on the client side?"); + return false; + } + + // if this frame will always be too large for this server, log the + // error and close the connection. + if (frameSize > MAX_READ_BUFFER_BYTES) { + LOGGER.error("Read a frame size of " + frameSize + + ", which is bigger than the maximum allowable buffer size for ALL connections."); + return false; + } + + // if this frame will push us over the memory limit, then return. + // with luck, more memory will free up the next time around. + if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { + return true; + } + + // increment the amount of memory allocated to read buffers + readBufferBytesAllocated.addAndGet(frameSize); + + // reallocate the readbuffer as a frame-sized buffer + buffer_ = ByteBuffer.allocate(frameSize); + + state_ = FrameBufferState.READING_FRAME; + } else { + // this skips the check of READING_FRAME state below, since we can't + // possibly go on to that state if there's data left to be read at + // this one. + return true; + } + } + + // it is possible to fall through from the READING_FRAME_SIZE section + // to READING_FRAME if there's already some frame data available once + // READING_FRAME_SIZE is complete. + + if (state_ == FrameBufferState.READING_FRAME) { + if (!internalRead()) { + return false; + } + + // since we're already in the select loop here for sure, we can just + // modify our selection key directly. + if (buffer_.remaining() == 0) { + // get rid of the read select interests + selectionKey_.interestOps(0); + state_ = FrameBufferState.READ_FRAME_COMPLETE; + } + + return true; + } + + // if we fall through to this point, then the state must be invalid. + LOGGER.error("Read was called but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to write its output to the final client. + */ + public boolean write() { + if (state_ == FrameBufferState.WRITING) { + try { + if (trans_.write(buffer_) < 0) { + return false; + } + } catch (IOException e) { + LOGGER.warn("Got an IOException during write!", e); + return false; + } + + // we're done writing. now we need to switch back to reading. + if (buffer_.remaining() == 0) { + prepareRead(); + } + return true; + } + + LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to set its interest to write, once data + * has come in. + */ + public void changeSelectInterests() { + if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { + // set the OP_WRITE interest + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = FrameBufferState.WRITING; + } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { + prepareRead(); + } else if (state_ == FrameBufferState.AWAITING_CLOSE) { + close(); + selectionKey_.cancel(); + } else { + LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); + } + } + + /** + * Shut the connection down. + */ + public void close() { + // if we're being closed due to an error, we might have allocated a + // buffer that we need to subtract for our memory accounting. + if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) { + readBufferBytesAllocated.addAndGet(-buffer_.array().length); + } + trans_.close(); + } + + /** + * Check if this FrameBuffer has a full frame read. + */ + public boolean isFrameFullyRead() { + return state_ == FrameBufferState.READ_FRAME_COMPLETE; + } + + /** + * After the processor has processed the invocation, whatever thread is + * managing invocations should call this method on this FrameBuffer so we + * know it's time to start trying to write again. Also, if it turns out that + * there actually isn't any data in the response buffer, we'll skip trying + * to write and instead go back to reading. + */ + public void responseReady() { + // the read buffer is definitely no longer in use, so we will decrement + // our read buffer count. we do this here as well as in close because + // we'd like to free this read memory up as quickly as possible for other + // clients. + readBufferBytesAllocated.addAndGet(-buffer_.array().length); + + if (response_.len() == 0) { + // go straight to reading again. this was probably an oneway method + state_ = FrameBufferState.AWAITING_REGISTER_READ; + buffer_ = null; + } else { + buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); + + // set state that we're waiting to be switched to write. we do this + // asynchronously through requestSelectInterestChange() because there is + // a possibility that we're not in the main thread, and thus currently + // blocked in select(). (this functionality is in place for the sake of + // the HsHa server.) + state_ = FrameBufferState.AWAITING_REGISTER_WRITE; + } + requestSelectInterestChange(); + } + + /** + * Actually invoke the method signified by this FrameBuffer. + */ + public void invoke() { + TTransport inTrans = getInputTransport(); + TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); + TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); + + try { + processorFactory_.getProcessor(inTrans).process(inProt, outProt); + responseReady(); + return; + } catch (TException te) { + LOGGER.warn("Exception while invoking!", te); + } catch (Exception e) { + LOGGER.error("Unexpected exception while invoking!", e); + } + // This will only be reached when there is an exception. + state_ = FrameBufferState.AWAITING_CLOSE; + requestSelectInterestChange(); + } + + /** + * Wrap the read buffer in a memory-based transport so a processor can read + * the data it needs to handle an invocation. + */ + private TTransport getInputTransport() { + return new TMemoryInputTransport(buffer_.array()); + } + + /** + * Get the transport that should be used by the invoker for responding. + */ + private TTransport getOutputTransport() { + response_ = new TByteArrayOutputStream(); + return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); + } + + /** + * Perform a read into buffer. + * + * @return true if the read succeeded, false if there was an error or the + * connection closed. + */ + private boolean internalRead() { + try { + if (trans_.read(buffer_) < 0) { + return false; + } + return true; + } catch (IOException e) { + LOGGER.warn("Got an IOException in internalRead!", e); + return false; + } + } + + /** + * We're done writing, so reset our interest ops and change state + * accordingly. + */ + private void prepareRead() { + // we can set our interest directly without using the queue because + // we're in the select thread. + selectionKey_.interestOps(SelectionKey.OP_READ); + // get ready for another go-around + buffer_ = ByteBuffer.allocate(4); + state_ = FrameBufferState.READING_FRAME_SIZE; + } + + /** + * When this FrameBuffer needs to change its select interests and execution + * might not be in its select thread, then this method will make sure the + * interest change gets done when the select thread wakes back up. When the + * current thread is this FrameBuffer's select thread, then it just does the + * interest change immediately. + */ + private void requestSelectInterestChange() { + if (Thread.currentThread() == this.selectThread_) { + changeSelectInterests(); + } else { + this.selectThread_.requestSelectInterestChange(this); + } + } + } // FrameBuffer +} diff --git a/lib/java/src/org/apache/thrift/server/Invocation.java b/lib/java/src/org/apache/thrift/server/Invocation.java new file mode 100644 index 00000000..e8210f41 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/Invocation.java @@ -0,0 +1,20 @@ +package org.apache.thrift.server; + +import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer; + +/** + * An Invocation represents a method call that is prepared to execute, given + * an idle worker thread. It contains the input and output protocols the + * thread's processor should use to perform the usual Thrift invocation. + */ +class Invocation implements Runnable { + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + public void run() { + frameBuffer.invoke(); + } +} \ No newline at end of file diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java index f3dfd0a5..35411540 100644 --- a/lib/java/src/org/apache/thrift/server/THsHaServer.java +++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java @@ -27,16 +27,12 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.thrift.transport.TNonblockingServerTransport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. * Like TNonblockingServer, it relies on the use of TFramedTransport. */ public class THsHaServer extends TNonblockingServer { - private static final Logger LOGGER = - LoggerFactory.getLogger(THsHaServer.class.getName()); public static class Args extends AbstractNonblockingServerArgs { private int workerThreads = 5; @@ -85,46 +81,30 @@ public class THsHaServer extends TNonblockingServer { } } + // This wraps all the functionality of queueing and thread pool management // for the passing of Invocations from the Selector to workers. - private ExecutorService invoker; + private final ExecutorService invoker; + + private final Args args; /** - * Create server with every option fully specified, and with an injected - * ExecutorService + * Create the server with the specified Args configuration */ public THsHaServer(Args args) { super(args); invoker = args.executorService == null ? createInvokerPool(args) : args.executorService; + this.args = args; } - /** @inheritDoc */ + /** + * @inheritDoc + */ @Override - public void serve() { - // start listening, or exit - if (!startListening()) { - return; - } - - // start the selector, or exit - if (!startSelectorThread()) { - return; - } - - setServing(true); - - // this will block while we serve + protected void waitForShutdown() { joinSelector(); - gracefullyShutdownInvokerPool(); - - setServing(false); - - // do a little cleanup - stopListening(); - - // ungracefully shut down the invoker pool? } /** @@ -136,12 +116,13 @@ public class THsHaServer extends TNonblockingServer { TimeUnit stopTimeoutUnit = options.stopTimeoutUnit; LinkedBlockingQueue queue = new LinkedBlockingQueue(); - ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads, - stopTimeoutVal, stopTimeoutUnit, queue); + ExecutorService invoker = new ThreadPoolExecutor(workerThreads, + workerThreads, stopTimeoutVal, stopTimeoutUnit, queue); return invoker; } + protected void gracefullyShutdownInvokerPool() { // try to gracefully shut down the executor service invoker.shutdown(); @@ -150,7 +131,7 @@ public class THsHaServer extends TNonblockingServer { // exception. If we don't do this, then we'll shut down prematurely. We want // to let the executorService clear it's task queue, closing client sockets // appropriately. - long timeoutMS = 10000; + long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try { @@ -166,7 +147,8 @@ public class THsHaServer extends TNonblockingServer { /** * We override the standard invoke method here to queue the invocation for - * invoker service instead of immediately invoking. The thread pool takes care of the rest. + * invoker service instead of immediately invoking. The thread pool takes care + * of the rest. */ @Override protected boolean requestInvoke(FrameBuffer frameBuffer) { @@ -181,24 +163,6 @@ public class THsHaServer extends TNonblockingServer { } protected Runnable getRunnable(FrameBuffer frameBuffer){ - return new Invocation(frameBuffer); - } - - /** - * An Invocation represents a method call that is prepared to execute, given - * an idle worker thread. It contains the input and output protocols the - * thread's processor should use to perform the usual Thrift invocation. - */ - private class Invocation implements Runnable { - - private final FrameBuffer frameBuffer; - - public Invocation(final FrameBuffer frameBuffer) { - this.frameBuffer = frameBuffer; - } - - public void run() { - frameBuffer.invoke(); - } + return new Invocation(frameBuffer); } } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index d44d460b..7afd4b35 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -21,27 +21,12 @@ package org.apache.thrift.server; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.HashSet; import java.util.Iterator; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.thrift.TByteArrayOutputStream; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TMemoryInputTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingTransport; -import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A nonblocking TServer implementation. This allows for fairness amongst all @@ -54,9 +39,7 @@ import org.slf4j.LoggerFactory; * transport, otherwise this server will be unable to determine when a whole * method call has been read off the wire. Clients must also use TFramedTransport. */ -public class TNonblockingServer extends TServer { - private static final Logger LOGGER = - LoggerFactory.getLogger(TNonblockingServer.class.getName()); +public class TNonblockingServer extends AbstractNonblockingServer { public static class Args extends AbstractNonblockingServerArgs { public Args(TNonblockingServerTransport transport) { @@ -64,97 +47,29 @@ public class TNonblockingServer extends TServer { } } - public static abstract class AbstractNonblockingServerArgs> extends AbstractServerArgs { - public long maxReadBufferBytes = Long.MAX_VALUE; - - public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { - super(transport); - transportFactory(new TFramedTransport.Factory()); - } - } - // Flag for stopping the server private volatile boolean stopped_ = true; - private SelectThread selectThread_; - - /** - * The maximum amount of memory we will allocate to client IO buffers at a - * time. Without this limit, the server will gladly allocate client buffers - * right into an out of memory exception, rather than waiting. - */ - private final long MAX_READ_BUFFER_BYTES; - - /** - * How many bytes are currently allocated to read buffers. - */ - private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); + private SelectAcceptThread selectAcceptThread_; public TNonblockingServer(AbstractNonblockingServerArgs args) { super(args); - MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; - } - - /** - * Begin accepting connections and processing invocations. - */ - public void serve() { - // start listening, or exit - if (!startListening()) { - return; - } - - // start the selector, or exit - if (!startSelectorThread()) { - return; - } - - setServing(true); - - // this will block while we serve - joinSelector(); - - setServing(false); - - // do a little cleanup - stopListening(); } - /** - * Have the server transport start accepting connections. - * - * @return true if we started listening successfully, false if something went - * wrong. - */ - protected boolean startListening() { - try { - serverTransport_.listen(); - return true; - } catch (TTransportException ttx) { - LOGGER.error("Failed to start listening on server socket!", ttx); - return false; - } - } - - /** - * Stop listening for connections. - */ - protected void stopListening() { - serverTransport_.close(); - } /** - * Start the selector thread running to deal with clients. + * Start the selector thread to deal with accepts and client messages. * * @return true if everything went ok, false if we couldn't start for some * reason. */ - protected boolean startSelectorThread() { + @Override + protected boolean startThreads() { // start the selector try { - selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_); + selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_); stopped_ = false; - selectThread_.start(); + selectAcceptThread_.start(); return true; } catch (IOException e) { LOGGER.error("Failed to start selector thread!", e); @@ -162,13 +77,18 @@ public class TNonblockingServer extends TServer { } } + @Override + protected void waitForShutdown() { + joinSelector(); + } + /** - * Block until the selector exits. + * Block until the selector thread exits. */ protected void joinSelector() { // wait until the selector thread exits try { - selectThread_.join(); + selectAcceptThread_.join(); } catch (InterruptedException e) { // for now, just silently ignore. technically this means we'll have less of // a graceful shutdown as a result. @@ -178,10 +98,11 @@ public class TNonblockingServer extends TServer { /** * Stop serving and shut everything down. */ + @Override public void stop() { stopped_ = true; - if (selectThread_ != null) { - selectThread_.wakeupSelector(); + if (selectAcceptThread_ != null) { + selectAcceptThread_.wakeupSelector(); } } @@ -189,43 +110,33 @@ public class TNonblockingServer extends TServer { * Perform an invocation. This method could behave several different ways * - invoke immediately inline, queue for separate execution, etc. */ + @Override protected boolean requestInvoke(FrameBuffer frameBuffer) { frameBuffer.invoke(); return true; } - /** - * A FrameBuffer wants to change its selection preferences, but might not be - * in the select thread. - */ - protected void requestSelectInterestChange(FrameBuffer frameBuffer) { - selectThread_.requestSelectInterestChange(frameBuffer); - } public boolean isStopped() { - return selectThread_.isStopped(); + return selectAcceptThread_.isStopped(); } /** * The thread that will be doing all the selecting, managing new connections * and those that still need to be read. */ - protected class SelectThread extends Thread { + protected class SelectAcceptThread extends AbstractSelectThread { + // The server transport on which new client transports will be accepted private final TNonblockingServerTransport serverTransport; - private final Selector selector; - - // List of FrameBuffers that want to change their selection interests. - private final Set selectInterestChanges = - new HashSet(); /** - * Set up the SelectorThread. + * Set up the thread that will handle the non-blocking accepts, reads, and + * writes. */ - public SelectThread(final TNonblockingServerTransport serverTransport) + public SelectAcceptThread(final TNonblockingServerTransport serverTransport) throws IOException { this.serverTransport = serverTransport; - this.selector = SelectorProvider.provider().openSelector(); serverTransport.registerSelector(selector); } @@ -250,26 +161,6 @@ public class TNonblockingServer extends TServer { } } - /** - * If the selector is blocked, wake it up. - */ - public void wakeupSelector() { - selector.wakeup(); - } - - /** - * Add FrameBuffer to the list of select interest changes and wake up the - * selector if it's blocked. When the select() call exits, it'll give the - * FrameBuffer a chance to change its interests. - */ - public void requestSelectInterestChange(FrameBuffer frameBuffer) { - synchronized (selectInterestChanges) { - selectInterestChanges.add(frameBuffer); - } - // wakeup the selector, if it's currently blocked. - selector.wakeup(); - } - /** * Select and process IO events appropriately: * If there are connections to be accepted, accept them. @@ -291,7 +182,7 @@ public class TNonblockingServer extends TServer { // skip if not valid if (!key.isValid()) { - cleanupSelectionkey(key); + cleanupSelectionKey(key); continue; } @@ -314,19 +205,6 @@ public class TNonblockingServer extends TServer { } } - /** - * Check to see if there are any FrameBuffers that have switched their - * interest type from read to write or vice versa. - */ - private void processInterestChanges() { - synchronized (selectInterestChanges) { - for (FrameBuffer fb : selectInterestChanges) { - fb.changeSelectInterests(); - } - selectInterestChanges.clear(); - } - } - /** * Accept a new connection. */ @@ -339,368 +217,16 @@ public class TNonblockingServer extends TServer { clientKey = client.registerSelector(selector, SelectionKey.OP_READ); // add this key to the map - FrameBuffer frameBuffer = new FrameBuffer(client, clientKey); + FrameBuffer frameBuffer = new FrameBuffer(client, clientKey, + SelectAcceptThread.this); clientKey.attach(frameBuffer); } catch (TTransportException tte) { // something went wrong accepting. LOGGER.warn("Exception trying to accept!", tte); tte.printStackTrace(); - if (clientKey != null) cleanupSelectionkey(clientKey); + if (clientKey != null) cleanupSelectionKey(clientKey); if (client != null) client.close(); } } - - /** - * Do the work required to read from a readable client. If the frame is - * fully read, then invoke the method call. - */ - private void handleRead(SelectionKey key) { - FrameBuffer buffer = (FrameBuffer)key.attachment(); - if (!buffer.read()) { - cleanupSelectionkey(key); - return; - } - - // if the buffer's frame read is complete, invoke the method. - if (buffer.isFrameFullyRead()) { - if (!requestInvoke(buffer)) { - cleanupSelectionkey(key); - } - } - } - - /** - * Let a writable client get written, if there's data to be written. - */ - private void handleWrite(SelectionKey key) { - FrameBuffer buffer = (FrameBuffer)key.attachment(); - if (!buffer.write()) { - cleanupSelectionkey(key); - } - } - - /** - * Do connection-close cleanup on a given SelectionKey. - */ - private void cleanupSelectionkey(SelectionKey key) { - // remove the records from the two maps - FrameBuffer buffer = (FrameBuffer)key.attachment(); - if (buffer != null) { - // close the buffer - buffer.close(); - } - // cancel the selection key - key.cancel(); - } - } // SelectorThread - - /** - * Class that implements a sort of state machine around the interaction with - * a client and an invoker. It manages reading the frame size and frame data, - * getting it handed off as wrapped transports, and then the writing of - * response data back to the client. In the process it manages flipping the - * read and write bits on the selection key for its client. - */ - protected class FrameBuffer { - // - // Possible states for the FrameBuffer state machine. - // - // in the midst of reading the frame size off the wire - private static final int READING_FRAME_SIZE = 1; - // reading the actual frame data now, but not all the way done yet - private static final int READING_FRAME = 2; - // completely read the frame, so an invocation can now happen - private static final int READ_FRAME_COMPLETE = 3; - // waiting to get switched to listening for write events - private static final int AWAITING_REGISTER_WRITE = 4; - // started writing response data, not fully complete yet - private static final int WRITING = 6; - // another thread wants this framebuffer to go back to reading - private static final int AWAITING_REGISTER_READ = 7; - // we want our transport and selection key invalidated in the selector thread - private static final int AWAITING_CLOSE = 8; - - // - // Instance variables - // - - // the actual transport hooked up to the client. - public final TNonblockingTransport trans_; - - // the SelectionKey that corresponds to our transport - private final SelectionKey selectionKey_; - - // where in the process of reading/writing are we? - private int state_ = READING_FRAME_SIZE; - - // the ByteBuffer we'll be using to write and read, depending on the state - private ByteBuffer buffer_; - - private TByteArrayOutputStream response_; - - public FrameBuffer( final TNonblockingTransport trans, - final SelectionKey selectionKey) { - trans_ = trans; - selectionKey_ = selectionKey; - buffer_ = ByteBuffer.allocate(4); - } - - /** - * Give this FrameBuffer a chance to read. The selector loop should have - * received a read event for this FrameBuffer. - * - * @return true if the connection should live on, false if it should be - * closed - */ - public boolean read() { - if (state_ == READING_FRAME_SIZE) { - // try to read the frame size completely - if (!internalRead()) { - return false; - } - - // if the frame size has been read completely, then prepare to read the - // actual frame. - if (buffer_.remaining() == 0) { - // pull out the frame size as an integer. - int frameSize = buffer_.getInt(0); - if (frameSize <= 0) { - LOGGER.error("Read an invalid frame size of " + frameSize - + ". Are you using TFramedTransport on the client side?"); - return false; - } - - // if this frame will always be too large for this server, log the - // error and close the connection. - if (frameSize > MAX_READ_BUFFER_BYTES) { - LOGGER.error("Read a frame size of " + frameSize - + ", which is bigger than the maximum allowable buffer size for ALL connections."); - return false; - } - - // if this frame will push us over the memory limit, then return. - // with luck, more memory will free up the next time around. - if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { - return true; - } - - // increment the amount of memory allocated to read buffers - readBufferBytesAllocated.addAndGet(frameSize); - - // reallocate the readbuffer as a frame-sized buffer - buffer_ = ByteBuffer.allocate(frameSize); - - state_ = READING_FRAME; - } else { - // this skips the check of READING_FRAME state below, since we can't - // possibly go on to that state if there's data left to be read at - // this one. - return true; - } - } - - // it is possible to fall through from the READING_FRAME_SIZE section - // to READING_FRAME if there's already some frame data available once - // READING_FRAME_SIZE is complete. - - if (state_ == READING_FRAME) { - if (!internalRead()) { - return false; - } - - // since we're already in the select loop here for sure, we can just - // modify our selection key directly. - if (buffer_.remaining() == 0) { - // get rid of the read select interests - selectionKey_.interestOps(0); - state_ = READ_FRAME_COMPLETE; - } - - return true; - } - - // if we fall through to this point, then the state must be invalid. - LOGGER.error("Read was called but state is invalid (" + state_ + ")"); - return false; - } - - /** - * Give this FrameBuffer a chance to write its output to the final client. - */ - public boolean write() { - if (state_ == WRITING) { - try { - if (trans_.write(buffer_) < 0) { - return false; - } - } catch (IOException e) { - LOGGER.warn("Got an IOException during write!", e); - return false; - } - - // we're done writing. now we need to switch back to reading. - if (buffer_.remaining() == 0) { - prepareRead(); - } - return true; - } - - LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); - return false; - } - - /** - * Give this FrameBuffer a chance to set its interest to write, once data - * has come in. - */ - public void changeSelectInterests() { - if (state_ == AWAITING_REGISTER_WRITE) { - // set the OP_WRITE interest - selectionKey_.interestOps(SelectionKey.OP_WRITE); - state_ = WRITING; - } else if (state_ == AWAITING_REGISTER_READ) { - prepareRead(); - } else if (state_ == AWAITING_CLOSE){ - close(); - selectionKey_.cancel(); - } else { - LOGGER.error( - "changeSelectInterest was called, but state is invalid (" - + state_ + ")"); - } - } - - /** - * Shut the connection down. - */ - public void close() { - // if we're being closed due to an error, we might have allocated a - // buffer that we need to subtract for our memory accounting. - if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) { - readBufferBytesAllocated.addAndGet(-buffer_.array().length); - } - trans_.close(); - } - - /** - * Check if this FrameBuffer has a full frame read. - */ - public boolean isFrameFullyRead() { - return state_ == READ_FRAME_COMPLETE; - } - - /** - * After the processor has processed the invocation, whatever thread is - * managing invocations should call this method on this FrameBuffer so we - * know it's time to start trying to write again. Also, if it turns out - * that there actually isn't any data in the response buffer, we'll skip - * trying to write and instead go back to reading. - */ - public void responseReady() { - // the read buffer is definitely no longer in use, so we will decrement - // our read buffer count. we do this here as well as in close because - // we'd like to free this read memory up as quickly as possible for other - // clients. - readBufferBytesAllocated.addAndGet(-buffer_.array().length); - - if (response_.len() == 0) { - // go straight to reading again. this was probably an oneway method - state_ = AWAITING_REGISTER_READ; - buffer_ = null; - } else { - buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); - - // set state that we're waiting to be switched to write. we do this - // asynchronously through requestSelectInterestChange() because there is a - // possibility that we're not in the main thread, and thus currently - // blocked in select(). (this functionality is in place for the sake of - // the HsHa server.) - state_ = AWAITING_REGISTER_WRITE; - } - requestSelectInterestChange(); - } - - /** - * Actually invoke the method signified by this FrameBuffer. - */ - public void invoke() { - TTransport inTrans = getInputTransport(); - TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); - TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); - - try { - processorFactory_.getProcessor(inTrans).process(inProt, outProt); - responseReady(); - return; - } catch (TException te) { - LOGGER.warn("Exception while invoking!", te); - } catch (Exception e) { - LOGGER.error("Unexpected exception while invoking!", e); - } - // This will only be reached when there is an exception. - state_ = AWAITING_CLOSE; - requestSelectInterestChange(); - } - - /** - * Wrap the read buffer in a memory-based transport so a processor can read - * the data it needs to handle an invocation. - */ - private TTransport getInputTransport() { - return new TMemoryInputTransport(buffer_.array()); - } - - /** - * Get the transport that should be used by the invoker for responding. - */ - private TTransport getOutputTransport() { - response_ = new TByteArrayOutputStream(); - return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); - } - - /** - * Perform a read into buffer. - * - * @return true if the read succeeded, false if there was an error or the - * connection closed. - */ - private boolean internalRead() { - try { - if (trans_.read(buffer_) < 0) { - return false; - } - return true; - } catch (IOException e) { - LOGGER.warn("Got an IOException in internalRead!", e); - return false; - } - } - - /** - * We're done writing, so reset our interest ops and change state accordingly. - */ - private void prepareRead() { - // we can set our interest directly without using the queue because - // we're in the select thread. - selectionKey_.interestOps(SelectionKey.OP_READ); - // get ready for another go-around - buffer_ = ByteBuffer.allocate(4); - state_ = READING_FRAME_SIZE; - } - - /** - * When this FrameBuffer needs to change it's select interests and execution - * might not be in the select thread, then this method will make sure the - * interest change gets done when the select thread wakes back up. When the - * current thread is the select thread, then it just does the interest change - * immediately. - */ - private void requestSelectInterestChange() { - if (Thread.currentThread() == selectThread_) { - changeSelectInterests(); - } else { - TNonblockingServer.this.requestSelectInterestChange(this); - } - } - } // FrameBuffer + } // SelectAcceptThread } diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java new file mode 100644 index 00000000..4cf5f1b5 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Half-Sync/Half-Async server with a separate pool of threads to handle + * non-blocking I/O. Accepts are handled on a single thread, and a configurable + * number of nonblocking selector threads manage reading and writing of client + * connections. A synchronous worker thread pool handles processing of requests. + * + * Performs better than TNonblockingServer/THsHaServer in multi-core + * environments when the the bottleneck is CPU on the single selector thread + * handling I/O. In addition, because the accept handling is decoupled from + * reads/writes and invocation, the server has better ability to handle back- + * pressure from new connections (e.g. stop accepting when busy). + * + * Like TNonblockingServer, it relies on the use of TFramedTransport. + */ +public class TThreadedSelectorServer extends AbstractNonblockingServer { + private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName()); + + public static class Args extends AbstractNonblockingServerArgs { + + /** The number of threads for selecting on already-accepted connections */ + public int selectorThreads = 2; + /** + * The size of the executor service (if none is specified) that will handle + * invocations. This may be set to 0, in which case invocations will be + * handled directly on the selector threads (as is in TNonblockingServer) + */ + private int workerThreads = 5; + /** Time to wait for server to stop gracefully */ + private int stopTimeoutVal = 60; + private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + /** The ExecutorService for handling dispatched requests */ + private ExecutorService executorService = null; + /** + * The size of the blocking queue per selector thread for passing accepted + * connections to the selector thread + */ + private int acceptQueueSizePerThread = 4; + + /** + * Determines the strategy for handling new accepted connections. + */ + public static enum AcceptPolicy { + /** + * Require accepted connection registration to be handled by the executor. + * If the worker pool is saturated, further accepts will be closed + * immediately. Slightly increases latency due to an extra scheduling. + */ + FAIR_ACCEPT, + /** + * Handle the accepts as fast as possible, disregarding the status of the + * executor service. + */ + FAST_ACCEPT + } + + private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT; + + public Args(TNonblockingServerTransport transport) { + super(transport); + } + + public Args selectorThreads(int i) { + selectorThreads = i; + return this; + } + + public int getSelectorThreads() { + return selectorThreads; + } + + public Args workerThreads(int i) { + workerThreads = i; + return this; + } + + public int getWorkerThreads() { + return workerThreads; + } + + public int getStopTimeoutVal() { + return stopTimeoutVal; + } + + public Args stopTimeoutVal(int stopTimeoutVal) { + this.stopTimeoutVal = stopTimeoutVal; + return this; + } + + public TimeUnit getStopTimeoutUnit() { + return stopTimeoutUnit; + } + + public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { + this.stopTimeoutUnit = stopTimeoutUnit; + return this; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public Args executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public int getAcceptQueueSizePerThread() { + return acceptQueueSizePerThread; + } + + public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) { + this.acceptQueueSizePerThread = acceptQueueSizePerThread; + return this; + } + + public AcceptPolicy getAcceptPolicy() { + return acceptPolicy; + } + + public Args acceptPolicy(AcceptPolicy acceptPolicy) { + this.acceptPolicy = acceptPolicy; + return this; + } + + public void validate() { + if (selectorThreads <= 0) { + throw new IllegalArgumentException("selectorThreads must be positive."); + } + if (workerThreads < 0) { + throw new IllegalArgumentException("workerThreads must be non-negative."); + } + if (acceptQueueSizePerThread <= 0) { + throw new IllegalArgumentException("acceptQueueSizePerThread must be positive."); + } + } + } + + // Flag for stopping the server + private volatile boolean stopped_ = true; + + // The thread handling all accepts + private AcceptThread acceptThread; + + // Threads handling events on client transports + private final Set selectorThreads = new HashSet(); + + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the selector thread(s) to the workers + // (if any). + private final ExecutorService invoker; + + private final Args args; + + /** + * Create the server with the specified Args configuration + */ + public TThreadedSelectorServer(Args args) { + super(args); + args.validate(); + invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; + this.args = args; + } + + /** + * Start the accept and selector threads running to deal with clients. + * + * @return true if everything went ok, false if we couldn't start for some + * reason. + */ + @Override + protected boolean startThreads() { + try { + for (int i = 0; i < args.selectorThreads; ++i) { + selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); + } + acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, + createSelectorThreadLoadBalancer(selectorThreads)); + stopped_ = false; + for (SelectorThread thread : selectorThreads) { + thread.start(); + } + acceptThread.start(); + return true; + } catch (IOException e) { + LOGGER.error("Failed to start threads!", e); + return false; + } + } + + /** + * Joins the accept and selector threads and shuts down the executor service. + */ + @Override + protected void waitForShutdown() { + try { + joinThreads(); + } catch (InterruptedException e) { + // Non-graceful shutdown occurred + LOGGER.error("Interrupted while joining threads!", e); + } + gracefullyShutdownInvokerPool(); + } + + protected void joinThreads() throws InterruptedException { + // wait until the io threads exit + acceptThread.join(); + for (SelectorThread thread : selectorThreads) { + thread.join(); + } + } + + /** + * Stop serving and shut everything down. + */ + @Override + public void stop() { + stopped_ = true; + + // Stop queuing connect attempts asap + stopListening(); + + if (acceptThread != null) { + acceptThread.wakeupSelector(); + } + if (selectorThreads != null) { + for (SelectorThread thread : selectorThreads) { + if (thread != null) + thread.wakeupSelector(); + } + } + } + + protected void gracefullyShutdownInvokerPool() { + // try to gracefully shut down the executor service + invoker.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + /** + * We override the standard invoke method here to queue the invocation for + * invoker service instead of immediately invoking. If there is no thread + * pool, handle the invocation inline on this thread + */ + @Override + protected boolean requestInvoke(FrameBuffer frameBuffer) { + Runnable invocation = getRunnable(frameBuffer); + if (invoker != null) { + try { + invoker.execute(invocation); + return true; + } catch (RejectedExecutionException rx) { + LOGGER.warn("ExecutorService rejected execution!", rx); + return false; + } + } else { + // Invoke on the caller's thread + invocation.run(); + return true; + } + } + + protected Runnable getRunnable(FrameBuffer frameBuffer) { + return new Invocation(frameBuffer); + } + + /** + * Helper to create the invoker if one is not specified + */ + protected static ExecutorService createDefaultExecutor(Args options) { + return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; + } + + private static BlockingQueue createDefaultAcceptQueue(int queueSize) { + if (queueSize == 0) { + // Unbounded queue + return new LinkedBlockingQueue(); + } + return new ArrayBlockingQueue(queueSize); + } + + /** + * The thread that selects on the server transport (listen socket) and accepts + * new connections to hand off to the IO selector threads + */ + protected class AcceptThread extends Thread { + + // The listen socket to accept on + private final TNonblockingServerTransport serverTransport; + private final Selector acceptSelector; + + private final SelectorThreadLoadBalancer threadChooser; + + /** + * Set up the AcceptThead + * + * @throws IOException + */ + public AcceptThread(TNonblockingServerTransport serverTransport, + SelectorThreadLoadBalancer threadChooser) throws IOException { + this.serverTransport = serverTransport; + this.threadChooser = threadChooser; + this.acceptSelector = SelectorProvider.provider().openSelector(); + this.serverTransport.registerSelector(acceptSelector); + } + + /** + * The work loop. Selects on the server transport and accepts. If there was + * a server transport that had blocking accepts, and returned on blocking + * client transports, that should be used instead + */ + public void run() { + try { + while (!stopped_) { + select(); + } + } catch (Throwable t) { + LOGGER.error("run() exiting due to uncaught error", t); + } finally { + // This will wake up the selector threads + TThreadedSelectorServer.this.stop(); + } + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + acceptSelector.wakeup(); + } + + /** + * Select and process IO events appropriately: If there are connections to + * be accepted, accept them. + */ + private void select() { + try { + // wait for connect events. + acceptSelector.select(); + + // process the io events we received + Iterator selectedKeys = acceptSelector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + continue; + } + + if (key.isAcceptable()) { + handleAccept(); + } else { + LOGGER.warn("Unexpected state in select! " + key.interestOps()); + } + } + } catch (IOException e) { + LOGGER.warn("Got an IOException while selecting!", e); + } + } + + /** + * Accept a new connection. + */ + private void handleAccept() { + final TNonblockingTransport client = doAccept(); + if (client != null) { + // Pass this connection to a selector thread + final SelectorThread targetThread = threadChooser.nextThread(); + + if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { + doAddAccept(targetThread, client); + } else { + // FAIR_ACCEPT + try { + invoker.submit(new Runnable() { + public void run() { + doAddAccept(targetThread, client); + } + }); + } catch (RejectedExecutionException rx) { + LOGGER.warn("ExecutorService rejected accept registration!", rx); + // close immediately + client.close(); + } + } + } + } + + private TNonblockingTransport doAccept() { + try { + return (TNonblockingTransport) serverTransport.accept(); + } catch (TTransportException tte) { + // something went wrong accepting. + LOGGER.warn("Exception trying to accept!", tte); + return null; + } + } + + private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { + if (!thread.addAcceptedConnection(client)) { + client.close(); + } + } + } // AcceptThread + + /** + * The SelectorThread(s) will be doing all the selecting on accepted active + * connections. + */ + protected class SelectorThread extends AbstractSelectThread { + + // Accepted connections added by the accept thread. + private final BlockingQueue acceptedQueue; + + /** + * Set up the SelectorThread with an unbounded queue for incoming accepts. + * + * @throws IOException + * if a selector cannot be created + */ + public SelectorThread() throws IOException { + this(new LinkedBlockingQueue()); + } + + /** + * Set up the SelectorThread with an bounded queue for incoming accepts. + * + * @throws IOException + * if a selector cannot be created + */ + public SelectorThread(int maxPendingAccepts) throws IOException { + this(createDefaultAcceptQueue(maxPendingAccepts)); + } + + /** + * Set up the SelectorThread with a specified queue for connections. + * + * @param acceptedQueue + * The BlockingQueue implementation for holding incoming accepted + * connections. + * @throws IOException + * if a selector cannot be created. + */ + public SelectorThread(BlockingQueue acceptedQueue) throws IOException { + this.acceptedQueue = acceptedQueue; + } + + /** + * Hands off an accepted connection to be handled by this thread. This + * method will block if the queue for new connections is at capacity. + * + * @param accepted + * The connection that has been accepted. + * @return true if the connection has been successfully added. + */ + public boolean addAcceptedConnection(TNonblockingTransport accepted) { + try { + acceptedQueue.put(accepted); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while adding accepted connection!", e); + return false; + } + selector.wakeup(); + return true; + } + + /** + * The work loop. Handles selecting (read/write IO), dispatching, and + * managing the selection preferences of all existing connections. + */ + public void run() { + try { + while (!stopped_) { + select(); + processAcceptedConnections(); + processInterestChanges(); + } + } catch (Throwable t) { + LOGGER.error("run() exiting due to uncaught error", t); + } finally { + // This will wake up the accept thread and the other selector threads + TThreadedSelectorServer.this.stop(); + } + } + + /** + * Select and process IO events appropriately: If there are existing + * connections with data waiting to be read, read it, buffering until a + * whole frame has been read. If there are any pending responses, buffer + * them until their target client is available, and then send the data. + */ + private void select() { + try { + // wait for io events. + selector.select(); + + // process the io events we received + Iterator selectedKeys = selector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + cleanupSelectionKey(key); + continue; + } + + if (key.isReadable()) { + // deal with reads + handleRead(key); + } else if (key.isWritable()) { + // deal with writes + handleWrite(key); + } else { + LOGGER.warn("Unexpected state in select! " + key.interestOps()); + } + } + } catch (IOException e) { + LOGGER.warn("Got an IOException while selecting!", e); + } + } + + private void processAcceptedConnections() { + // Register accepted connections + while (!stopped_) { + TNonblockingTransport accepted = acceptedQueue.poll(); + if (accepted == null) { + break; + } + registerAccepted(accepted); + } + } + + private void registerAccepted(TNonblockingTransport accepted) { + SelectionKey clientKey = null; + try { + clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); + + FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); + clientKey.attach(frameBuffer); + } catch (IOException e) { + LOGGER.warn("Failed to register accepted connection to selector!", e); + if (clientKey != null) { + cleanupSelectionKey(clientKey); + } + accepted.close(); + } + } + } // SelectorThread + + /** + * Creates a SelectorThreadLoadBalancer to be used by the accept thread for + * assigning newly accepted connections across the threads. + */ + protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection threads) { + return new SelectorThreadLoadBalancer(threads); + } + + /** + * A round robin load balancer for choosing selector threads for new + * connections. + */ + protected class SelectorThreadLoadBalancer { + private final Collection threads; + private Iterator nextThreadIterator; + + public SelectorThreadLoadBalancer(Collection threads) { + if (threads.isEmpty()) { + throw new IllegalArgumentException("At least one selector thread is required"); + } + this.threads = Collections.unmodifiableList(new ArrayList(threads)); + nextThreadIterator = this.threads.iterator(); + } + + public SelectorThread nextThread() { + // Choose a selector thread (round robin) + if (!nextThreadIterator.hasNext()) { + nextThreadIterator = threads.iterator(); + } + return nextThreadIterator.next(); + } + } +} diff --git a/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java new file mode 100644 index 00000000..ed729a29 --- /dev/null +++ b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.server; + +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TThreadedSelectorServer.Args; +import org.apache.thrift.transport.TNonblockingServerSocket; + +public class TestThreadedSelectorServer extends TestNonblockingServer { + protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) { + return new TThreadedSelectorServer(new Args(socket).processor(processor).protocolFactory(protoFactory)); + } +} -- 2.17.1