From: David Reiss Date: Wed, 5 Sep 2007 01:14:09 +0000 (+0000) Subject: Thrift: Python TBufferedTransport improvements. X-Git-Tag: 0.2.0~1222 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=2c2e6d27a238746a0db9982cc4eb35c093553361;p=common%2Fthrift.git Thrift: Python TBufferedTransport improvements. Summary: The Python version of TBufferedTransport now uses input buffering. It is also compatible with the fasbinary module. Reviewed By: mcslee Test Plan: test/FastbinaryTest.py dreiss@dreiss-vmware:~/gp/thrift/test/py$ strace -f ./TestClient.py 2>&1 | grep recv | wc -l 99 # Install new version in other terminal dreiss@dreiss-vmware:~/gp/thrift/test/py$ strace -f ./TestClient.py 2>&1 | grep recv | wc -l 14 Revert Plan: ok git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665250 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/py/src/protocol/fastbinary.c b/lib/py/src/protocol/fastbinary.c index 8a04836b..61ccd8f5 100644 --- a/lib/py/src/protocol/fastbinary.c +++ b/lib/py/src/protocol/fastbinary.c @@ -608,7 +608,7 @@ static bool readBytes(DecodeBuffer* input, char** output, int len) { // using building functions as this is a rare codepath newiobuf = PyObject_CallFunction( - input->refill_callable, "s#i", *output, len, read, NULL); + input->refill_callable, "s#i", *output, read, len, NULL); if (newiobuf == NULL) { return false; } diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py index 0f5bfdce..60f4233c 100644 --- a/lib/py/src/transport/TTransport.py +++ b/lib/py/src/transport/TTransport.py @@ -112,13 +112,16 @@ class TBufferedTransportFactory: return buffered -class TBufferedTransport(TTransportBase): +class TBufferedTransport(TTransportBase,CReadableTransport): """Class that wraps another transport and buffers its I/O.""" + DEFAULT_BUFFER = 4096 + def __init__(self, trans): self.__trans = trans - self.__buf = StringIO() + self.__wbuf = StringIO() + self.__rbuf = StringIO("") def isOpen(self): return self.__trans.isOpen() @@ -130,15 +133,38 @@ class TBufferedTransport(TTransportBase): return self.__trans.close() def read(self, sz): - return self.__trans.read(sz) + ret = self.__rbuf.read(sz) + if len(ret) != 0: + return ret + + self.__rbuf = StringIO(self.__trans.read(max(sz, self.DEFAULT_BUFFER))) + return self.__rbuf.read(sz) def write(self, buf): - self.__buf.write(buf) + self.__wbuf.write(buf) def flush(self): - self.__trans.write(self.__buf.getvalue()) + self.__trans.write(self.__wbuf.getvalue()) self.__trans.flush() - self.__buf = StringIO() + self.__wbuf = StringIO() + + # Implement the CReadableTransport interface. + @property + def cstringio_buf(self): + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + retstring = partialread + if reqlen < self.DEFAULT_BUFFER: + # try to make a read of as much as we can. + retstring += self.__trans.read(self.DEFAULT_BUFFER) + + # but make sure we do read reqlen bytes. + if len(retstring) < reqlen: + retstring += self.__trans.readAll(reqlen - len(retstring)) + + self.__rbuf = StringIO(retstring) + return self.__rbuf class TMemoryBuffer(TTransportBase, CReadableTransport): """Wraps a cStringIO object as a TTransport. diff --git a/test/FastbinaryTest.py b/test/FastbinaryTest.py index 0918002f..f6a86995 100755 --- a/test/FastbinaryTest.py +++ b/test/FastbinaryTest.py @@ -79,6 +79,10 @@ rs.maps = {1:Wrapper({"foo":Empty()}),2:Wrapper({"foo":Empty()})} rs.bigint = 124523452435L rs.triple = 3.14 +# make sure this splits two buffers in a buffered protocol +rshuge = RandomStuff() +rshuge.myintlist=range(10000) + my_zero = Srv.Janky_result({"arg":5}) my_nega = Srv.Janky_args({"success":6}) @@ -98,9 +102,22 @@ def checkWrite(o): def checkRead(o): prot = TBinaryProtocol.TBinaryProtocol(TTransport.TMemoryBuffer()) o.write(prot) + + slow_version_binary = prot.trans.getvalue() + prot = TBinaryProtocol.TBinaryProtocolAccelerated( - TTransport.TMemoryBuffer( - prot.trans.getvalue())) + TTransport.TMemoryBuffer(slow_version_binary)) + c = o.__class__() + c.read(prot) + if c != o: + print "copy: " + pprint(eval(repr(c))) + print "orig: " + pprint(eval(repr(o))) + + prot = TBinaryProtocol.TBinaryProtocolAccelerated( + TTransport.TBufferedTransport( + TTransport.TMemoryBuffer(slow_version_binary))) c = o.__class__() c.read(prot) if c != o: @@ -117,6 +134,8 @@ def doTest(): checkRead(no_set) checkWrite(rs) checkRead(rs) + checkWrite(rshuge) + checkRead(rshuge) checkWrite(my_zero) checkRead(my_zero) checkRead(Backwards({"first_tag2":4, "second_tag1":2}))