// for the passing of Invocations from the Selector to workers.
private ExecutorService invoker;
- private final Options options;
+ protected final int MIN_WORKER_THREADS;
+ protected final int MAX_WORKER_THREADS;
+ protected final int STOP_TIMEOUT_VAL;
+ protected final TimeUnit STOP_TIMEOUT_UNIT;
/**
* Create server with given processor, and server transport. Default server
{
super(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- this.options = options;
+ inputProtocolFactory, outputProtocolFactory,
+ options);
+
+ MIN_WORKER_THREADS = options.minWorkerThreads;
+ MAX_WORKER_THREADS = options.maxWorkerThreads;
+ STOP_TIMEOUT_VAL = options.stopTimeoutVal;
+ STOP_TIMEOUT_UNIT = options.stopTimeoutUnit;
}
/** @inheritdoc */
protected boolean startInvokerPool() {
// start the invoker pool
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- invoker = new ThreadPoolExecutor(options.minWorkerThreads,
- options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit,
- queue);
+ invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS,
+ STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue);
return true;
}
}
}
- public static class Options {
+ public static class Options extends TNonblockingServer.Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public int stopTimeoutVal = 60;
import com.facebook.thrift.transport.TNonblockingTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.TByteArrayOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
private SelectThread selectThread_;
+ /**
+ * The maximum amount of memory we will allocate to client IO buffers at a
+ * time. Without this limit, the server will gladly allocate client buffers
+ * right into an out of memory exception, rather than waiting.
+ */
+ private final long MAX_READ_BUFFER_BYTES;
+
+ protected final Options options_;
+
+ /**
+ * How many bytes are currently allocated to read buffers.
+ */
+ private long readBufferBytesAllocated = 0;
+
/**
* Create server with given processor and server transport, using
* TBinaryProtocol for the protocol, TFramedTransport.Factory on both input
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
+ this(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory,
+ new Options());
+ }
+
+ public TNonblockingServer(TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ Options options) {
super(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
+ options_ = options;
+ options_.validate();
+ MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes;
}
/**
// the ByteBuffer we'll be using to write and read, depending on the state
private ByteBuffer buffer_;
- private ByteArrayOutputStream response_;
+ private TByteArrayOutputStream response_;
public FrameBuffer( final TNonblockingTransport trans,
final SelectionKey selectionKey) {
+ ". Are you using TFramedTransport on the client side?");
return false;
}
+
+ // if this frame will always be too large for this server, log the
+ // error and close the connection.
+ if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ LOGGER.severe("Read a frame size of " + frameSize
+ + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+ return false;
+ }
+
+ // if this frame will push us over the memory limit, then return.
+ // with luck, more memory will free up the next time around.
+ if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ return true;
+ }
+
+ // incremement the amount of memory allocated to read buffers
+ readBufferBytesAllocated += frameSize + 4;
+
// reallocate the readbuffer as a frame-sized buffer
buffer_ = ByteBuffer.allocate(frameSize + 4);
// put the frame size at the head of the buffer
* Shut the connection down.
*/
public void close() {
+ // if we're being closed due to an error, we might have allocated a
+ // buffer that we need to subtract for our memory accounting.
+ if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) {
+ readBufferBytesAllocated -= buffer_.array().length;
+ }
trans_.close();
}
* trying to write and instead go back to reading.
*/
public void responseReady() {
- // capture the data we want to write as a byte array.
- byte[] bytes = response_.toByteArray();
+ // the read buffer is definitely no longer in use, so we will decrement
+ // our read buffer count. we do this here as well as in close because
+ // we'd like to free this read memory up as quickly as possible for other
+ // clients.
+ readBufferBytesAllocated -= buffer_.array().length;
- if (bytes.length <= 0) {
+ if (response_.len() == 0) {
// go straight to reading again. this was probably an async method
state_ = AWAITING_REGISTER_READ;
+ buffer_ = null;
} else {
- buffer_ = ByteBuffer.wrap(bytes);
+ buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
// set state that we're waiting to be switched to write. we do this
// asynchronously through requestSelectInterestChange() because there is a
* Get the transport that should be used by the invoker for responding.
*/
private TTransport getOutputTransport() {
- response_ = new ByteArrayOutputStream();
+ response_ = new TByteArrayOutputStream();
return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
}
}
} // FrameBuffer
+
+ public static class Options {
+ public long maxReadBufferBytes = Long.MAX_VALUE;
+
+ public Options() {}
+
+ public void validate() {
+ if (maxReadBufferBytes <= 1024) {
+ throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer.");
+ }
+ }
+ }
}
--- /dev/null
+
+package com.facebook.thrift;
+
+import thrift.test.*;
+
+import com.facebook.thrift.TApplicationException;
+import com.facebook.thrift.TSerializer;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TSimpleJSONProtocol;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class OverloadNonblockingServer {
+
+ public static void main(String[] args) throws Exception {
+ int msg_size_mb = Integer.parseInt(args[0]);
+ int msg_size = msg_size_mb * 1024 * 1024;
+
+ TSocket socket = new TSocket("localhost", 9090);
+ TBinaryProtocol binprot = new TBinaryProtocol(socket);
+ socket.open();
+ binprot.writeI32(msg_size);
+ binprot.writeI32(1);
+ socket.flush();
+
+ System.in.read();
+ // Thread.sleep(30000);
+ for (int i = 0; i < msg_size_mb; i++) {
+ binprot.writeBinary(new byte[1024 * 1024]);
+ }
+
+ socket.close();
+ }
+}