From: Bryan Duxbury Date: Fri, 26 Mar 2010 05:12:27 +0000 (+0000) Subject: THRIFT-719. java: Update Nonblocking and HsHa server to avoid an extra buffer copy X-Git-Tag: 0.3.0~60 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=5af1b97173bd8cd2dda9529d441ca68960f5ff61;p=common%2Fthrift.git THRIFT-719. java: Update Nonblocking and HsHa server to avoid an extra buffer copy This patch causes Nonblocking and HsHa servers to explicitly enforce use of TFramedTransport and make sure that the actual invoker is deserializing from a TMemoryInputTransport. This should provide a substantial boost in performance. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@927695 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java index 47600f51..933ab179 100644 --- a/lib/java/src/org/apache/thrift/server/THsHaServer.java +++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java @@ -112,8 +112,7 @@ public class THsHaServer extends TNonblockingServer { TNonblockingServerTransport serverTransport, TProtocolFactory protocolFactory, Options options) { - this(new TProcessorFactory(processor), serverTransport, - new TFramedTransport.Factory(), + this(new TProcessorFactory(processor), serverTransport, new TFramedTransport.Factory(), protocolFactory, protocolFactory, options); @@ -142,7 +141,7 @@ public class THsHaServer extends TNonblockingServer { TFramedTransport.Factory transportFactory, TProtocolFactory protocolFactory) { this(processorFactory, serverTransport, - transportFactory, transportFactory, + transportFactory, protocolFactory, protocolFactory, new Options()); } @@ -156,7 +155,7 @@ public class THsHaServer extends TNonblockingServer { TProtocolFactory protocolFactory, Options options) { this(processorFactory, serverTransport, - transportFactory, transportFactory, + transportFactory, protocolFactory, protocolFactory, options); } @@ -166,12 +165,11 @@ public class THsHaServer extends TNonblockingServer { */ public THsHaServer( TProcessor processor, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory) { this(new TProcessorFactory(processor), serverTransport, - inputTransportFactory, outputTransportFactory, + outputTransportFactory, inputProtocolFactory, outputProtocolFactory); } @@ -180,13 +178,12 @@ public class THsHaServer extends TNonblockingServer { */ public THsHaServer( TProcessorFactory processorFactory, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory) { this(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, + outputTransportFactory, inputProtocolFactory, outputProtocolFactory, new Options()); } @@ -195,14 +192,13 @@ public class THsHaServer extends TNonblockingServer { */ public THsHaServer( TProcessorFactory processorFactory, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, Options options) { super(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, + outputTransportFactory, inputProtocolFactory, outputProtocolFactory, options); diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 02fed332..31a6e243 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -20,7 +20,6 @@ package org.apache.thrift.server; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -30,9 +29,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; @@ -42,10 +38,13 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TMemoryInputTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A nonblocking TServer implementation. This allows for fairness amongst all @@ -100,7 +99,7 @@ public class TNonblockingServer extends TServer { public TNonblockingServer(TProcessorFactory processorFactory, TNonblockingServerTransport serverTransport) { this(processorFactory, serverTransport, - new TFramedTransport.Factory(), new TFramedTransport.Factory(), + new TFramedTransport.Factory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); } @@ -108,7 +107,7 @@ public class TNonblockingServer extends TServer { TNonblockingServerTransport serverTransport, TProtocolFactory protocolFactory) { this(processor, serverTransport, - new TFramedTransport.Factory(), new TFramedTransport.Factory(), + new TFramedTransport.Factory(), protocolFactory, protocolFactory); } @@ -117,7 +116,7 @@ public class TNonblockingServer extends TServer { TFramedTransport.Factory transportFactory, TProtocolFactory protocolFactory) { this(processor, serverTransport, - transportFactory, transportFactory, + transportFactory, protocolFactory, protocolFactory); } @@ -126,42 +125,39 @@ public class TNonblockingServer extends TServer { TFramedTransport.Factory transportFactory, TProtocolFactory protocolFactory) { this(processorFactory, serverTransport, - transportFactory, transportFactory, + transportFactory, protocolFactory, protocolFactory); } public TNonblockingServer(TProcessor processor, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory) { this(new TProcessorFactory(processor), serverTransport, - inputTransportFactory, outputTransportFactory, + outputTransportFactory, inputProtocolFactory, outputProtocolFactory); } public TNonblockingServer(TProcessorFactory processorFactory, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory) { this(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, + outputTransportFactory, inputProtocolFactory, outputProtocolFactory, new Options()); } public TNonblockingServer(TProcessorFactory processorFactory, TNonblockingServerTransport serverTransport, - TFramedTransport.Factory inputTransportFactory, TFramedTransport.Factory outputTransportFactory, TProtocolFactory inputProtocolFactory, TProtocolFactory outputProtocolFactory, Options options) { super(processorFactory, serverTransport, - inputTransportFactory, outputTransportFactory, + null, outputTransportFactory, inputProtocolFactory, outputProtocolFactory); options_ = options; options_.validate(); @@ -522,7 +518,7 @@ public class TNonblockingServer extends TServer { // if this frame will always be too large for this server, log the // error and close the connection. - if (frameSize + 4 > MAX_READ_BUFFER_BYTES) { + if (frameSize > MAX_READ_BUFFER_BYTES) { LOGGER.error("Read a frame size of " + frameSize + ", which is bigger than the maximum allowable buffer size for ALL connections."); return false; @@ -530,17 +526,15 @@ public class TNonblockingServer extends TServer { // if this frame will push us over the memory limit, then return. // with luck, more memory will free up the next time around. - if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) { + if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) { return true; } // incremement the amount of memory allocated to read buffers - readBufferBytesAllocated += frameSize + 4; + readBufferBytesAllocated += frameSize; // reallocate the readbuffer as a frame-sized buffer - buffer_ = ByteBuffer.allocate(frameSize + 4); - // put the frame size at the head of the buffer - buffer_.putInt(frameSize); + buffer_ = ByteBuffer.allocate(frameSize); state_ = READING_FRAME; } else { @@ -699,8 +693,7 @@ public class TNonblockingServer extends TServer { * the data it needs to handle an invocation. */ private TTransport getInputTransport() { - return inputTransportFactory_.getTransport(new TIOStreamTransport( - new ByteArrayInputStream(buffer_.array()))); + return new TMemoryInputTransport(buffer_.array()); } /**