From: Bryan Duxbury Date: Fri, 13 Aug 2010 00:13:16 +0000 (+0000) Subject: THRIFT-831. java: FramedTransport implementation that reuses its buffers X-Git-Tag: 0.4.0~9 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=0bb034008cbbb9681863f009e8473d527f154a18;p=common%2Fthrift.git THRIFT-831. java: FramedTransport implementation that reuses its buffers This patch adds a TFastFramedTransport that is compatible with TFramedTransport, but makes use of a pair of internal, automatically-expanding buffers to avoid unnecessary reallocations. This makes interactions with the transport up to 2.5x faster. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@985049 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java new file mode 100644 index 00000000..b02905f3 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +/** + * Helper class that wraps a byte[] so that it can expand and be reused. Users + * should call resizeIfNecessary to make sure the buffer has suitable capacity, + * and then use the array as needed. Note that the internal array will grow at a + * rate slightly faster than the requested capacity with the (untested) + * objective of avoiding expensive buffer allocations and copies. + */ +public class AutoExpandingBuffer { + private byte[] array; + + private final double growthCoefficient; + + public AutoExpandingBuffer(int initialCapacity, double growthCoefficient) { + if (growthCoefficient < 1.0) { + throw new IllegalArgumentException("Growth coefficient must be >= 1.0"); + } + array = new byte[initialCapacity]; + this.growthCoefficient = growthCoefficient; + } + + public void resizeIfNecessary(int size) { + if (array.length < size) { + byte[] newBuf = new byte[(int)(size * growthCoefficient)]; + System.arraycopy(array, 0, newBuf, 0, array.length); + array = newBuf; + } + } + + public byte[] array() { + return array; + } +} diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java new file mode 100644 index 00000000..d29d60b7 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import org.apache.commons.lang.NotImplementedException; + +/** + * TTransport for reading from an AutoExpandingBuffer. + */ +public class AutoExpandingBufferReadTransport extends TTransport { + + private final AutoExpandingBuffer buf; + + private int pos = 0; + private int limit = 0; + + public AutoExpandingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) { + this.buf = new AutoExpandingBuffer(initialCapacity, overgrowthCoefficient); + } + + public void fill(TTransport inTrans, int length) throws TTransportException { + buf.resizeIfNecessary(length); + inTrans.readAll(buf.array(), 0, length); + pos = 0; + limit = length; + } + + @Override + public void close() {} + + @Override + public boolean isOpen() { return true; } + + @Override + public void open() throws TTransportException {} + + @Override + public final int read(byte[] target, int off, int len) throws TTransportException { + int amtToRead = Math.min(len, getBytesRemainingInBuffer()); + System.arraycopy(buf.array(), pos, target, off, amtToRead); + consumeBuffer(amtToRead); + return amtToRead; + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + throw new NotImplementedException(); + } + + @Override + public final void consumeBuffer(int len) { + pos += len; + } + + @Override + public final byte[] getBuffer() { + return buf.array(); + } + + @Override + public final int getBufferPosition() { + return pos; + } + + @Override + public final int getBytesRemainingInBuffer() { + return limit - pos; + } +} + \ No newline at end of file diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java new file mode 100644 index 00000000..2376cf37 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import org.apache.commons.lang.NotImplementedException; + +/** + * TTransport for writing to an AutoExpandingBuffer. + */ +public final class AutoExpandingBufferWriteTransport extends TTransport { + + private final AutoExpandingBuffer buf; + private int pos; + + public AutoExpandingBufferWriteTransport(int initialCapacity, double growthCoefficient) { + this.buf = new AutoExpandingBuffer(initialCapacity, growthCoefficient); + this.pos = 0; + } + + @Override + public void close() {} + + @Override + public boolean isOpen() {return true;} + + @Override + public void open() throws TTransportException {} + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + throw new NotImplementedException(); + } + + @Override + public void write(byte[] toWrite, int off, int len) throws TTransportException { + buf.resizeIfNecessary(pos + len); + System.arraycopy(toWrite, off, buf.array(), pos, len); + pos += len; + } + + public AutoExpandingBuffer getBuf() { + return buf; + } + + public int getPos() { + return pos; + } + + public void reset() { + pos = 0; + } +} diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java new file mode 100644 index 00000000..2a1f1dac --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +/** + * This transport is wire compatible with {@link TFramedTransport}, but makes + * use of reusable, expanding read and write buffers in order to avoid + * allocating new byte[]s all the time. Since the buffers only expand, you + * should probably only use this transport if your messages are not too variably + * large, unless the persistent memory cost is not an issue. + * + * This implementation is NOT threadsafe. + */ +public class TFastFramedTransport extends TTransport { + + public static class Factory extends TTransportFactory { + private final int initialCapacity; + private final int maxLength; + + public Factory() { + this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + } + + public Factory(int initialCapacity) { + this(initialCapacity, DEFAULT_MAX_LENGTH); + } + + public Factory(int initialCapacity, int maxLength) { + this.initialCapacity = initialCapacity; + this.maxLength = maxLength; + } + + @Override + public TTransport getTransport(TTransport trans) { + return new TFastFramedTransport(trans, + initialCapacity, + maxLength); + } + } + + /** + * How big should the default read and write buffers be? + */ + public static final int DEFAULT_BUF_CAPACITY = 1024; + /** + * How big is the largest allowable frame? Defaults to Integer.MAX_VALUE. + */ + public static final int DEFAULT_MAX_LENGTH = Integer.MAX_VALUE; + + private final TTransport underlying; + private final AutoExpandingBufferWriteTransport writeBuffer; + private final AutoExpandingBufferReadTransport readBuffer; + private final byte[] i32buf = new byte[4]; + private final int maxLength; + + /** + * Create a new {@link TFastFramedTransport}. Use the defaults + * for initial buffer size and max frame length. + * @param underlying Transport that real reads and writes will go through to. + */ + public TFastFramedTransport(TTransport underlying) { + this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + } + + /** + * Create a new {@link TFastFramedTransport}. Use the specified + * initial buffer capacity and the default max frame length. + * @param underlying Transport that real reads and writes will go through to. + * @param initialBufferCapacity The initial size of the read and write buffers. + * In practice, it's not critical to set this unless you know in advance that + * your messages are going to be very large. + */ + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) { + this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH); + } + + /** + * + * @param underlying Transport that real reads and writes will go through to. + * @param initialBufferCapacity The initial size of the read and write buffers. + * In practice, it's not critical to set this unless you know in advance that + * your messages are going to be very large. (You can pass + * TFramedTransportWithReusableBuffer.DEFAULT_BUF_CAPACITY if you're only + * using this constructor because you want to set the maxLength.) + * @param maxLength The max frame size you are willing to read. You can use + * this parameter to limit how much memory can be allocated. + */ + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) { + this.underlying = underlying; + this.maxLength = maxLength; + writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5); + readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5); + } + + @Override + public void close() { + underlying.close(); + } + + @Override + public boolean isOpen() { + return underlying.isOpen(); + } + + @Override + public void open() throws TTransportException { + underlying.open(); + } + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + int got = readBuffer.read(buf, off, len); + if (got > 0) { + return got; + } + + // Read another frame of data + readFrame(); + + return readBuffer.read(buf, off, len); + } + + private void readFrame() throws TTransportException { + underlying.readAll(i32buf , 0, 4); + int size = TFramedTransport.decodeFrameSize(i32buf); + + if (size < 0) { + throw new TTransportException("Read a negative frame size (" + size + ")!"); + } + + if (size > maxLength) { + throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength + ")!"); + } + + readBuffer.fill(underlying, size); + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + writeBuffer.write(buf, off, len); + } + + @Override + public void consumeBuffer(int len) { + readBuffer.consumeBuffer(len); + } + + @Override + public void flush() throws TTransportException { + int length = writeBuffer.getPos(); + TFramedTransport.encodeFrameSize(length, i32buf); + underlying.write(i32buf, 0, 4); + underlying.write(writeBuffer.getBuf().array(), 0, length); + writeBuffer.reset(); + underlying.flush(); + } + + @Override + public byte[] getBuffer() { + return readBuffer.getBuffer(); + } + + @Override + public int getBufferPosition() { + return readBuffer.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() { + return readBuffer.getBytesRemainingInBuffer(); + } +} diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java new file mode 100644 index 00000000..337dcf8c --- /dev/null +++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import junit.framework.TestCase; + +public class TestAutoExpandingBuffer extends TestCase { + public void testExpands() throws Exception { + // has expected initial capacity + AutoExpandingBuffer b = new AutoExpandingBuffer(10, 1.5); + assertEquals(10, b.array().length); + + // doesn't shrink + b.resizeIfNecessary(8); + assertEquals(10, b.array().length); + + // grows when more capacity is needed + b.resizeIfNecessary(100); + assertTrue(b.array().length >= 100); + } +} diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java new file mode 100644 index 00000000..2e1f9472 --- /dev/null +++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import java.nio.ByteBuffer; + +import junit.framework.TestCase; + +public class TestAutoExpandingBufferReadTransport extends TestCase { + private static final byte[] HUNDRED_BYTES = new byte[100]; + + static { + for (byte i = 0; i < 100; i++) { + HUNDRED_BYTES[i] = i; + } + } + + public void testIt() throws Exception { + AutoExpandingBufferReadTransport t = new AutoExpandingBufferReadTransport(150, 1.5); + + TMemoryInputTransport membuf = new TMemoryInputTransport(HUNDRED_BYTES); + + t.fill(membuf, 100); + assertEquals(100, t.getBytesRemainingInBuffer()); + assertEquals(0, t.getBufferPosition()); + + byte[] target = new byte[10]; + assertEquals(10, t.read(target, 0, 10)); + assertEquals(ByteBuffer.wrap(HUNDRED_BYTES, 0, 10), ByteBuffer.wrap(target)); + + assertEquals(90, t.getBytesRemainingInBuffer()); + assertEquals(10, t.getBufferPosition()); + } +} diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java new file mode 100644 index 00000000..d5f239da --- /dev/null +++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import java.nio.ByteBuffer; + +import junit.framework.TestCase; + +public class TestAutoExpandingBufferWriteTransport extends TestCase { + + public void testIt() throws Exception { + AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 1.5); + assertEquals(1, t.getBuf().array().length); + byte[] b1 = new byte[]{1,2,3}; + t.write(b1); + assertEquals(3, t.getPos()); + assertTrue(t.getBuf().array().length >= 3); + assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(t.getBuf().array(), 0, 3)); + + t.reset(); + assertTrue(t.getBuf().array().length >= 3); + assertEquals(0, t.getPos()); + byte[] b2 = new byte[]{4,5}; + t.write(b2); + assertEquals(2, t.getPos()); + assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2)); + } +} diff --git a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java new file mode 100644 index 00000000..e0240498 --- /dev/null +++ b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +public class TestTFastFramedTransport extends TestTFramedTransport { + @Override + protected TTransport getTransport(TTransport underlying) { + return new TFastFramedTransport(underlying, 50, 10 * 1024 * 1024); + } +} diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java index 27dad804..78f58ece 100644 --- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java +++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java @@ -18,6 +18,7 @@ */ package org.apache.thrift.transport; +import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -29,7 +30,11 @@ import junit.framework.TestCase; public class TestTFramedTransport extends TestCase { - private static byte[] byteSequence(int start, int end) { + protected TTransport getTransport(TTransport underlying) { + return new TFramedTransport(underlying); + } + + public static byte[] byteSequence(int start, int end) { byte[] result = new byte[end-start+1]; for (int i = 0; i <= (end-start); i++) { result[i] = (byte)(start+i); @@ -43,26 +48,37 @@ public class TestTFramedTransport extends TestCase { dos.writeInt(50); dos.write(byteSequence(0, 49)); + dos.writeInt(220); + dos.write(byteSequence(0, 219)); + TMemoryBuffer membuf = new TMemoryBuffer(0); membuf.write(baos.toByteArray()); ReadCountingTransport countTrans = new ReadCountingTransport(membuf); - TFramedTransport trans = new TFramedTransport(countTrans); + TTransport trans = getTransport(countTrans); byte[] readBuf = new byte[10]; trans.read(readBuf, 0, 10); assertTrue(Arrays.equals(readBuf, byteSequence(0,9))); + assertEquals(2, countTrans.readCount); trans.read(readBuf, 0, 10); assertTrue(Arrays.equals(readBuf, byteSequence(10,19))); + assertEquals(2, countTrans.readCount); + assertEquals(30, trans.read(new byte[30], 0, 30)); assertEquals(2, countTrans.readCount); + + readBuf = new byte[220]; + assertEquals(220, trans.read(readBuf, 0, 220)); + assertTrue(Arrays.equals(readBuf, byteSequence(0, 219))); + assertEquals(4, countTrans.readCount); } public void testWrite() throws TTransportException, IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(baos)); - TTransport trans = new TFramedTransport(countingTrans); + WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(new BufferedOutputStream(baos))); + TTransport trans = getTransport(countingTrans); trans.write(byteSequence(0,100)); assertEquals(0, countingTrans.writeCount); @@ -73,12 +89,21 @@ public class TestTFramedTransport extends TestCase { trans.flush(); assertEquals(2, countingTrans.writeCount); + trans.write(byteSequence(0, 245)); + trans.flush(); + assertEquals(4, countingTrans.writeCount); + DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); assertEquals(256, din.readInt()); byte[] buf = new byte[256]; din.read(buf, 0, 256); assertTrue(Arrays.equals(byteSequence(0,255), buf)); + + assertEquals(246, din.readInt()); + buf = new byte[246]; + din.read(buf, 0, 246); + assertTrue(Arrays.equals(byteSequence(0,245), buf)); } public void testDirectRead() throws IOException, TTransportException { @@ -86,12 +111,14 @@ public class TestTFramedTransport extends TestCase { DataOutputStream dos = new DataOutputStream(baos); dos.writeInt(50); dos.write(byteSequence(0, 49)); + dos.writeInt(75); + dos.write(byteSequence(125, 200)); TMemoryBuffer membuf = new TMemoryBuffer(0); membuf.write(baos.toByteArray()); ReadCountingTransport countTrans = new ReadCountingTransport(membuf); - TFramedTransport trans = new TFramedTransport(countTrans); + TTransport trans = getTransport(countTrans); assertEquals(0, trans.getBytesRemainingInBuffer()); @@ -107,5 +134,15 @@ public class TestTFramedTransport extends TestCase { assertEquals(15, trans.getBufferPosition()); assertEquals(2, countTrans.readCount); + + assertEquals(35, trans.read(new byte[35], 0, 35)); + assertEquals(0, trans.getBytesRemainingInBuffer()); + assertEquals(50, trans.getBufferPosition()); + + trans.read(readBuf, 0, 10); + assertEquals(4, countTrans.readCount); + assertTrue(Arrays.equals(readBuf, byteSequence(125,134))); + assertEquals(65, trans.getBytesRemainingInBuffer()); + assertEquals(10, trans.getBufferPosition()); } } diff --git a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java index 39a78361..daad838a 100644 --- a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java +++ b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java @@ -48,4 +48,9 @@ public class WriteCountingTransport extends TTransport { writeCount ++; trans.write(buf, off, len); } + + @Override + public void flush() throws TTransportException { + trans.flush(); + } } \ No newline at end of file