THRIFT-1103. py: TZlibTransport for python, a zlib compressed transport
This patch adds a new TZlibTransport to the Python library and extends the test suite to exercise it.
Patch: Will Pierce
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1084276 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/test/py/RunClientServer.py b/test/py/RunClientServer.py
index dced91a..633856f 100755
--- a/test/py/RunClientServer.py
+++ b/test/py/RunClientServer.py
@@ -30,9 +30,18 @@
parser = OptionParser()
parser.add_option("--port", type="int", dest="port", default=9090,
help="port number for server to listen on")
+parser.add_option('-v', '--verbose', action="store_const",
+ dest="verbose", const=2,
+ help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const",
+ dest="verbose", const=0,
+ help="minimal output")
+parser.set_defaults(verbose=1)
options, args = parser.parse_args()
FRAMED = ["TNonblockingServer"]
+SKIP_ZLIB = ['TNonblockingServer', 'THttpServer']
+SKIP_SSL = ['TNonblockingServer', 'THttpServer']
EXTRA_DELAY = ['TProcessPoolServer']
EXTRA_SLEEP = 3.5
@@ -58,6 +67,11 @@
print 'Warning: the multiprocessing module is unavailable. Skipping tests for TProcessPoolServer'
SERVERS.remove('TProcessPoolServer')
+try:
+ import ssl
+except:
+ print 'Warning, no ssl module available. Skipping all SSL tests.'
+ SKIP_SSL.extend(SERVERS)
# commandline permits a single class name to be specified to override SERVERS=[...]
if len(args) == 1:
@@ -71,41 +85,68 @@
def relfile(fname):
return os.path.join(os.path.dirname(__file__), fname)
-def runTest(server_class, proto, port):
- server_args = [sys.executable, # /usr/bin/python or similar
- relfile('TestServer.py'), # ./TestServer.py
- '--proto=%s' % proto, # accel, binary or compact
- '--port=%d' % port, # usually 9090, given on cmdline
- server_class] # name of class to test, from SERVERS[] or cmdline
- print "Testing server %s: %s" % (server_class, ' '.join(server_args))
- serverproc = subprocess.Popen(server_args)
- time.sleep(0.25)
- try:
- argv = [sys.executable, relfile("TestClient.py"),
- '--proto=%s' % (proto), '--port=%d' % (port) ]
- if server_class in FRAMED:
- argv.append('--framed')
- if server_class == 'THttpServer':
- argv.append('--http=/')
- print 'Testing client %s: %s' % (server_class, ' '.join(argv))
- ret = subprocess.call(argv)
- if ret != 0:
- raise Exception("subprocess %s failed, args: %s" % (server_class, ' '.join(argv)))
- finally:
- # check that server didn't die
- time.sleep(0.05)
- serverproc.poll()
- if serverproc.returncode is not None:
- print 'Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
- raise Exception('subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
- else:
- if server_class in EXTRA_DELAY:
- print 'Giving %s (proto=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class, proto, EXTRA_SLEEP)
- time.sleep(EXTRA_SLEEP)
- os.kill(serverproc.pid, signal.SIGKILL)
- # wait for shutdown
- time.sleep(0.5)
+def runTest(server_class, proto, port, use_zlib, use_ssl):
+ # Build command line arguments
+ server_args = [sys.executable, relfile('TestServer.py') ]
+ cli_args = [sys.executable, relfile('TestClient.py') ]
+ for which in (server_args, cli_args):
+ which.append('--proto=%s' % proto) # accel, binary or compact
+ which.append('--port=%d' % port) # default to 9090
+ if use_zlib:
+ which.append('--zlib')
+ if use_ssl:
+ which.append('--ssl')
+ if options.verbose == 0:
+ which.append('-q')
+ if options.verbose == 2:
+ which.append('-v')
+ # server-specific option to select server class
+ server_args.append(server_class)
+ # client-specific cmdline options
+ if server_class in FRAMED:
+ cli_args.append('--framed')
+ if server_class == 'THttpServer':
+ cli_args.append('--http=/')
+ if options.verbose > 0:
+ print 'Testing server %s: %s' % (server_class, ' '.join(server_args))
+ serverproc = subprocess.Popen(server_args)
+ time.sleep(0.2)
+ try:
+ if options.verbose > 0:
+ print 'Testing client: %s' % (' '.join(cli_args))
+ ret = subprocess.call(cli_args)
+ if ret != 0:
+ raise Exception("Client subprocess failed, retcode=%d, args: %s" % (ret, ' '.join(cli_args)))
+ finally:
+ # check that server didn't die
+ serverproc.poll()
+ if serverproc.returncode is not None:
+ print 'FAIL: Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode)
+ raise Exception('Server subprocess %s died, args: %s' % (server_class, ' '.join(server_args)))
+ else:
+ if server_class in EXTRA_DELAY:
+ if options.verbose > 0:
+ print 'Giving %s (proto=%s,zlib=%s,ssl=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class,
+ proto, use_zlib, use_ssl, EXTRA_SLEEP)
+ time.sleep(EXTRA_SLEEP)
+ os.kill(serverproc.pid, signal.SIGKILL)
+ # wait for shutdown
+ time.sleep(0.1)
+test_count = 0
for try_server in SERVERS:
for try_proto in PROTOS:
- runTest(try_server, try_proto, options.port)
+ for with_zlib in (False, True):
+ # skip any servers that don't work with the Zlib transport
+ if with_zlib and try_server in SKIP_ZLIB:
+ continue
+ for with_ssl in (False, True):
+ # skip any servers that don't work with SSL
+ if with_ssl and try_server in SKIP_SSL:
+ continue
+ test_count += 1
+ if options.verbose > 0:
+ print '\nTest run #%d: Server=%s, Proto=%s, zlib=%s, SSL=%s' % (test_count, try_server, try_proto, with_zlib, with_ssl)
+ runTest(try_server, try_proto, options.port, with_zlib, with_ssl)
+ if options.verbose > 0:
+ print 'OK: Finished %s / %s proto / zlib=%s / SSL=%s. %d combinations tested.' % (try_server, try_proto, with_zlib, with_ssl, test_count)
diff --git a/test/py/TestClient.py b/test/py/TestClient.py
index eecb850..6429ec3 100755
--- a/test/py/TestClient.py
+++ b/test/py/TestClient.py
@@ -28,6 +28,7 @@
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport import THttpClient
+from thrift.transport import TZlibTransport
from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
import unittest
@@ -40,6 +41,10 @@
help="connect to server at port")
parser.add_option("--host", type="string", dest="host",
help="connect to server")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+ help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+ help="use SSL for encrypted transport")
parser.add_option("--framed", action="store_true", dest="framed",
help="use framed transport")
parser.add_option("--http", dest="http_path",
@@ -58,19 +63,21 @@
class AbstractTest(unittest.TestCase):
def setUp(self):
if options.http_path:
- self.transport = THttpClient.THttpClient(
- options.host, options.port, options.http_path)
+ self.transport = THttpClient.THttpClient(options.host, port=options.port, path=options.http_path)
else:
- socket = TSocket.TSocket(options.host, options.port)
-
+ if options.ssl:
+ from thrift.transport import TSSLSocket
+ socket = TSSLSocket.TSSLSocket(options.host, options.port, validate=False)
+ else:
+ socket = TSocket.TSocket(options.host, options.port)
# frame or buffer depending upon args
if options.framed:
self.transport = TTransport.TFramedTransport(socket)
else:
self.transport = TTransport.TBufferedTransport(socket)
-
+ if options.zlib:
+ self.transport = TZlibTransport.TZlibTransport(self.transport, 9)
self.transport.open()
-
protocol = self.protocol_factory.getProtocol(self.transport)
self.client = ThriftTest.Client(protocol)
@@ -82,7 +89,7 @@
self.client.testVoid()
def testString(self):
- self.assertEqual(self.client.testString('Python'), 'Python')
+ self.assertEqual(self.client.testString('Python' * 20), 'Python' * 20)
self.assertEqual(self.client.testString(''), '')
def testByte(self):
diff --git a/test/py/TestServer.py b/test/py/TestServer.py
index 99d925a..fa62765 100755
--- a/test/py/TestServer.py
+++ b/test/py/TestServer.py
@@ -28,64 +28,76 @@
from ThriftTest.ttypes import *
from thrift.transport import TTransport
from thrift.transport import TSocket
+from thrift.transport import TZlibTransport
from thrift.protocol import TBinaryProtocol
from thrift.protocol import TCompactProtocol
from thrift.server import TServer, TNonblockingServer, THttpServer
+PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory,
+ 'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory,
+ 'compact': TCompactProtocol.TCompactProtocolFactory}
+
parser = OptionParser()
-parser.set_defaults(port=9090, verbose=1, proto='binary')
parser.add_option("--port", type="int", dest="port",
help="port number for server to listen on")
+parser.add_option("--zlib", action="store_true", dest="zlib",
+ help="use zlib wrapper for compressed transport")
+parser.add_option("--ssl", action="store_true", dest="ssl",
+ help="use SSL for encrypted transport")
parser.add_option('-v', '--verbose', action="store_const",
dest="verbose", const=2,
help="verbose output")
+parser.add_option('-q', '--quiet', action="store_const",
+ dest="verbose", const=0,
+ help="minimal output")
parser.add_option('--proto', dest="proto", type="string",
help="protocol to use, one of: accel, binary, compact")
+parser.set_defaults(port=9090, verbose=1, proto='binary')
options, args = parser.parse_args()
class TestHandler:
def testVoid(self):
- if options.verbose:
+ if options.verbose > 1:
print 'testVoid()'
def testString(self, str):
- if options.verbose:
+ if options.verbose > 1:
print 'testString(%s)' % str
return str
def testByte(self, byte):
- if options.verbose:
+ if options.verbose > 1:
print 'testByte(%d)' % byte
return byte
def testI16(self, i16):
- if options.verbose:
+ if options.verbose > 1:
print 'testI16(%d)' % i16
return i16
def testI32(self, i32):
- if options.verbose:
+ if options.verbose > 1:
print 'testI32(%d)' % i32
return i32
def testI64(self, i64):
- if options.verbose:
+ if options.verbose > 1:
print 'testI64(%d)' % i64
return i64
def testDouble(self, dub):
- if options.verbose:
+ if options.verbose > 1:
print 'testDouble(%f)' % dub
return dub
def testStruct(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testStruct({%s, %d, %d, %d})' % (thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing)
return thing
def testException(self, str):
- if options.verbose:
+ if options.verbose > 1:
print 'testException(%s)' % str
if str == 'Xception':
x = Xception()
@@ -96,90 +108,111 @@
raise ValueError("foo")
def testOneway(self, seconds):
- if options.verbose:
+ if options.verbose > 1:
print 'testOneway(%d) => sleeping...' % seconds
time.sleep(seconds / 3) # be quick
- if options.verbose:
+ if options.verbose > 1:
print 'done sleeping'
def testNest(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testNest(%s)' % thing
return thing
def testMap(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testMap(%s)' % thing
return thing
def testSet(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testSet(%s)' % thing
return thing
def testList(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testList(%s)' % thing
return thing
def testEnum(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testEnum(%s)' % thing
return thing
def testTypedef(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testTypedef(%s)' % thing
return thing
def testMapMap(self, thing):
- if options.verbose:
+ if options.verbose > 1:
print 'testMapMap(%s)' % thing
return thing
def testMulti(self, arg0, arg1, arg2, arg3, arg4, arg5):
- if options.verbose:
+ if options.verbose > 1:
print 'testMulti(%s)' % [arg0, arg1, arg2, arg3, arg4, arg5]
x = Xtruct(byte_thing=arg0, i32_thing=arg1, i64_thing=arg2)
return x
-if options.proto == 'binary':
- pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-elif options.proto == 'accel':
- pfactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory()
-elif options.proto == 'compact':
- pfactory = TCompactProtocol.TCompactProtocolFactory()
-else:
+# set up the protocol factory form the --proto option
+pfactory_cls = PROT_FACTORIES.get(options.proto, None)
+if pfactory_cls is None:
raise AssertionError('Unknown --proto option: %s' % options.proto)
+pfactory = pfactory_cls()
+
+# get the server type (TSimpleServer, TNonblockingServer, etc...)
+if len(args) > 1:
+ raise AssertionError('Only one server type may be specified, not multiple types.')
+server_type = args[0]
+
+# Set up the handler and processor objects
handler = TestHandler()
processor = ThriftTest.Processor(handler)
-if args[0] == "THttpServer":
- server = THttpServer.THttpServer(processor, ('', options.port), pfactory)
+# Handle THttpServer as a special case
+if server_type == 'THttpServer':
+ server =THttpServer.THttpServer(processor, ('', options.port), pfactory)
+ server.serve()
+ sys.exit(0)
+
+# set up server transport and transport factory
+host = None
+if options.ssl:
+ from thrift.transport import TSSLSocket
+ transport = TSSLSocket.TSSLServerSocket(host, options.port, certfile='test_cert.pem')
else:
- host = None
transport = TSocket.TServerSocket(host, options.port)
- tfactory = TTransport.TBufferedTransportFactory()
+tfactory = TTransport.TBufferedTransportFactory()
- if args[0] == "TNonblockingServer":
- server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
- elif args[0] == "TProcessPoolServer":
- import signal
- def set_alarm():
- def clean_shutdown(signum, frame):
- for worker in server.workers:
+# if --zlib, then wrap server transport, and use a different transport factory
+if options.zlib:
+ transport = TZlibTransport.TZlibTransport(transport) # wrap with zlib
+ tfactory = TZlibTransport.TZlibTransportFactory()
+
+# do server-specific setup here:
+if server_type == "TNonblockingServer":
+ server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory)
+elif server_type == "TProcessPoolServer":
+ import signal
+ from thrift.server import TProcessPoolServer
+ server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
+ server.setNumWorkers(5)
+ def set_alarm():
+ def clean_shutdown(signum, frame):
+ for worker in server.workers:
+ if options.verbose > 0:
print 'Terminating worker: %s' % worker
- worker.terminate()
+ worker.terminate()
+ if options.verbose > 0:
print 'Requesting server to stop()'
- server.stop()
- signal.signal(signal.SIGALRM, clean_shutdown)
- signal.alarm(2)
- from thrift.server import TProcessPoolServer
- server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory)
- server.setNumWorkers(5)
- set_alarm()
- else:
- ServerClass = getattr(TServer, args[0])
- server = ServerClass(processor, transport, tfactory, pfactory)
-
+ server.stop()
+ signal.signal(signal.SIGALRM, clean_shutdown)
+ signal.alarm(2)
+ set_alarm()
+else:
+ # look up server class dynamically to instantiate server
+ ServerClass = getattr(TServer, server_type)
+ server = ServerClass(processor, transport, tfactory, pfactory)
+# enter server main loop
server.serve()
diff --git a/test/py/test_cert.pem b/test/py/test_cert.pem
new file mode 100644
index 0000000..9b1a51f
--- /dev/null
+++ b/test/py/test_cert.pem
@@ -0,0 +1,28 @@
+-----BEGIN CERTIFICATE-----
+MIIB+zCCAWQCCQDyq++o7K0rpTANBgkqhkiG9w0BAQUFADBCMQswCQYDVQQGEwJV
+UzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBh
+bnkgTHRkMB4XDTExMDMxNjEzMTQ1NVoXDTIxMDMxMzEzMTQ1NVowQjELMAkGA1UE
+BhMCVVMxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBD
+b21wYW55IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA9lmCuVQRqRBR
+OYVH+FMChSoF8IjMwfrpnC65J9RR88dUIZbjC2b+JPT5qiUVQft2NzPPwiBnXI2s
+j6AmHYVKoWGB24hNX8bj2cjtxdPpT2rvfAlIK0pat1C+kCxgRHIg++S7o6GEJOkw
+OQiopsUroAsIbSRT/Ird/A0+KeSqQ0sCAwEAATANBgkqhkiG9w0BAQUFAAOBgQDf
+WseEh6/3gWl/G44MyjljBvgRAa0c+eqFL/cVl7Zfh03/KOXMlPV5/snVUYBOJCCI
+qPuQwWToT+Q36kNQyMnG4e4gh+DmsiIhgQgA3lVSNDhPPfRrG1vDeeXXtybpEoke
+fI6o9a9olzrKWNvW+/8P9xIDlP0SRZxL66464LAQnw==
+-----END CERTIFICATE-----
+-----BEGIN RSA PRIVATE KEY-----
+MIICXwIBAAKBgQD2WYK5VBGpEFE5hUf4UwKFKgXwiMzB+umcLrkn1FHzx1QhluML
+Zv4k9PmqJRVB+3Y3M8/CIGdcjayPoCYdhUqhYYHbiE1fxuPZyO3F0+lPau98CUgr
+Slq3UL6QLGBEciD75LujoYQk6TA5CKimxSugCwhtJFP8it38DT4p5KpDSwIDAQAB
+AoGBAMcnA7Q5T3GifFeI9O6+hLoMh/K1VPq4kmStrQeS8JGoIc5pwbC1GV3dIXy4
+L+BAnofv/dQNCCJdchRGPqn82J/aOA/sMsJJ6VzTSr9NNVl9lgQHdLjEDoZ15yxT
+vVSc4nG2xBs7uZ/24fN/SJZVFO3+EdphOvrp7uEXLiXlqvopAkEA/h7XGlrULBIN
+ekjAzEJEchlZb4xJdPrH3P4LZs92ZlcO88GFr5wfOz1ytafLiZA9EzYwLIQTPdsk
+HHynJeZWtwJBAPgr9PYUJOdkhUeWVSN2PyqvWKrdQVKvM1VwNgRFaSPXgBd0WGIN
+Eym1b7wt6ngwNtfLx9FUOR6nl7MklsFLBA0CQQDnSiibqynLxs6PiyI3huUHOH1H
+YtcE6q/4Ox0jgRYRhZFtWKkVsbJXV9FM9yDw3uBH2R01lyxwM0GF0ArOGvy3AkEA
+7eEcjh/i+9Wzl1n3Q+WdSKoJAMbSTZJYT0Ye0NtDm7J+On0wFtRXkPw0HRmaDRiS
+CSlw4CquEb8tPu8Mfj0MpQJBAKwTLSdHsy0vxQQJXm0lTI+Ck9KJUM9vJzFuCL/x
+G6fmsqEttxjhyLnze+iIIRAu/IV+A5UrWnI1h728y/wRejw=
+-----END RSA PRIVATE KEY-----
diff --git a/test/py/test_cert.readme b/test/py/test_cert.readme
new file mode 100644
index 0000000..08bbbc9
--- /dev/null
+++ b/test/py/test_cert.readme
@@ -0,0 +1,7 @@
+NOTE:
+The test_cert.pem file in this directory must never be used on production systems.
+
+The key it represents is a self-signed key intended only for use by the unit
+testing framework. It is a publicly exposed private key.
+
+Do not use test_cert.pem in production environments under any circumstances.