THRIFT-685. java: Direct buffer access to improve deserialization performance
authorBryan Duxbury <bryanduxbury@apache.org>
Thu, 18 Feb 2010 18:27:51 +0000 (18:27 +0000)
committerBryan Duxbury <bryanduxbury@apache.org>
Thu, 18 Feb 2010 18:27:51 +0000 (18:27 +0000)
This initial patch adds direct buffer access support to TDeserializer and TCompactProtocol, with the framework in place to be extended to other areas.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@911510 13f79535-47bb-0310-9956-ffa450edef68

lib/java/src/org/apache/thrift/TDeserializer.java
lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
lib/java/src/org/apache/thrift/protocol/TProtocol.java
lib/java/src/org/apache/thrift/transport/TFramedTransport.java
lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java [new file with mode: 0644]
lib/java/src/org/apache/thrift/transport/TTransport.java
lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java

index 750ea48..25d19db 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.thrift;
 
-import java.io.ByteArrayInputStream;
 import java.io.UnsupportedEncodingException;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -28,8 +27,7 @@ import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.protocol.TProtocolUtil;
 import org.apache.thrift.protocol.TType;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.transport.TMemoryInputTransport;
 
 /**
  * Generic utility for easily deserializing objects from a byte array or Java
@@ -37,7 +35,8 @@ import org.apache.thrift.TFieldIdEnum;
  *
  */
 public class TDeserializer {
-  private final TProtocolFactory protocolFactory_;
+  private final TProtocol protocol_;
+  private final TMemoryInputTransport trans_;
 
   /**
    * Create a new TDeserializer that uses the TBinaryProtocol by default.
@@ -53,7 +52,8 @@ public class TDeserializer {
    * @param protocolFactory Factory to create a protocol
    */
   public TDeserializer(TProtocolFactory protocolFactory) {
-    protocolFactory_ = protocolFactory;
+    trans_ = new TMemoryInputTransport(null);
+    protocol_ = protocolFactory.getProtocol(trans_);
   }
 
   /**
@@ -63,10 +63,8 @@ public class TDeserializer {
    * @param bytes The array to read from
    */
   public void deserialize(TBase base, byte[] bytes) throws TException {
-    base.read(
-        protocolFactory_.getProtocol(
-          new TIOStreamTransport(
-            new ByteArrayInputStream(bytes))));
+    trans_.reset(bytes);
+    base.read(protocol_);
   }
 
   /**
@@ -103,17 +101,15 @@ public class TDeserializer {
       return;
     }
 
-    TProtocol iprot = protocolFactory_.getProtocol(
-        new TIOStreamTransport(
-          new ByteArrayInputStream(bytes))); 
+    trans_.reset(bytes);
 
     // index into field ID path being currently searched for
     int curPathIndex = 0;
 
-    iprot.readStructBegin();
+    protocol_.readStructBegin();
 
     while (curPathIndex < fieldIdPath.length) {
-      TField field = iprot.readFieldBegin();
+      TField field = protocol_.readFieldBegin();
       // we can stop searching if we either see a stop or we go past the field 
       // id we're looking for (since fields should now be serialized in asc
       // order).
@@ -123,19 +119,19 @@ public class TDeserializer {
 
       if (field.id != fieldIdPath[curPathIndex].getThriftFieldId()) {
         // Not the field we're looking for. Skip field.
-        TProtocolUtil.skip(iprot, field.type);
-        iprot.readFieldEnd();
+        TProtocolUtil.skip(protocol_, field.type);
+        protocol_.readFieldEnd();
       } else {
         // This field is the next step in the path. Step into field.
         curPathIndex++;
         if (curPathIndex < fieldIdPath.length) {
-          iprot.readStructBegin();
+          protocol_.readStructBegin();
         }
       }
     }
 
     // when this line is reached, iprot will be positioned at the start of tb.
-    tb.read(iprot);
+    tb.read(protocol_);
   }
 
   /**
index 79f2f4a..ea333b6 100755 (executable)
@@ -467,12 +467,12 @@ public final class TCompactProtocol extends TProtocol {
    */
   public TField readFieldBegin() throws TException {
     byte type = readByte();
-    
+
     // if it's a stop, then we can return immediately, as the struct is over.
-    if ((type & 0x0f) == TType.STOP) {
+    if (type == TType.STOP) {
       return TSTOP;
     }
-    
+
     short fieldId;
 
     // mask off the 4 MSB of the type header. it could contain a field id delta.
@@ -484,7 +484,7 @@ public final class TCompactProtocol extends TProtocol {
       // has a delta. add the delta to the last read field id.
       fieldId = (short)(lastFieldId_ + modifier);
     }
-    
+
     TField field = new TField("", getTType((byte)(type & 0x0f)), fieldId);
 
     // if this happens to be a boolean field, the value is encoded in the type
@@ -554,8 +554,15 @@ public final class TCompactProtocol extends TProtocol {
    * Read a single byte off the wire. Nothing interesting here.
    */
   public byte readByte() throws TException {
-    trans_.readAll(byteRawBuf, 0, 1);
-    return byteRawBuf[0];
+    byte b;
+    if (trans_.getBytesRemainingInBuffer() > 0) {
+      b = trans_.getBuffer()[trans_.getBufferPosition()];
+      trans_.consumeBuffer(1);
+    } else {
+      trans_.readAll(byteRawBuf, 0, 1);
+      b = byteRawBuf[0];
+    }
+    return b;
   }
 
   /**
@@ -592,8 +599,20 @@ public final class TCompactProtocol extends TProtocol {
    * Reads a byte[] (via readBinary), and then UTF-8 decodes it.
    */
   public String readString() throws TException {
+    int length = readVarint32();
+
+    if (length == 0) {
+      return "";
+    }
+
     try {
-      return new String(readBinary(), "UTF-8");
+      if (trans_.getBytesRemainingInBuffer() >= length) {
+        String str = new String(trans_.getBuffer(), trans_.getBufferPosition(), length, "UTF-8");
+        trans_.consumeBuffer(length);
+        return str;
+      } else {
+        return new String(readBinary(length), "UTF-8");
+      }
     } catch (UnsupportedEncodingException e) {
       throw new TException("UTF-8 not supported!");
     }
@@ -611,6 +630,16 @@ public final class TCompactProtocol extends TProtocol {
     return buf;
   }
 
+  /**
+   * Read a byte[] of a known length from the wire. 
+   */
+  private byte[] readBinary(int length) throws TException {
+    if (length == 0) return new byte[0];
+
+    byte[] buf = new byte[length];
+    trans_.readAll(buf, 0, length);
+    return buf;
+  }
 
   //
   // These methods are here for the struct to call, but don't have any wire 
@@ -692,7 +721,8 @@ public final class TCompactProtocol extends TProtocol {
   //
 
   private boolean isBoolType(byte b) {
-    return (b & 0x0f) == Types.BOOLEAN_TRUE || (b & 0x0f) == Types.BOOLEAN_FALSE;
+    int lowerNibble = b & 0x0f;
+    return lowerNibble == Types.BOOLEAN_TRUE || lowerNibble == Types.BOOLEAN_FALSE;
   }
 
   /**
index 50d6683..65b6f4b 100644 (file)
@@ -142,5 +142,4 @@ public abstract class TProtocol {
   public abstract String readString() throws TException;
 
   public abstract byte[] readBinary() throws TException;
-
 }
index 3600a2b..f266cc1 100644 (file)
@@ -24,8 +24,8 @@ import java.io.ByteArrayInputStream;
 import org.apache.thrift.TByteArrayOutputStream;
 
 /**
- * Socket implementation of the TTransport interface. To be commented soon!
- *
+ * TFramedTransport is a buffered TTransport that ensures a fully read message
+ * every time by preceeding messages with a 4-byte frame size. 
  */
 public class TFramedTransport extends TTransport {
 
diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
new file mode 100644 (file)
index 0000000..845c835
--- /dev/null
@@ -0,0 +1,61 @@
+package org.apache.thrift.transport;
+
+public class TMemoryInputTransport extends TTransport {
+
+  private byte[] buf_;
+  private int pos_;
+
+  public TMemoryInputTransport(byte[] buf) {
+    reset(buf);
+  }
+
+  public void reset(byte[] buf) {
+    buf_ = buf;
+    pos_ = 0;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() {
+    return true;
+  }
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int bytesRemaining = getBytesRemainingInBuffer();
+    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
+    if (amtToRead > 0) {
+      System.arraycopy(buf_, pos_, buf, off, amtToRead);
+      consumeBuffer(amtToRead);
+    }
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new UnsupportedOperationException("No writing allowed!");
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return buf_;
+  }
+
+  public int getBufferPosition() {
+    return pos_;
+  }
+
+  public int getBytesRemainingInBuffer() {
+    return buf_.length - pos_;
+  }
+
+  public void consumeBuffer(int len) {
+    pos_ += len;
+  }
+
+}
index a6c047b..378efdc 100644 (file)
@@ -118,4 +118,37 @@ public abstract class TTransport {
    */
   public void flush()
     throws TTransportException {}
+
+  /**
+   * Access the protocol's underlying buffer directly. If this is not a
+   * buffered transport, return null.
+   * @return
+   */
+  public byte[] getBuffer() {
+    return null;
+  }
+
+  /**
+   * Return the index within the underlying buffer that specifies the next spot
+   * that should be read from.
+   * @return
+   */
+  public int getBufferPosition() {
+    return 0;
+  }
+
+  /**
+   * Get the number of bytes remaining in the underlying buffer. Returns -1 if
+   * this is a non-buffered transport.
+   * @return
+   */
+  public int getBytesRemainingInBuffer() {
+    return -1;
+  }
+
+  /**
+   * Consume len bytes from the underlying buffer.
+   * @param len
+   */
+  public void consumeBuffer(int len) {}
 }
index 04ef77f..86ea57c 100755 (executable)
@@ -24,7 +24,9 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TField;
@@ -138,6 +140,8 @@ public class TCompactProtocolTest {
     testMessage();
     
     testServerRequest();
+    
+    testTDeserializer();
   }
   
   public static void testNakedByte() throws Exception {
@@ -367,23 +371,17 @@ public class TCompactProtocolTest {
       }
 
       public int primitiveMethod() throws TException {
-        // TODO Auto-generated method stub
         return 0;
       }
 
       public CompactProtoTestStruct structMethod() throws TException {
-        // TODO Auto-generated method stub
         return null;
       }
 
       public void voidMethod() throws TException {
-        // TODO Auto-generated method stub
-        
       }
 
       public void methodWithDefaultArgs(int something) throws TException {
-        // TODO Auto-generated method stub
-        
       }
     };
     
@@ -452,4 +450,17 @@ public class TCompactProtocolTest {
     public abstract void writeMethod(TProtocol proto) throws TException;
     public abstract void readMethod(TProtocol proto) throws TException;
   }
+  
+  private static void testTDeserializer() throws TException {
+    TSerializer ser = new TSerializer(new TCompactProtocol.Factory());
+    byte[] bytes = ser.serialize(Fixtures.compactProtoTestStruct);
+    
+    TDeserializer deser = new TDeserializer(new TCompactProtocol.Factory());
+    CompactProtoTestStruct cpts = new CompactProtoTestStruct();
+    deser.deserialize(cpts, bytes);
+    
+    if (!Fixtures.compactProtoTestStruct.equals(cpts)) {
+      throw new RuntimeException(Fixtures.compactProtoTestStruct + " and " + cpts + " do not match!");
+    }
+  }
 }
\ No newline at end of file