From: David Reiss Date: Fri, 21 Nov 2008 23:07:05 +0000 (+0000) Subject: THRIFT-81. java: TNonblockingServer: Support a limit on read buffer size X-Git-Tag: 0.2.0~400 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=04c315946c68034a51376bab368a4c4c2c8a8ba5;p=common%2Fthrift.git THRIFT-81. java: TNonblockingServer: Support a limit on read buffer size This change makes it possible to set a maximum amount of memory that TNonblockingServer will use for all read buffers (combined). If it is exceeded, no new data will be read from clients until memory is freed. The current implementation does a busy wait in the main thread when this happens. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@719741 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java b/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java index 94ce0036..4d3ffbf9 100644 --- a/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java +++ b/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java @@ -19,6 +19,11 @@ public class TByteArrayOutputStream extends ByteArrayOutputStream { super(size); } + public TByteArrayOutputStream() { + super(); + } + + public byte[] get() { return buf; } diff --git a/lib/java/src/com/facebook/thrift/server/THsHaServer.java b/lib/java/src/com/facebook/thrift/server/THsHaServer.java index a8764ec6..8790d845 100644 --- a/lib/java/src/com/facebook/thrift/server/THsHaServer.java +++ b/lib/java/src/com/facebook/thrift/server/THsHaServer.java @@ -33,7 +33,10 @@ public class THsHaServer extends TNonblockingServer { // for the passing of Invocations from the Selector to workers. private ExecutorService invoker; - private final Options options; + protected final int MIN_WORKER_THREADS; + protected final int MAX_WORKER_THREADS; + protected final int STOP_TIMEOUT_VAL; + protected final TimeUnit STOP_TIMEOUT_UNIT; /** * Create server with given processor, and server transport. Default server @@ -188,8 +191,13 @@ public class THsHaServer extends TNonblockingServer { { super(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory); - this.options = options; + inputProtocolFactory, outputProtocolFactory, + options); + + MIN_WORKER_THREADS = options.minWorkerThreads; + MAX_WORKER_THREADS = options.maxWorkerThreads; + STOP_TIMEOUT_VAL = options.stopTimeoutVal; + STOP_TIMEOUT_UNIT = options.stopTimeoutUnit; } /** @inheritdoc */ @@ -223,9 +231,8 @@ public class THsHaServer extends TNonblockingServer { protected boolean startInvokerPool() { // start the invoker pool LinkedBlockingQueue queue = new LinkedBlockingQueue(); - invoker = new ThreadPoolExecutor(options.minWorkerThreads, - options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit, - queue); + invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS, + STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue); return true; } @@ -279,7 +286,7 @@ public class THsHaServer extends TNonblockingServer { } } - public static class Options { + public static class Options extends TNonblockingServer.Options { public int minWorkerThreads = 5; public int maxWorkerThreads = Integer.MAX_VALUE; public int stopTimeoutVal = 60; diff --git a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java index 47fdd5f2..8e9d3bd1 100644 --- a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java +++ b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java @@ -14,6 +14,7 @@ import com.facebook.thrift.transport.TFramedTransport; import com.facebook.thrift.transport.TNonblockingTransport; import com.facebook.thrift.transport.TTransportException; import com.facebook.thrift.transport.TTransportFactory; +import com.facebook.thrift.TByteArrayOutputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -59,6 +60,20 @@ public class TNonblockingServer extends TServer { private SelectThread selectThread_; + /** + * The maximum amount of memory we will allocate to client IO buffers at a + * time. Without this limit, the server will gladly allocate client buffers + * right into an out of memory exception, rather than waiting. + */ + private final long MAX_READ_BUFFER_BYTES; + + protected final Options options_; + + /** + * How many bytes are currently allocated to read buffers. + */ + private long readBufferBytesAllocated = 0; + /** * Create server with given processor and server transport, using * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input @@ -125,9 +140,25 @@ public class TNonblockingServer extends TServer { TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory) { + this(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + new Options()); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) { super(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, inputProtocolFactory, outputProtocolFactory); + options_ = options; + options_.validate(); + MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes; } /** @@ -446,7 +477,7 @@ public class TNonblockingServer extends TServer { // the ByteBuffer we'll be using to write and read, depending on the state private ByteBuffer buffer_; - private ByteArrayOutputStream response_; + private TByteArrayOutputStream response_; public FrameBuffer( final TNonblockingTransport trans, final SelectionKey selectionKey) { @@ -479,6 +510,24 @@ public class TNonblockingServer extends TServer { + ". Are you using TFramedTransport on the client side?"); return false; } + + // if this frame will always be too large for this server, log the + // error and close the connection. + if (frameSize + 4 > MAX_READ_BUFFER_BYTES) { + LOGGER.severe("Read a frame size of " + frameSize + + ", which is bigger than the maximum allowable buffer size for ALL connections."); + return false; + } + + // if this frame will push us over the memory limit, then return. + // with luck, more memory will free up the next time around. + if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) { + return true; + } + + // incremement the amount of memory allocated to read buffers + readBufferBytesAllocated += frameSize + 4; + // reallocate the readbuffer as a frame-sized buffer buffer_ = ByteBuffer.allocate(frameSize + 4); // put the frame size at the head of the buffer @@ -568,6 +617,11 @@ public class TNonblockingServer extends TServer { * Shut the connection down. */ public void close() { + // if we're being closed due to an error, we might have allocated a + // buffer that we need to subtract for our memory accounting. + if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) { + readBufferBytesAllocated -= buffer_.array().length; + } trans_.close(); } @@ -586,14 +640,18 @@ public class TNonblockingServer extends TServer { * trying to write and instead go back to reading. */ public void responseReady() { - // capture the data we want to write as a byte array. - byte[] bytes = response_.toByteArray(); + // the read buffer is definitely no longer in use, so we will decrement + // our read buffer count. we do this here as well as in close because + // we'd like to free this read memory up as quickly as possible for other + // clients. + readBufferBytesAllocated -= buffer_.array().length; - if (bytes.length <= 0) { + if (response_.len() == 0) { // go straight to reading again. this was probably an async method state_ = AWAITING_REGISTER_READ; + buffer_ = null; } else { - buffer_ = ByteBuffer.wrap(bytes); + buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); // set state that we're waiting to be switched to write. we do this // asynchronously through requestSelectInterestChange() because there is a @@ -640,7 +698,7 @@ public class TNonblockingServer extends TServer { * Get the transport that should be used by the invoker for responding. */ private TTransport getOutputTransport() { - response_ = new ByteArrayOutputStream(); + response_ = new TByteArrayOutputStream(); return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); } @@ -690,4 +748,16 @@ public class TNonblockingServer extends TServer { } } // FrameBuffer + + public static class Options { + public long maxReadBufferBytes = Long.MAX_VALUE; + + public Options() {} + + public void validate() { + if (maxReadBufferBytes <= 1024) { + throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer."); + } + } + } } diff --git a/test/java/TestNonblockingServer b/test/java/TestNonblockingServer index cb330d43..ee2ba9da 100644 --- a/test/java/TestNonblockingServer +++ b/test/java/TestNonblockingServer @@ -1,2 +1,2 @@ #!/bin/bash -v -java -server -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $* +java -server -Xmx256m -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $* diff --git a/test/java/src/OverloadNonblockingServer.java b/test/java/src/OverloadNonblockingServer.java new file mode 100644 index 00000000..26f53546 --- /dev/null +++ b/test/java/src/OverloadNonblockingServer.java @@ -0,0 +1,44 @@ + +package com.facebook.thrift; + +import thrift.test.*; + +import com.facebook.thrift.TApplicationException; +import com.facebook.thrift.TSerializer; +import com.facebook.thrift.transport.TTransport; +import com.facebook.thrift.transport.TSocket; +import com.facebook.thrift.transport.TFramedTransport; +import com.facebook.thrift.transport.TTransportException; +import com.facebook.thrift.protocol.TBinaryProtocol; +import com.facebook.thrift.protocol.TSimpleJSONProtocol; + +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; + + +public class OverloadNonblockingServer { + + public static void main(String[] args) throws Exception { + int msg_size_mb = Integer.parseInt(args[0]); + int msg_size = msg_size_mb * 1024 * 1024; + + TSocket socket = new TSocket("localhost", 9090); + TBinaryProtocol binprot = new TBinaryProtocol(socket); + socket.open(); + binprot.writeI32(msg_size); + binprot.writeI32(1); + socket.flush(); + + System.in.read(); + // Thread.sleep(30000); + for (int i = 0; i < msg_size_mb; i++) { + binprot.writeBinary(new byte[1024 * 1024]); + } + + socket.close(); + } +}