THRIFT-719. java: Update Nonblocking and HsHa server to avoid an extra buffer copy
authorBryan Duxbury <bryanduxbury@apache.org>
Fri, 26 Mar 2010 05:12:27 +0000 (05:12 +0000)
committerBryan Duxbury <bryanduxbury@apache.org>
Fri, 26 Mar 2010 05:12:27 +0000 (05:12 +0000)
This patch causes Nonblocking and HsHa servers to explicitly enforce use of TFramedTransport and make sure that the actual invoker is deserializing from a TMemoryInputTransport. This should provide a substantial boost in performance.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@927695 13f79535-47bb-0310-9956-ffa450edef68

lib/java/src/org/apache/thrift/server/THsHaServer.java
lib/java/src/org/apache/thrift/server/TNonblockingServer.java

index 47600f5..933ab17 100644 (file)
@@ -112,8 +112,7 @@ public class THsHaServer extends TNonblockingServer {
                       TNonblockingServerTransport serverTransport,
                       TProtocolFactory protocolFactory,
                       Options options) {
-    this(new TProcessorFactory(processor), serverTransport, 
-      new TFramedTransport.Factory(),
+    this(new TProcessorFactory(processor), serverTransport,
       new TFramedTransport.Factory(),
       protocolFactory, protocolFactory, 
       options);
@@ -142,7 +141,7 @@ public class THsHaServer extends TNonblockingServer {
                       TFramedTransport.Factory transportFactory,
                       TProtocolFactory protocolFactory) {
     this(processorFactory, serverTransport,
-      transportFactory, transportFactory,
+      transportFactory,
       protocolFactory, protocolFactory, new Options());
   }
 
@@ -156,7 +155,7 @@ public class THsHaServer extends TNonblockingServer {
                       TProtocolFactory protocolFactory,
                       Options options) {
     this(processorFactory, serverTransport,
-      transportFactory, transportFactory,
+      transportFactory,
       protocolFactory, protocolFactory,
       options);
   }
@@ -166,12 +165,11 @@ public class THsHaServer extends TNonblockingServer {
    */
   public THsHaServer( TProcessor processor,
                       TNonblockingServerTransport serverTransport,
-                      TFramedTransport.Factory inputTransportFactory,
                       TFramedTransport.Factory outputTransportFactory,
                       TProtocolFactory inputProtocolFactory,
                       TProtocolFactory outputProtocolFactory) {
     this(new TProcessorFactory(processor), serverTransport,
-      inputTransportFactory, outputTransportFactory,
+      outputTransportFactory,
       inputProtocolFactory, outputProtocolFactory);
   }
 
@@ -180,13 +178,12 @@ public class THsHaServer extends TNonblockingServer {
    */
   public THsHaServer( TProcessorFactory processorFactory,
                       TNonblockingServerTransport serverTransport,
-                      TFramedTransport.Factory inputTransportFactory,
                       TFramedTransport.Factory outputTransportFactory,
                       TProtocolFactory inputProtocolFactory,
                       TProtocolFactory outputProtocolFactory)
   {
     this(processorFactory, serverTransport,
-      inputTransportFactory, outputTransportFactory,
+      outputTransportFactory,
       inputProtocolFactory, outputProtocolFactory, new Options());
   }
 
@@ -195,14 +192,13 @@ public class THsHaServer extends TNonblockingServer {
    */
   public THsHaServer( TProcessorFactory processorFactory,
                       TNonblockingServerTransport serverTransport,
-                      TFramedTransport.Factory inputTransportFactory,
                       TFramedTransport.Factory outputTransportFactory,
                       TProtocolFactory inputProtocolFactory,
                       TProtocolFactory outputProtocolFactory,
                       Options options)
   {
     super(processorFactory, serverTransport,
-      inputTransportFactory, outputTransportFactory,
+      outputTransportFactory,
       inputProtocolFactory, outputProtocolFactory,
       options);
 
index 02fed33..31a6e24 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.apache.thrift.server;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
@@ -30,9 +29,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.thrift.TByteArrayOutputStream;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -42,10 +38,13 @@ import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A nonblocking TServer implementation. This allows for fairness amongst all
@@ -100,7 +99,7 @@ public class TNonblockingServer extends TServer {
   public TNonblockingServer(TProcessorFactory processorFactory,
                             TNonblockingServerTransport serverTransport) {
     this(processorFactory, serverTransport,
-         new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+         new TFramedTransport.Factory(),
          new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
   }
 
@@ -108,7 +107,7 @@ public class TNonblockingServer extends TServer {
                             TNonblockingServerTransport serverTransport,
                             TProtocolFactory protocolFactory) {
     this(processor, serverTransport,
-         new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+         new TFramedTransport.Factory(),
          protocolFactory, protocolFactory);
   }
 
@@ -117,7 +116,7 @@ public class TNonblockingServer extends TServer {
                             TFramedTransport.Factory transportFactory,
                             TProtocolFactory protocolFactory) {
     this(processor, serverTransport,
-         transportFactory, transportFactory,
+         transportFactory,
          protocolFactory, protocolFactory);
   }
 
@@ -126,42 +125,39 @@ public class TNonblockingServer extends TServer {
                             TFramedTransport.Factory transportFactory,
                             TProtocolFactory protocolFactory) {
     this(processorFactory, serverTransport,
-         transportFactory, transportFactory,
+         transportFactory,
          protocolFactory, protocolFactory);
   }
 
   public TNonblockingServer(TProcessor processor,
                             TNonblockingServerTransport serverTransport,
-                            TFramedTransport.Factory inputTransportFactory,
                             TFramedTransport.Factory outputTransportFactory,
                             TProtocolFactory inputProtocolFactory,
                             TProtocolFactory outputProtocolFactory) {
     this(new TProcessorFactory(processor), serverTransport,
-         inputTransportFactory, outputTransportFactory,
+         outputTransportFactory,
          inputProtocolFactory, outputProtocolFactory);
   }
 
   public TNonblockingServer(TProcessorFactory processorFactory,
                             TNonblockingServerTransport serverTransport,
-                            TFramedTransport.Factory inputTransportFactory,
                             TFramedTransport.Factory outputTransportFactory,
                             TProtocolFactory inputProtocolFactory,
                             TProtocolFactory outputProtocolFactory) {
     this(processorFactory, serverTransport,
-          inputTransportFactory, outputTransportFactory,
+          outputTransportFactory,
           inputProtocolFactory, outputProtocolFactory,
           new Options());
   }
 
   public TNonblockingServer(TProcessorFactory processorFactory,
                             TNonblockingServerTransport serverTransport,
-                            TFramedTransport.Factory inputTransportFactory,
                             TFramedTransport.Factory outputTransportFactory,
                             TProtocolFactory inputProtocolFactory,
                             TProtocolFactory outputProtocolFactory,
                             Options options) {
     super(processorFactory, serverTransport,
-          inputTransportFactory, outputTransportFactory,
+          null, outputTransportFactory,
           inputProtocolFactory, outputProtocolFactory);
     options_ = options;
     options_.validate();
@@ -522,7 +518,7 @@ public class TNonblockingServer extends TServer {
 
           // if this frame will always be too large for this server, log the
           // error and close the connection.
-          if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+          if (frameSize > MAX_READ_BUFFER_BYTES) {
             LOGGER.error("Read a frame size of " + frameSize
               + ", which is bigger than the maximum allowable buffer size for ALL connections.");
             return false;
@@ -530,17 +526,15 @@ public class TNonblockingServer extends TServer {
 
           // if this frame will push us over the memory limit, then return.
           // with luck, more memory will free up the next time around.
-          if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+          if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) {
             return true;
           }
 
           // incremement the amount of memory allocated to read buffers
-          readBufferBytesAllocated += frameSize + 4;
+          readBufferBytesAllocated += frameSize;
 
           // reallocate the readbuffer as a frame-sized buffer
-          buffer_ = ByteBuffer.allocate(frameSize + 4);
-          // put the frame size at the head of the buffer
-          buffer_.putInt(frameSize);
+          buffer_ = ByteBuffer.allocate(frameSize);
 
           state_ = READING_FRAME;
         } else {
@@ -699,8 +693,7 @@ public class TNonblockingServer extends TServer {
      * the data it needs to handle an invocation.
      */
     private TTransport getInputTransport() {
-      return inputTransportFactory_.getTransport(new TIOStreamTransport(
-        new ByteArrayInputStream(buffer_.array())));
+      return new TMemoryInputTransport(buffer_.array());
     }
 
     /**