| #!/usr/bin/env python | 
 |  | 
 | # | 
 | # Licensed to the Apache Software Foundation (ASF) under one | 
 | # or more contributor license agreements. See the NOTICE file | 
 | # distributed with this work for additional information | 
 | # regarding copyright ownership. The ASF licenses this file | 
 | # to you under the Apache License, Version 2.0 (the | 
 | # "License"); you may not use this file except in compliance | 
 | # with the License. You may obtain a copy of the License at | 
 | # | 
 | #   http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, | 
 | # software distributed under the License is distributed on an | 
 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | # KIND, either express or implied. See the License for the | 
 | # specific language governing permissions and limitations | 
 | # under the License. | 
 | # | 
 |  | 
 | import sys, glob | 
 | sys.path.insert(0, './gen-py') | 
 | sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0]) | 
 |  | 
 | from ThriftTest.ttypes import * | 
 | from thrift.transport import TTransport | 
 | from thrift.transport import TSocket | 
 | from thrift.protocol import TBinaryProtocol, TCompactProtocol | 
 | from thrift.TSerialization import serialize, deserialize | 
 | import unittest | 
 | import time | 
 |  | 
 | class AbstractTest(unittest.TestCase): | 
 |  | 
 |   def setUp(self): | 
 |       self.v1obj = VersioningTestV1( | 
 |           begin_in_both=12345, | 
 |           old_string='aaa', | 
 |           end_in_both=54321, | 
 |           ) | 
 |  | 
 |       self.v2obj = VersioningTestV2( | 
 |           begin_in_both=12345, | 
 |           newint=1, | 
 |           newbyte=2, | 
 |           newshort=3, | 
 |           newlong=4, | 
 |           newdouble=5.0, | 
 |           newstruct=Bonk(message="Hello!", type=123), | 
 |           newlist=[7,8,9], | 
 |           newset=[42,1,8], | 
 |           newmap={1:2,2:3}, | 
 |           newstring="Hola!", | 
 |           end_in_both=54321, | 
 |           ) | 
 |  | 
 |   def _serialize(self, obj): | 
 |       trans = TTransport.TMemoryBuffer() | 
 |       prot = self.protocol_factory.getProtocol(trans) | 
 |       obj.write(prot) | 
 |       return trans.getvalue() | 
 |  | 
 |   def _deserialize(self, objtype, data): | 
 |       prot = self.protocol_factory.getProtocol(TTransport.TMemoryBuffer(data)) | 
 |       ret = objtype() | 
 |       ret.read(prot) | 
 |       return ret | 
 |  | 
 |   def testForwards(self): | 
 |       obj = self._deserialize(VersioningTestV2, self._serialize(self.v1obj)) | 
 |       self.assertEquals(obj.begin_in_both, self.v1obj.begin_in_both) | 
 |       self.assertEquals(obj.end_in_both, self.v1obj.end_in_both) | 
 |  | 
 |   def testBackwards(self): | 
 |       obj = self._deserialize(VersioningTestV1, self._serialize(self.v2obj)) | 
 |       self.assertEquals(obj.begin_in_both, self.v2obj.begin_in_both) | 
 |       self.assertEquals(obj.end_in_both, self.v2obj.end_in_both) | 
 |  | 
 |  | 
 | class NormalBinaryTest(AbstractTest): | 
 |   protocol_factory = TBinaryProtocol.TBinaryProtocolFactory() | 
 |  | 
 | class AcceleratedBinaryTest(AbstractTest): | 
 |   protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory() | 
 |  | 
 | class CompactProtocolTest(AbstractTest): | 
 |   protocol_factory = TCompactProtocol.TCompactProtocolFactory() | 
 |  | 
 | class AcceleratedFramedTest(unittest.TestCase): | 
 |   def testSplit(self): | 
 |     """Test FramedTransport and BinaryProtocolAccelerated | 
 |  | 
 |     Tests that TBinaryProtocolAccelerated and TFramedTransport | 
 |     play nicely together when a read spans a frame""" | 
 |  | 
 |     protocol_factory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory() | 
 |     bigstring = "".join(chr(byte) for byte in range(ord("a"), ord("z")+1)) | 
 |  | 
 |     databuf = TTransport.TMemoryBuffer() | 
 |     prot = protocol_factory.getProtocol(databuf) | 
 |     prot.writeI32(42) | 
 |     prot.writeString(bigstring) | 
 |     prot.writeI16(24) | 
 |     data = databuf.getvalue() | 
 |     cutpoint = len(data)/2 | 
 |     parts = [ data[:cutpoint], data[cutpoint:] ] | 
 |  | 
 |     framed_buffer = TTransport.TMemoryBuffer() | 
 |     framed_writer = TTransport.TFramedTransport(framed_buffer) | 
 |     for part in parts: | 
 |       framed_writer.write(part) | 
 |       framed_writer.flush() | 
 |     self.assertEquals(len(framed_buffer.getvalue()), len(data) + 8) | 
 |  | 
 |     # Recreate framed_buffer so we can read from it. | 
 |     framed_buffer = TTransport.TMemoryBuffer(framed_buffer.getvalue()) | 
 |     framed_reader = TTransport.TFramedTransport(framed_buffer) | 
 |     prot = protocol_factory.getProtocol(framed_reader) | 
 |     self.assertEqual(prot.readI32(), 42) | 
 |     self.assertEqual(prot.readString(), bigstring) | 
 |     self.assertEqual(prot.readI16(), 24) | 
 |  | 
 | class SerializersTest(unittest.TestCase): | 
 |  | 
 |   def testSerializeThenDeserialize(self): | 
 |     obj = Xtruct2(i32_thing=1, | 
 |                   struct_thing=Xtruct(string_thing="foo")) | 
 |  | 
 |     s1 = serialize(obj) | 
 |     for i in range(10): | 
 |       self.assertEquals(s1, serialize(obj)) | 
 |       objcopy = Xtruct2() | 
 |       deserialize(objcopy, serialize(obj)) | 
 |       self.assertEquals(obj, objcopy) | 
 |  | 
 |     obj = Xtruct(string_thing="bar") | 
 |     objcopy = Xtruct() | 
 |     deserialize(objcopy, serialize(obj)) | 
 |     self.assertEquals(obj, objcopy) | 
 |  | 
 |  | 
 | def suite(): | 
 |   suite = unittest.TestSuite() | 
 |   loader = unittest.TestLoader() | 
 |  | 
 |   suite.addTest(loader.loadTestsFromTestCase(NormalBinaryTest)) | 
 |   suite.addTest(loader.loadTestsFromTestCase(AcceleratedBinaryTest)) | 
 |   suite.addTest(loader.loadTestsFromTestCase(CompactProtocolTest)) | 
 |   suite.addTest(loader.loadTestsFromTestCase(AcceleratedFramedTest)) | 
 |   suite.addTest(loader.loadTestsFromTestCase(SerializersTest)) | 
 |   return suite | 
 |  | 
 | if __name__ == "__main__": | 
 |   unittest.main(defaultTest="suite", testRunner=unittest.TextTestRunner(verbosity=2)) |