From 0f52f072013e8fcb85a8382a412a764433cf0673 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Tue, 2 Mar 2010 18:39:57 +0000 Subject: [PATCH] THRIFT-711. java: TFramedTransport should support direct buffer access This patch adds direct buffer read access to TFramedTransport as well as a simple test for reading, direct buffer reading, and writing. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@918142 13f79535-47bb-0310-9956-ffa450edef68 --- lib/java/build.xml | 2 + .../thrift/transport/TFramedTransport.java | 26 ++- .../test/transport/TFramedTransportTest.java | 164 ++++++++++++++++++ 3 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 lib/java/test/org/apache/thrift/test/transport/TFramedTransportTest.java diff --git a/lib/java/build.xml b/lib/java/build.xml index 97509639..afe5afbe 100644 --- a/lib/java/build.xml +++ b/lib/java/build.xml @@ -186,6 +186,8 @@ classpathref="test.classpath" failonerror="true" /> + diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java index f266cc1a..74e2c970 100644 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java @@ -19,8 +19,6 @@ package org.apache.thrift.transport; -import java.io.ByteArrayInputStream; - import org.apache.thrift.TByteArrayOutputStream; /** @@ -43,7 +41,7 @@ public class TFramedTransport extends TTransport { /** * Buffer for input */ - private ByteArrayInputStream readBuffer_ = null; + private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]); public static class Factory extends TTransportFactory { public Factory() { @@ -87,8 +85,24 @@ public class TFramedTransport extends TTransport { return readBuffer_.read(buf, off, len); } + public byte[] getBuffer() { + return readBuffer_.getBuffer(); + } + + public int getBufferPosition() { + return readBuffer_.getBufferPosition(); + } + + public int getBytesRemainingInBuffer() { + return readBuffer_.getBytesRemainingInBuffer(); + } + + public void consumeBuffer(int len) { + readBuffer_.consumeBuffer(len); + } + + private final byte[] i32rd = new byte[4]; private void readFrame() throws TTransportException { - byte[] i32rd = new byte[4]; transport_.readAll(i32rd, 0, 4); int size = ((i32rd[0] & 0xff) << 24) | @@ -99,10 +113,10 @@ public class TFramedTransport extends TTransport { if (size < 0) { throw new TTransportException("Read a negative frame size (" + size + ")!"); } - + byte[] buff = new byte[size]; transport_.readAll(buff, 0, size); - readBuffer_ = new ByteArrayInputStream(buff); + readBuffer_.reset(buff); } public void write(byte[] buf, int off, int len) throws TTransportException { diff --git a/lib/java/test/org/apache/thrift/test/transport/TFramedTransportTest.java b/lib/java/test/org/apache/thrift/test/transport/TFramedTransportTest.java new file mode 100644 index 00000000..b2169de6 --- /dev/null +++ b/lib/java/test/org/apache/thrift/test/transport/TFramedTransportTest.java @@ -0,0 +1,164 @@ +package org.apache.thrift.test.transport; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TMemoryBuffer; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +public class TFramedTransportTest { + public static class WriteCountingTransport extends TTransport { + private int writeCount = 0; + private final TTransport trans; + + public WriteCountingTransport(TTransport underlying) { + trans = underlying; + } + + @Override + public void close() {} + + @Override + public boolean isOpen() {return true;} + + @Override + public void open() throws TTransportException {} + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + return 0; + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + writeCount ++; + trans.write(buf, off, len); + } + } + + public static class ReadCountingTransport extends TTransport { + public int readCount = 0; + private TTransport trans; + + public ReadCountingTransport(TTransport underlying) { + trans = underlying; + } + + @Override + public void close() {} + + @Override + public boolean isOpen() {return true;} + + @Override + public void open() throws TTransportException {} + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + readCount++; + return trans.read(buf, off, len); + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException {} + } + + public static void main(String[] args) throws TTransportException, IOException { + testWrite(); + testRead(); + testDirectRead(); + } + + private static void testWrite() throws TTransportException, IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(baos)); + TTransport trans = new TFramedTransport(countingTrans); + + trans.write(byteSequence(0,100)); + failUnless(countingTrans.writeCount == 0); + trans.write(byteSequence(101,200)); + trans.write(byteSequence(201,255)); + failUnless(countingTrans.writeCount == 0); + + trans.flush(); + failUnless(countingTrans.writeCount == 2); + + DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + failUnless(din.readInt() == 256); + + byte[] buf = new byte[256]; + din.read(buf, 0, 256); + failUnless(Arrays.equals(byteSequence(0,255), buf)); + } + + private static void testRead() throws IOException, TTransportException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(50); + dos.write(byteSequence(0, 49)); + + TMemoryBuffer membuf = new TMemoryBuffer(0); + membuf.write(baos.toByteArray()); + + ReadCountingTransport countTrans = new ReadCountingTransport(membuf); + TFramedTransport trans = new TFramedTransport(countTrans); + + byte[] readBuf = new byte[10]; + trans.read(readBuf, 0, 10); + failUnless(Arrays.equals(readBuf, byteSequence(0,9))); + + trans.read(readBuf, 0, 10); + failUnless(Arrays.equals(readBuf, byteSequence(10,19))); + + failUnless(countTrans.readCount == 2); + } + + private static void testDirectRead() throws IOException, TTransportException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(50); + dos.write(byteSequence(0, 49)); + + TMemoryBuffer membuf = new TMemoryBuffer(0); + membuf.write(baos.toByteArray()); + + ReadCountingTransport countTrans = new ReadCountingTransport(membuf); + TFramedTransport trans = new TFramedTransport(countTrans); + + failUnless(trans.getBytesRemainingInBuffer() == 0); + + byte[] readBuf = new byte[10]; + trans.read(readBuf, 0, 10); + failUnless(Arrays.equals(readBuf, byteSequence(0,9))); + + failUnless(trans.getBytesRemainingInBuffer() == 40); + failUnless(trans.getBufferPosition() == 10); + + trans.consumeBuffer(5); + failUnless(trans.getBytesRemainingInBuffer() == 35); + failUnless(trans.getBufferPosition() == 15); + + failUnless(countTrans.readCount == 2); + } + + private static void failUnless(boolean b) { + if (!b) { + throw new RuntimeException(); + } + } + + private static byte[] byteSequence(int start, int end) { + byte[] result = new byte[end-start+1]; + for (int i = 0; i <= (end-start); i++) { + result[i] = (byte)(start+i); + } + return result; + } +} -- 2.17.1