From becaf536211a699f1fb936752262fdb7bcd36126 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Thu, 18 Feb 2010 18:27:51 +0000 Subject: [PATCH] THRIFT-685. java: Direct buffer access to improve deserialization performance This initial patch adds direct buffer access support to TDeserializer and TCompactProtocol, with the framework in place to be extended to other areas. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@911510 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/org/apache/thrift/TDeserializer.java | 32 +++++----- .../thrift/protocol/TCompactProtocol.java | 46 +++++++++++--- .../org/apache/thrift/protocol/TProtocol.java | 1 - .../thrift/transport/TFramedTransport.java | 4 +- .../transport/TMemoryInputTransport.java | 61 +++++++++++++++++++ .../apache/thrift/transport/TTransport.java | 33 ++++++++++ .../thrift/test/TCompactProtocolTest.java | 23 +++++-- 7 files changed, 165 insertions(+), 35 deletions(-) create mode 100644 lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java diff --git a/lib/java/src/org/apache/thrift/TDeserializer.java b/lib/java/src/org/apache/thrift/TDeserializer.java index 750ea48a..25d19db7 100644 --- a/lib/java/src/org/apache/thrift/TDeserializer.java +++ b/lib/java/src/org/apache/thrift/TDeserializer.java @@ -19,7 +19,6 @@ package org.apache.thrift; -import java.io.ByteArrayInputStream; import java.io.UnsupportedEncodingException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -28,8 +27,7 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.protocol.TProtocolUtil; import org.apache.thrift.protocol.TType; -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.transport.TMemoryInputTransport; /** * Generic utility for easily deserializing objects from a byte array or Java @@ -37,7 +35,8 @@ import org.apache.thrift.TFieldIdEnum; * */ public class 
TDeserializer { - private final TProtocolFactory protocolFactory_; + private final TProtocol protocol_; + private final TMemoryInputTransport trans_; /** * Create a new TDeserializer that uses the TBinaryProtocol by default. @@ -53,7 +52,8 @@ public class TDeserializer { * @param protocolFactory Factory to create a protocol */ public TDeserializer(TProtocolFactory protocolFactory) { - protocolFactory_ = protocolFactory; + trans_ = new TMemoryInputTransport(null); + protocol_ = protocolFactory.getProtocol(trans_); } /** @@ -63,10 +63,8 @@ public class TDeserializer { * @param bytes The array to read from */ public void deserialize(TBase base, byte[] bytes) throws TException { - base.read( - protocolFactory_.getProtocol( - new TIOStreamTransport( - new ByteArrayInputStream(bytes)))); + trans_.reset(bytes); + base.read(protocol_); } /** @@ -103,17 +101,15 @@ public class TDeserializer { return; } - TProtocol iprot = protocolFactory_.getProtocol( - new TIOStreamTransport( - new ByteArrayInputStream(bytes))); + trans_.reset(bytes); // index into field ID path being currently searched for int curPathIndex = 0; - iprot.readStructBegin(); + protocol_.readStructBegin(); while (curPathIndex < fieldIdPath.length) { - TField field = iprot.readFieldBegin(); + TField field = protocol_.readFieldBegin(); // we can stop searching if we either see a stop or we go past the field // id we're looking for (since fields should now be serialized in asc // order). @@ -123,19 +119,19 @@ public class TDeserializer { if (field.id != fieldIdPath[curPathIndex].getThriftFieldId()) { // Not the field we're looking for. Skip field. - TProtocolUtil.skip(iprot, field.type); - iprot.readFieldEnd(); + TProtocolUtil.skip(protocol_, field.type); + protocol_.readFieldEnd(); } else { // This field is the next step in the path. Step into field. 
curPathIndex++; if (curPathIndex < fieldIdPath.length) { - iprot.readStructBegin(); + protocol_.readStructBegin(); } } } // when this line is reached, iprot will be positioned at the start of tb. - tb.read(iprot); + tb.read(protocol_); } /** diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java index 79f2f4aa..ea333b68 100755 --- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java @@ -467,12 +467,12 @@ public final class TCompactProtocol extends TProtocol { */ public TField readFieldBegin() throws TException { byte type = readByte(); - + // if it's a stop, then we can return immediately, as the struct is over. - if ((type & 0x0f) == TType.STOP) { + if (type == TType.STOP) { return TSTOP; } - + short fieldId; // mask off the 4 MSB of the type header. it could contain a field id delta. @@ -484,7 +484,7 @@ public final class TCompactProtocol extends TProtocol { // has a delta. add the delta to the last read field id. fieldId = (short)(lastFieldId_ + modifier); } - + TField field = new TField("", getTType((byte)(type & 0x0f)), fieldId); // if this happens to be a boolean field, the value is encoded in the type @@ -554,8 +554,15 @@ public final class TCompactProtocol extends TProtocol { * Read a single byte off the wire. Nothing interesting here. */ public byte readByte() throws TException { - trans_.readAll(byteRawBuf, 0, 1); - return byteRawBuf[0]; + byte b; + if (trans_.getBytesRemainingInBuffer() > 0) { + b = trans_.getBuffer()[trans_.getBufferPosition()]; + trans_.consumeBuffer(1); + } else { + trans_.readAll(byteRawBuf, 0, 1); + b = byteRawBuf[0]; + } + return b; } /** @@ -592,8 +599,20 @@ public final class TCompactProtocol extends TProtocol { * Reads a byte[] (via readBinary), and then UTF-8 decodes it. 
*/ public String readString() throws TException { + int length = readVarint32(); + + if (length == 0) { + return ""; + } + try { - return new String(readBinary(), "UTF-8"); + if (trans_.getBytesRemainingInBuffer() >= length) { + String str = new String(trans_.getBuffer(), trans_.getBufferPosition(), length, "UTF-8"); + trans_.consumeBuffer(length); + return str; + } else { + return new String(readBinary(length), "UTF-8"); + } } catch (UnsupportedEncodingException e) { throw new TException("UTF-8 not supported!"); } @@ -611,6 +630,16 @@ public final class TCompactProtocol extends TProtocol { return buf; } + /** + * Read a byte[] of a known length from the wire. + */ + private byte[] readBinary(int length) throws TException { + if (length == 0) return new byte[0]; + + byte[] buf = new byte[length]; + trans_.readAll(buf, 0, length); + return buf; + } // // These methods are here for the struct to call, but don't have any wire @@ -692,7 +721,8 @@ public final class TCompactProtocol extends TProtocol { // private boolean isBoolType(byte b) { - return (b & 0x0f) == Types.BOOLEAN_TRUE || (b & 0x0f) == Types.BOOLEAN_FALSE; + int lowerNibble = b & 0x0f; + return lowerNibble == Types.BOOLEAN_TRUE || lowerNibble == Types.BOOLEAN_FALSE; } /** diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocol.java b/lib/java/src/org/apache/thrift/protocol/TProtocol.java index 50d6683d..65b6f4bd 100644 --- a/lib/java/src/org/apache/thrift/protocol/TProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TProtocol.java @@ -142,5 +142,4 @@ public abstract class TProtocol { public abstract String readString() throws TException; public abstract byte[] readBinary() throws TException; - } diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java index 3600a2b6..f266cc1a 100644 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ 
b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java @@ -24,8 +24,8 @@ import java.io.ByteArrayInputStream; import org.apache.thrift.TByteArrayOutputStream; /** - * Socket implementation of the TTransport interface. To be commented soon! - * + * TFramedTransport is a buffered TTransport that ensures a fully read message + * every time by preceding messages with a 4-byte frame size. */ public class TFramedTransport extends TTransport { diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java new file mode 100644 index 00000000..845c8356 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java @@ -0,0 +1,61 @@ +package org.apache.thrift.transport; + +public class TMemoryInputTransport extends TTransport { + + private byte[] buf_; + private int pos_; + + public TMemoryInputTransport(byte[] buf) { + reset(buf); + } + + public void reset(byte[] buf) { + buf_ = buf; + pos_ = 0; + } + + @Override + public void close() {} + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void open() throws TTransportException {} + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + int bytesRemaining = getBytesRemainingInBuffer(); + int amtToRead = (len > bytesRemaining ? 
bytesRemaining : len); + if (amtToRead > 0) { + System.arraycopy(buf_, pos_, buf, off, amtToRead); + consumeBuffer(amtToRead); + } + return amtToRead; + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + throw new UnsupportedOperationException("No writing allowed!"); + } + + @Override + public byte[] getBuffer() { + return buf_; + } + + public int getBufferPosition() { + return pos_; + } + + public int getBytesRemainingInBuffer() { + return buf_.length - pos_; + } + + public void consumeBuffer(int len) { + pos_ += len; + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index a6c047bb..378efdc3 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -118,4 +118,37 @@ public abstract class TTransport { */ public void flush() throws TTransportException {} + + /** + * Access the transport's underlying buffer directly. If this is not a + * buffered transport, return null. + * @return + */ + public byte[] getBuffer() { + return null; + } + + /** + * Return the index within the underlying buffer that specifies the next spot + * that should be read from. + * @return + */ + public int getBufferPosition() { + return 0; + } + + /** + * Get the number of bytes remaining in the underlying buffer. Returns -1 if + * this is a non-buffered transport. + * @return + */ + public int getBytesRemainingInBuffer() { + return -1; + } + + /** + * Consume len bytes from the underlying buffer. 
+ * @param len + */ + public void consumeBuffer(int len) {} } diff --git a/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java b/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java index 04ef77fe..86ea57c7 100755 --- a/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java +++ b/lib/java/test/org/apache/thrift/test/TCompactProtocolTest.java @@ -24,7 +24,9 @@ import java.util.Arrays; import java.util.List; import org.apache.thrift.TBase; +import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TField; @@ -138,6 +140,8 @@ public class TCompactProtocolTest { testMessage(); testServerRequest(); + + testTDeserializer(); } public static void testNakedByte() throws Exception { @@ -367,23 +371,17 @@ public class TCompactProtocolTest { } public int primitiveMethod() throws TException { - // TODO Auto-generated method stub return 0; } public CompactProtoTestStruct structMethod() throws TException { - // TODO Auto-generated method stub return null; } public void voidMethod() throws TException { - // TODO Auto-generated method stub - } public void methodWithDefaultArgs(int something) throws TException { - // TODO Auto-generated method stub - } }; @@ -452,4 +450,17 @@ public class TCompactProtocolTest { public abstract void writeMethod(TProtocol proto) throws TException; public abstract void readMethod(TProtocol proto) throws TException; } + + private static void testTDeserializer() throws TException { + TSerializer ser = new TSerializer(new TCompactProtocol.Factory()); + byte[] bytes = ser.serialize(Fixtures.compactProtoTestStruct); + + TDeserializer deser = new TDeserializer(new TCompactProtocol.Factory()); + CompactProtoTestStruct cpts = new CompactProtoTestStruct(); + deser.deserialize(cpts, bytes); + + if 
(!Fixtures.compactProtoTestStruct.equals(cpts)) { + throw new RuntimeException(Fixtures.compactProtoTestStruct + " and " + cpts + " do not match!"); + } + } } \ No newline at end of file -- 2.17.1