From: David Reiss Date: Wed, 2 Jul 2008 23:55:04 +0000 (+0000) Subject: (THRIFT-5) A TNonblockingServers (single-threaded and thread-pool) for Java X-Git-Tag: 0.2.0~483 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=03b574feada3862d460c7fc15dd1fb90687ff428;p=common%2Fthrift.git (THRIFT-5) A TNonblockingServers (single-threaded and thread-pool) for Java This patch adds two Thrift servers for Java that both use non-blocking I/O to avoid locking up worker threads for idle connections. The two classes are - TNonblockingServer, which supports single-threaded serving - THsHaServer, which performs I/O in one thread and method invocations in a configurable thread pool. To support these servers, TNonblockingServerSocket and TNonblockingSocket have been added. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@673550 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/com/facebook/thrift/server/THsHaServer.java b/lib/java/src/com/facebook/thrift/server/THsHaServer.java new file mode 100644 index 00000000..a8764ec6 --- /dev/null +++ b/lib/java/src/com/facebook/thrift/server/THsHaServer.java @@ -0,0 +1,288 @@ + +package com.facebook.thrift.server; + +import com.facebook.thrift.TException; +import com.facebook.thrift.TProcessor; +import com.facebook.thrift.TProcessorFactory; +import com.facebook.thrift.protocol.TProtocol; +import com.facebook.thrift.protocol.TProtocolFactory; +import com.facebook.thrift.protocol.TBinaryProtocol; +import com.facebook.thrift.transport.TNonblockingServerTransport; +import com.facebook.thrift.transport.TTransport; +import com.facebook.thrift.transport.TFramedTransport; +import com.facebook.thrift.transport.TNonblockingTransport; +import com.facebook.thrift.transport.TTransportException; +import com.facebook.thrift.transport.TTransportFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import java.io.IOException; + +/** + * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. + * Like TNonblockingServer, it relies on the use of TFramedTransport. + */ +public class THsHaServer extends TNonblockingServer { + + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the Selector to workers. + private ExecutorService invoker; + + private final Options options; + + /** + * Create server with given processor, and server transport. Default server + * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport) { + this(processor, serverTransport, new Options()); + } + + /** + * Create server with given processor, server transport, and server options + * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + Options options) { + this(new TProcessorFactory(processor), serverTransport, options); + } + + /** + * Create server with specified processor factory and server transport. Uses + * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is + * used on both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport) { + this(processorFactory, serverTransport, new Options()); + } + + /** + * Create server with specified processor factory, server transport, and server + * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on + * both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + Options options) { + this(processorFactory, serverTransport, new TFramedTransport.Factory(), + new TBinaryProtocol.Factory(), options); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, protocolFactory, new Options()); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory, + Options options) { + this(processor, serverTransport, new TFramedTransport.Factory(), + protocolFactory); + } + + /** + * Create server with specified processor, server transport, in/out + * transport factory, in/out protocol factory, and default server options. A + * processor factory will be created that always returns the specified + * processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + transportFactory, protocolFactory); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, new Options()); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory, + Options options) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + options); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) + { + this(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, new Options()); + } + + /** + * Create server with every option fully specified. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) + { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + this.options = options; + } + + /** @inheritdoc */ + @Override + public void serve() { + if (!startInvokerPool()) { + return; + } + + // start listening, or exit + if (!startListening()) { + return; + } + + // start the selector, or exit + if (!startSelectorThread()) { + return; + } + + // this will block while we serve + joinSelector(); + + gracefullyShutdownInvokerPool(); + + // do a little cleanup + stopListening(); + + // ungracefully shut down the invoker pool? + } + + protected boolean startInvokerPool() { + // start the invoker pool + LinkedBlockingQueue queue = new LinkedBlockingQueue(); + invoker = new ThreadPoolExecutor(options.minWorkerThreads, + options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit, + queue); + + return true; + } + + protected void gracefullyShutdownInvokerPool() { + // try to gracefully shut down the executor service + invoker.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = 10000; + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + /** + * We override the standard invoke method here to queue the invocation for + * invoker service instead of immediately invoking. The thread pool takes care of the rest. + */ + @Override + protected void requestInvoke(FrameBuffer frameBuffer) { + invoker.execute(new Invocation(frameBuffer)); + } + + /** + * An Invocation represents a method call that is prepared to execute, given + * an idle worker thread. It contains the input and output protocols the + * thread's processor should use to perform the usual Thrift invocation. + */ + private class Invocation implements Runnable { + + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + public void run() { + frameBuffer.invoke(); + } + } + + public static class Options { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } +} diff --git a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java new file mode 100644 index 00000000..d09ec67a --- /dev/null +++ b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java @@ -0,0 +1,678 @@ + +package com.facebook.thrift.server; + +import com.facebook.thrift.TException; +import com.facebook.thrift.TProcessor; +import com.facebook.thrift.TProcessorFactory; +import com.facebook.thrift.protocol.TProtocol; +import com.facebook.thrift.protocol.TProtocolFactory; +import com.facebook.thrift.protocol.TBinaryProtocol; +import com.facebook.thrift.transport.TNonblockingServerTransport; +import com.facebook.thrift.transport.TIOStreamTransport; +import com.facebook.thrift.transport.TTransport; +import com.facebook.thrift.transport.TFramedTransport; +import com.facebook.thrift.transport.TNonblockingTransport; +import com.facebook.thrift.transport.TTransportException; +import com.facebook.thrift.transport.TTransportFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.Iterator; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import java.io.IOException; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.channels.spi.SelectorProvider; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedChannelException; + +/** + * A nonblocking TServer implementation. This allows for fairness amongst all + * connected clients in terms of invocations. + * + * This server is inherently single-threaded. If you want a limited thread pool + * coupled with invocation-fairness, see THsHaServer. + * + * To use this server, you MUST use a TFramedTransport at the outermost + * transport, otherwise this server will be unable to determine when a whole + * method call has been read off the wire. Clients must also use TFramedTransport. + */ +public class TNonblockingServer extends TServer { + private static final Logger LOGGER = + Logger.getLogger(TNonblockingServer.class.getName()); + + // Flag for stopping the server + private volatile boolean stopped_; + + private SelectThread selectThread_; + + /** + * Create server with given processor and server transport, using + * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input + * and output transports. A TProcessorFactory will be created that always + * returns the specified processor. + */ + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport) { + this(new TProcessorFactory(processor), serverTransport); + } + + /** + * Create server with specified processor factory and server transport. + * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input + * and output transports. + */ + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport) { + this(processorFactory, serverTransport, + new TFramedTransport.Factory(), new TFramedTransport.Factory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + new TFramedTransport.Factory(), new TFramedTransport.Factory(), + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + /** + * Begin accepting connections and processing invocations. + */ + public void serve() { + // start listening, or exit + if (!startListening()) { + return; + } + + // start the selector, or exit + if (!startSelectorThread()) { + return; + } + + // this will block while we serve + joinSelector(); + + // do a little cleanup + stopListening(); + } + + /** + * Have the server transport start accepting connections. + * + * @return true if we started listening successfully, false if something went + * wrong. + */ + protected boolean startListening() { + try { + serverTransport_.listen(); + return true; + } catch (TTransportException ttx) { + LOGGER.log(Level.SEVERE, "Failed to start listening on server socket!", ttx); + return false; + } + } + + /** + * Stop listening for conections. + */ + protected void stopListening() { + serverTransport_.close(); + } + + /** + * Start the selector thread running to deal with clients. + * + * @return true if everything went ok, false if we couldn't start for some + * reason. + */ + protected boolean startSelectorThread() { + // start the selector + try { + selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_); + selectThread_.start(); + return true; + } catch (IOException e) { + LOGGER.log(Level.SEVERE, "Failed to start selector thread!", e); + return false; + } + } + + /** + * Block until the selector exits. + */ + protected void joinSelector() { + // wait until the selector thread exits + try { + selectThread_.join(); + } catch (InterruptedException e) { + // for now, just silently ignore. technically this means we'll have less of + // a graceful shutdown as a result. + } + } + + /** + * Stop serving and shut everything down. + */ + public void stop() { + stopped_ = true; + selectThread_.wakeupSelector(); + } + + /** + * Perform an invocation. This method could behave several different ways + * - invoke immediately inline, queue for separate execution, etc. + */ + protected void requestInvoke(FrameBuffer frameBuffer) { + frameBuffer.invoke(); + } + + /** + * A FrameBuffer wants to change its selection preferences, but might not be + * in the select thread. + */ + protected void requestSelectInterestChange(FrameBuffer frameBuffer) { + selectThread_.requestSelectInterestChange(frameBuffer); + } + + /** + * The thread that will be doing all the selecting, managing new connections + * and those that still need to be read. + */ + protected class SelectThread extends Thread { + + private final TNonblockingServerTransport serverTransport; + private final Selector selector; + + // List of FrameBuffers that want to change their selection interests. + private final Set selectInterestChanges = + new HashSet(); + + /** + * Set up the SelectorThread. + */ + public SelectThread(final TNonblockingServerTransport serverTransport) + throws IOException { + this.serverTransport = serverTransport; + this.selector = SelectorProvider.provider().openSelector(); + serverTransport.registerSelector(selector); + } + + /** + * The work loop. Handles both selecting (all IO operations) and managing + * the selection preferences of all existing connections. + */ + public void run() { + while (!stopped_) { + select(); + processInterestChanges(); + } + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + selector.wakeup(); + } + + /** + * Add FrameBuffer to the list of select interest changes and wake up the + * selector if it's blocked. When the select() call exits, it'll give the + * FrameBuffer a chance to change its interests. + */ + public void requestSelectInterestChange(FrameBuffer frameBuffer) { + synchronized (selectInterestChanges) { + selectInterestChanges.add(frameBuffer); + } + // wakeup the selector, if it's currently blocked. + selector.wakeup(); + } + + /** + * Select and process IO events appropriately: + * If there are connections to be accepted, accept them. + * If there are existing connections with data waiting to be read, read it, + * bufferring until a whole frame has been read. + * If there are any pending responses, buffer them until their target client + * is available, and then send the data. + */ + private void select() { + try { + // wait for io events. + selector.select(); + + // process the io events we received + Iterator selectedKeys = selector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + cleanupSelectionkey(key); + continue; + } + + // if the key is marked Accept, then it has to be the server + // transport. + if (key.isAcceptable()) { + handleAccept(); + } else if (key.isReadable()) { + // deal with reads + handleRead(key); + } else if (key.isWritable()) { + // deal with writes + handleWrite(key); + } + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Got an IOException while selecting!", e); + } + } + + /** + * Check to see if there are any FrameBuffers that have switched their + * interest type from read to write or vice versa. + */ + private void processInterestChanges() { + synchronized (selectInterestChanges) { + for (FrameBuffer fb : selectInterestChanges) { + fb.changeSelectInterests(); + } + selectInterestChanges.clear(); + } + } + + /** + * Accept a new connection. + */ + private void handleAccept() throws IOException { + SelectionKey clientKey = null; + try { + // accept the connection + TNonblockingTransport client = (TNonblockingTransport)serverTransport.accept(); + clientKey = client.registerSelector(selector, SelectionKey.OP_READ); + + // add this key to the map + FrameBuffer frameBuffer = new FrameBuffer(client, clientKey); + clientKey.attach(frameBuffer); + } catch (TTransportException tte) { + // something went wrong accepting. + cleanupSelectionkey(clientKey); + LOGGER.log(Level.WARNING, "Exception trying to accept!", tte); + tte.printStackTrace(); + } + } + + /** + * Do the work required to read from a readable client. If the frame is + * fully read, then invoke the method call. + */ + private void handleRead(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (buffer.read()) { + // if the buffer's frame read is complete, invoke the method. + if (buffer.isFrameFullyRead()) { + requestInvoke(buffer); + } + } else { + cleanupSelectionkey(key); + } + } + + /** + * Let a writable client get written, if there's data to be written. + */ + private void handleWrite(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (!buffer.write()) { + cleanupSelectionkey(key); + } + } + + /** + * Do connection-close cleanup on a given SelectionKey. + */ + private void cleanupSelectionkey(SelectionKey key) { + // remove the records from the two maps + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (buffer != null) { + // close the buffer + buffer.close(); + } + // cancel the selection key + key.cancel(); + } + } // SelectorThread + + /** + * Class that implements a sort of state machine around the interaction with + * a client and an invoker. It manages reading the frame size and frame data, + * getting it handed off as wrapped transports, and then the writing of + * reponse data back to the client. In the process it manages flipping the + * read and write bits on the selection key for its client. + */ + protected class FrameBuffer { + // + // Possible states for the FrameBuffer state machine. + // + // in the midst of reading the frame size off the wire + private static final int READING_FRAME_SIZE = 1; + // reading the actual frame data now, but not all the way done yet + private static final int READING_FRAME = 2; + // completely read the frame, so an invocation can now happen + private static final int READ_FRAME_COMPLETE = 3; + // waiting to get switched to listening for write events + private static final int AWAITING_REGISTER_WRITE = 4; + // started writing response data, not fully complete yet + private static final int WRITING = 6; + // another thread wants this framebuffer to go back to reading + private static final int AWAITING_REGISTER_READ = 7; + + // + // Instance variables + // + + // the actual transport hooked up to the client. + private final TNonblockingTransport trans_; + + // the SelectionKey that corresponds to our transport + private final SelectionKey selectionKey_; + + // where in the process of reading/writing are we? + private int state_ = READING_FRAME_SIZE; + + // the ByteBuffer we'll be using to write and read, depending on the state + private ByteBuffer buffer_; + + private ByteArrayOutputStream response_; + + public FrameBuffer( final TNonblockingTransport trans, + final SelectionKey selectionKey) { + trans_ = trans; + selectionKey_ = selectionKey; + buffer_ = ByteBuffer.allocate(4); + } + + /** + * Give this FrameBuffer a chance to read. The selector loop should have + * received a read event for this FrameBuffer. + * + * @return true if the connection should live on, false if it should be + * closed + */ + public boolean read() { + if (state_ == READING_FRAME_SIZE) { + // try to read the frame size completely + if (!internalRead()) { + return false; + } + + // if the frame size has been read completely, then prepare to read the + // actual frame. + if (buffer_.remaining() == 0) { + // pull out the frame size as an integer. + int frameSize = buffer_.getInt(0); + if (frameSize <= 0) { + LOGGER.severe("Read an invalid frame size of " + frameSize + + ". Are you using TFramedTransport on the client side?"); + return false; + } + // reallocate the readbuffer as a frame-sized buffer + buffer_ = ByteBuffer.allocate(frameSize + 4); + // put the frame size at the head of the buffer + buffer_.putInt(frameSize); + + state_ = READING_FRAME; + } else { + // this skips the check of READING_FRAME state below, since we can't + // possibly go on to that state if there's data left to be read at + // this one. + return true; + } + } + + // it is possible to fall through from the READING_FRAME_SIZE section + // to READING_FRAME if there's already some frame data available once + // READING_FRAME_SIZE is complete. + + if (state_ == READING_FRAME) { + if (!internalRead()) { + return false; + } + + // since we're already in the select loop here for sure, we can just + // modify our selection key directly. + if (buffer_.remaining() == 0) { + // get rid of the read select interests + selectionKey_.interestOps(0); + state_ = READ_FRAME_COMPLETE; + } + + return true; + } + + // if we fall through to this point, then the state must be invalid. + LOGGER.severe("Read was called but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to write its output to the final client. + */ + public boolean write() { + if (state_ == WRITING) { + try { + if (trans_.write(buffer_) < 0) { + return false; + } + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Got an IOException during write!", e); + return false; + } + + // we're done writing. now we need to switch back to reading. + if (buffer_.remaining() == 0) { + prepareRead(); + } + return true; + } + + LOGGER.severe("Write was called, but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to set its interest to write, once data + * has come in. + */ + public void changeSelectInterests() { + if (state_ == AWAITING_REGISTER_WRITE) { + // set the OP_WRITE interest + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = WRITING; + } else if (state_ == AWAITING_REGISTER_READ) { + prepareRead(); + } else { + LOGGER.severe( + "changeSelectInterest was called, but state is invalid (" + + state_ + ")"); + } + } + + /** + * Shut the connection down. + */ + public void close() { + trans_.close(); + } + + /** + * Check if this FrameBuffer has a full frame read. + */ + public boolean isFrameFullyRead() { + return state_ == READ_FRAME_COMPLETE; + } + + /** + * After the processor has processed the invocation, whatever thread is + * managing invocations should call this method on this FrameBuffer so we + * know it's time to start trying to write again. Also, if it turns out + * that there actually isn't any data in the response buffer, we'll skip + * trying to write and instead go back to reading. + */ + public void responseReady() { + // capture the data we want to write as a byte array. + byte[] bytes = response_.toByteArray(); + + if (bytes.length <= 0) { + // go straight to reading again. this was probably an async method + state_ = AWAITING_REGISTER_READ; + } else { + buffer_ = ByteBuffer.wrap(bytes); + + // set state that we're waiting to be switched to write. we do this + // asynchronously through requestSelectInterestChange() because there is a + // possibility that we're not in the main thread, and thus currently + // blocked in select(). (this functionality is in place for the sake of + // the HsHa server.) + state_ = AWAITING_REGISTER_WRITE; + } + requestSelectInterestChange(); + } + + /** + * Actually invoke the method signified by this FrameBuffer. + */ + public void invoke() { + TTransport inTrans = getInputTransport(); + TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); + TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); + + try { + processorFactory_.getProcessor(inTrans).process(inProt, outProt); + responseReady(); + } catch (TException te) { + LOGGER.log(Level.WARNING, "Exception while invoking!", te); + } + } + + /** + * Wrap the read buffer in a memory-based transport so a processor can read + * the data it needs to handle an invocation. + */ + private TTransport getInputTransport() { + return inputTransportFactory_.getTransport(new TIOStreamTransport( + new ByteArrayInputStream(buffer_.array()))); + } + + /** + * Get the transport that should be used by the invoker for responding. + */ + private TTransport getOutputTransport() { + response_ = new ByteArrayOutputStream(); + return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); + } + + /** + * Perform a read into buffer. + * + * @return true if the read succeeded, false if there was an error or the + * connection closed. + */ + private boolean internalRead() { + try { + if (trans_.read(buffer_) < 0) { + return false; + } + return true; + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Got an IOException in internalRead!", e); + return false; + } + } + + /** + * We're done writing, so reset our interest ops and change state accordingly. + */ + private void prepareRead() { + // we can set our interest directly without using the queue because + // we're in the select thread. + selectionKey_.interestOps(SelectionKey.OP_READ); + // get ready for another go-around + buffer_ = ByteBuffer.allocate(4); + state_ = READING_FRAME_SIZE; + } + + /** + * When this FrameBuffer needs to change it's select interests and execution + * might not be in the select thread, then this method will make sure the + * interest change gets done when the select thread wakes back up. When the + * current thread is the select thread, then it just does the interest change + * immediately. + */ + private void requestSelectInterestChange() { + if (Thread.currentThread() == selectThread_) { + changeSelectInterests(); + } else { + TNonblockingServer.this.requestSelectInterestChange(this); + } + } + } // FrameBuffer + +} diff --git a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java index a9ad4b3b..e7dfcef7 100644 --- a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java +++ b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java @@ -43,6 +43,15 @@ public class TFramedTransport extends TTransport { */ private boolean frameWrite_ = true; + public static class Factory extends TTransportFactory { + public Factory() { + } + + public TTransport getTransport(TTransport base) { + return new TFramedTransport(base); + } + } + /** * Constructor wraps around another tranpsort */ diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java new file mode 100644 index 00000000..44ed0d8d --- /dev/null +++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java @@ -0,0 +1,143 @@ + +package com.facebook.thrift.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; + +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.ClosedChannelException; + +/** + * Wrapper around ServerSocketChannel + */ +public class TNonblockingServerSocket extends TNonblockingServerTransport { + + /** + * This channel is where all the nonblocking magic happens. + */ + private ServerSocketChannel serverSocketChannel = null; + + /** + * Underlying serversocket object + */ + private ServerSocket serverSocket_ = null; + + /** + * Port to listen on + */ + private int port_ = 0; + + /** + * Timeout for client sockets from accept + */ + private int clientTimeout_ = 0; + + /** + * Creates a server socket from underlying socket object + */ + // public TNonblockingServerSocket(ServerSocket serverSocket) { + // this(serverSocket, 0); + // } + + /** + * Creates a server socket from underlying socket object + */ + // public TNonblockingServerSocket(ServerSocket serverSocket, int clientTimeout) { + // serverSocket_ = serverSocket; + // clientTimeout_ = clientTimeout; + // } + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port) throws TTransportException { + this(port, 0); + } + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { + port_ = port; + clientTimeout_ = clientTimeout; + try { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + + // Make server socket + serverSocket_ = serverSocketChannel.socket(); + // Prevent 2MSL delay problem on server restarts + serverSocket_.setReuseAddress(true); + // Bind to listening port + serverSocket_.bind(new InetSocketAddress(port_)); + } catch (IOException ioe) { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public void listen() throws TTransportException { + // Make sure not to block on accept + if (serverSocket_ != null) { + try { + serverSocket_.setSoTimeout(0); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + } + + protected TNonblockingSocket acceptImpl() throws TTransportException { + if (serverSocket_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + } + try { + SocketChannel socketChannel = serverSocketChannel.accept(); + if (socketChannel == null) { + return null; + } + + TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); + tsocket.setTimeout(clientTimeout_); + return tsocket; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void registerSelector(Selector selector) { + try { + // Register the server socket channel, indicating an interest in + // accepting new connections + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + } catch (ClosedChannelException e) { + // this shouldn't happen, ideally... + // TODO: decide what to do with this. + } + } + + public void close() { + if (serverSocket_ != null) { + try { + serverSocket_.close(); + } catch (IOException iox) { + System.err.println("WARNING: Could not close server socket: " + + iox.getMessage()); + } + serverSocket_ = null; + } + } + + public void interrupt() { + // The thread-safeness of this is dubious, but Java documentation suggests + // that it is safe to do this from a different thread context + close(); + } + +} diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java new file mode 100644 index 00000000..7911851c --- /dev/null +++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java @@ -0,0 +1,12 @@ + +package com.facebook.thrift.transport; + +import java.nio.channels.Selector; + +/** + * Server transport that can be operated in a nonblocking fashion. + */ +public abstract class TNonblockingServerTransport extends TServerTransport { + + public abstract void registerSelector(Selector selector); +} diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java new file mode 100644 index 00000000..07c03e3c --- /dev/null +++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java @@ -0,0 +1,259 @@ + +package com.facebook.thrift.transport; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; + +/** + * Socket implementation of the TTransport interface. To be commented soon! + */ +public class TNonblockingSocket extends TNonblockingTransport { + + private SocketChannel socketChannel = null; + + /** + * Wrapped Socket object + */ + private Socket socket_ = null; + + /** + * Remote host + */ + private String host_ = null; + + /** + * Remote port + */ + private int port_ = 0; + + /** + * Socket timeout + */ + private int timeout_ = 0; + + /** + * Constructor that takes an already created socket. + * + * @param socketChannel Already created SocketChannel object + * @throws TTransportException if there is an error setting up the streams + */ + public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException { + try { + // make it a nonblocking channel + socketChannel.configureBlocking(false); + } catch (IOException e) { + throw new TTransportException(e); + } + + this.socketChannel = socketChannel; + this.socket_ = socketChannel.socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + } catch (SocketException sx) { + sx.printStackTrace(); + } + + // if (isOpen()) { + // try { + // // inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + // // outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + // } catch (IOException iox) { + // close(); + // throw new TTransportException(TTransportException.NOT_OPEN, iox); + // } + // } + } + + // This is all for the clientside stuff. Not sure that we'll actually be supporting that yet. + // /** + // * Creates a new unconnected socket that will connect to the given host + // * on the given port. + // * + // * @param host Remote host + // * @param port Remote port + // */ + // public TNonblockingSocket(String host, int port) { + // this(host, port, 0); + // } + // + // /** + // * Creates a new unconnected socket that will connect to the given host + // * on the given port. + // * + // * @param host Remote host + // * @param port Remote port + // * @param timeout Socket timeout + // */ + // public TSocket(String host, int port, int timeout) { + // host_ = host; + // port_ = port; + // timeout_ = timeout; + // initSocket(); + // } + + + /** + * Register this socket with the specified selector for both read and write + * operations. + * + * @param selector + * @return the selection key for this socket. + */ + public SelectionKey registerSelector(Selector selector, int interests) throws IOException { + // Register the new SocketChannel with our Selector, indicating + // we'd like to be notified when there's data waiting to be read + return socketChannel.register(selector, interests); + } + + /** + * Initializes the socket object + */ + private void initSocket() { + socket_ = new Socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + socket_.setSoTimeout(timeout_); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + + /** + * Sets the socket timeout + * + * @param timeout Milliseconds timeout + */ + public void setTimeout(int timeout) { + timeout_ = timeout; + try { + socket_.setSoTimeout(timeout); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + + /** + * Returns a reference to the underlying socket. + */ + public Socket getSocket() { + if (socket_ == null) { + initSocket(); + } + return socket_; + } + + /** + * Checks whether the socket is connected. + */ + public boolean isOpen() { + if (socket_ == null) { + return false; + } + return socket_.isConnected(); + } + + /** + * Connects the socket, creating a new socket object if necessary. + */ + public void open() throws TTransportException { + throw new RuntimeException("Not implemented yet"); + // if (isOpen()) { + // throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); + // } + // + // if (host_.length() == 0) { + // throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); + // } + // if (port_ <= 0) { + // throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port."); + // } + // + // if (socket_ == null) { + // initSocket(); + // } + // + // try { + // socket_.connect(new InetSocketAddress(host_, port_)); + // inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + // outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + // } catch (IOException iox) { + // close(); + // throw new TTransportException(TTransportException.NOT_OPEN, iox); + // } + } + + /** + * Perform a nonblocking read into buffer. + */ + public int read(ByteBuffer buffer) throws IOException { + return socketChannel.read(buffer); + } + + + /** + * Reads from the underlying input stream if not null. + */ + public int read(byte[] buf, int off, int len) throws TTransportException { + if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { + throw new TTransportException(TTransportException.NOT_OPEN, + "Cannot read from write-only socket channel"); + } + try { + return socketChannel.read(ByteBuffer.wrap(buf, off, len)); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Perform a nonblocking write of the data in buffer; + */ + public int write(ByteBuffer buffer) throws IOException { + return socketChannel.write(buffer); + } + + /** + * Writes to the underlying output stream if not null. + */ + public void write(byte[] buf, int off, int len) throws TTransportException { + if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { + throw new TTransportException(TTransportException.NOT_OPEN, + "Cannot write to write-only socket channel"); + } + try { + socketChannel.write(ByteBuffer.wrap(buf, off, len)); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Flushes the underlying output stream if not null. + */ + public void flush() throws TTransportException { + // Not supported by SocketChannel. + } + + /** + * Closes the socket. + */ + public void close() { + try { + socketChannel.close(); + } catch (IOException e) { + // silently ignore. + } + } + +} diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java new file mode 100644 index 00000000..a7024905 --- /dev/null +++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java @@ -0,0 +1,12 @@ +package com.facebook.thrift.transport; + +import java.io.IOException; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.ByteBuffer; + +public abstract class TNonblockingTransport extends TTransport { + public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; + public abstract int read(ByteBuffer buffer) throws IOException; + public abstract int write(ByteBuffer buffer) throws IOException; +} diff --git a/lib/java/src/com/facebook/thrift/transport/TTransport.java b/lib/java/src/com/facebook/thrift/transport/TTransport.java index 2c108703..3d911bfc 100644 --- a/lib/java/src/com/facebook/thrift/transport/TTransport.java +++ b/lib/java/src/com/facebook/thrift/transport/TTransport.java @@ -56,7 +56,7 @@ public abstract class TTransport { throws TTransportException; /** - * Guarantees that all of len bytes are + * Guarantees that all of len bytes are actually read off the transport. * * @param buf Array to read into * @param off Index to start reading at @@ -71,7 +71,7 @@ public abstract class TTransport { while (got < len) { ret = read(buf, off+got, len-got); if (ret <= 0) { - throw new TTransportException("Cannot read. Remote side has closed."); + throw new TTransportException("Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes."); } got += ret; } diff --git a/test/java/TestNonblockingServer b/test/java/TestNonblockingServer new file mode 100644 index 00000000..cb330d43 --- /dev/null +++ b/test/java/TestNonblockingServer @@ -0,0 +1,2 @@ +#!/bin/bash -v +java -server -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $* diff --git a/test/java/src/TestClient.java b/test/java/src/TestClient.java index 379761d8..bc76e830 100644 --- a/test/java/src/TestClient.java +++ b/test/java/src/TestClient.java @@ -38,6 +38,8 @@ public class TestClient { boolean framedInput = true; boolean framedOutput = true; + int socketTimeout = 1000; + try { for (int i = 0; i < args.length; ++i) { if (args[i].equals("-h")) { @@ -53,6 +55,8 @@ public class TestClient { url = args[++i]; } else if (args[i].equals("-n")) { numTests = Integer.valueOf(args[++i]); + } else if (args[i].equals("-timeout")) { + socketTimeout = Integer.valueOf(args[++i]); } } } catch (Exception x) { @@ -65,7 +69,7 @@ public class TestClient { transport = new THttpClient(url); } else { TSocket socket = new TSocket(host, port); - socket.setTimeout(1000); + socket.setTimeout(socketTimeout); transport = socket; if (framed) { transport = new TFramedTransport(transport, diff --git a/test/java/src/TestNonblockingServer.java b/test/java/src/TestNonblockingServer.java new file mode 100644 index 00000000..450d67a5 --- /dev/null +++ b/test/java/src/TestNonblockingServer.java @@ -0,0 +1,70 @@ +package com.facebook.thrift.test; + +import com.facebook.thrift.TException; +import com.facebook.thrift.protocol.TBinaryProtocol; +import com.facebook.thrift.protocol.TProtocol; +import com.facebook.thrift.protocol.TProtocolFactory; +import com.facebook.thrift.server.TServer; +import com.facebook.thrift.server.TSimpleServer; +import com.facebook.thrift.server.TNonblockingServer; +import com.facebook.thrift.server.THsHaServer; +import com.facebook.thrift.transport.TNonblockingServerSocket; +import com.facebook.thrift.transport.TNonblockingServerTransport; +import com.facebook.thrift.transport.TFramedTransport; + +// Generated code +import thrift.test.*; + +import java.net.ServerSocket; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; + + +public class TestNonblockingServer extends TestServer { + public static void main(String [] args) { + try { + int port = 9090; + boolean hsha = false; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-p")) { + port = Integer.valueOf(args[i++]); + } else if (args[i].equals("-hsha")) { + hsha = true; + } + } + + // Processor + TestHandler testHandler = + new TestHandler(); + ThriftTest.Processor testProcessor = + new ThriftTest.Processor(testHandler); + + // Transport + TNonblockingServerSocket tServerSocket = + new TNonblockingServerSocket(port); + + TServer serverEngine; + + if (hsha) { + // HsHa Server + serverEngine = new THsHaServer(testProcessor, tServerSocket); + } else { + // Nonblocking Server + serverEngine = new TNonblockingServer(testProcessor, tServerSocket); + } + + // Run it + System.out.println("Starting the server on port " + port + "..."); + serverEngine.serve(); + + } catch (Exception x) { + x.printStackTrace(); + } + System.out.println("done."); + } +}