From: Bryan Duxbury Date: Tue, 22 Mar 2011 18:06:04 +0000 (+0000) Subject: THRIFT-1103. py: TZlibTransport for python, a zlib compressed transport X-Git-Tag: 0.7.0~141 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=1606659171d9ee8885d5806d6030ec39399b3b08;p=common%2Fthrift.git THRIFT-1103. py: TZlibTransport for python, a zlib compressed transport This patch adds a new TZlibTransport to the Python library and extends the test suite to exercise it. Patch: Will Pierce git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1084276 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/py/src/transport/TSSLSocket.py b/lib/py/src/transport/TSSLSocket.py index 5eff5e61..be358448 100644 --- a/lib/py/src/transport/TSSLSocket.py +++ b/lib/py/src/transport/TSSLSocket.py @@ -36,7 +36,7 @@ class TSSLSocket(TSocket.TSocket): """ SSL_VERSION = ssl.PROTOCOL_TLSv1 - def __init__(self, validate=True, ca_certs=None, *args, **kwargs): + def __init__(self, host='localhost', port=9090, validate=True, ca_certs=None, unix_socket=None): """ @param validate: Set to False to disable SSL certificate validation entirely. @type validate: bool @@ -56,10 +56,10 @@ class TSSLSocket(TSocket.TSocket): else: self.cert_reqs = ssl.CERT_REQUIRED self.ca_certs = ca_certs - if validate and ca_certs is not None: - if not os.access(ca_certs, os.R_OK): + if validate: + if ca_certs is None or not os.access(ca_certs, os.R_OK): raise IOError('Certificate Authority ca_certs file "%s" is not readable, cannot validate SSL certificates.' % (ca_certs)) - TSocket.TSocket.__init__(self, *args, **kwargs) + TSocket.TSocket.__init__(self, host, port, unix_socket) def open(self): try: @@ -131,7 +131,7 @@ class TSSLServerSocket(TSocket.TServerSocket): """ SSL_VERSION = ssl.PROTOCOL_TLSv1 - def __init__(self, certfile='cert.pem', *args, **kwargs): + def __init__(self, host=None, port=9090, certfile='cert.pem', unix_socket=None): """Initialize a TSSLServerSocket @param certfile: The filename of the server certificate file, defaults to cert.pem @@ -143,7 +143,7 @@ class TSSLServerSocket(TSocket.TServerSocket): @type port: int """ self.setCertfile(certfile) - TSocket.TServerSocket.__init__(self, *args, **kwargs) + TSocket.TServerSocket.__init__(self, host, port) def setCertfile(self, certfile): """Set or change the server certificate file used to wrap new connections. @@ -159,8 +159,18 @@ class TSSLServerSocket(TSocket.TServerSocket): def accept(self): plain_client, addr = self.handle.accept() + try: + client = ssl.wrap_socket(plain_client, certfile=self.certfile, + server_side=True, ssl_version=self.SSL_VERSION) + except ssl.SSLError, ssl_exc: + # failed handshake/ssl wrap, close socket to client + plain_client.close() + # raise ssl_exc + # We can't raise the exception, because it kills most TServer derived serve() + # methods. + # Instead, return None, and let the TServer instance deal with it in + # other exception handling. (but TSimpleServer dies anyway) + return None result = TSocket.TSocket() - client = ssl.wrap_socket(plain_client, certfile=self.certfile, - server_side=True, ssl_version=self.SSL_VERSION) result.setHandle(client) return result diff --git a/lib/py/src/transport/TZlibTransport.py b/lib/py/src/transport/TZlibTransport.py new file mode 100644 index 00000000..784d4e1e --- /dev/null +++ b/lib/py/src/transport/TZlibTransport.py @@ -0,0 +1,261 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +''' +TZlibTransport provides a compressed transport and transport factory +class, using the python standard library zlib module to implement +data compression. +''' + +from __future__ import division +import zlib +from cStringIO import StringIO +from TTransport import TTransportBase, CReadableTransport + +class TZlibTransportFactory(object): + ''' + Factory transport that builds zlib compressed transports. + + This factory caches the last single client/transport that it was passed + and returns the same TZlibTransport object that was created. + + This caching means the TServer class will get the _same_ transport + object for both input and output transports from this factory. + (For non-threaded scenarios only, since the cache only holds one object) + + The purpose of this caching is to allocate only one TZlibTransport where + only one is really needed (since it must have separate read/write buffers), + and makes the statistics from getCompSavings() and getCompRatio() + easier to understand. + ''' + + # class scoped cache of last transport given and zlibtransport returned + _last_trans = None + _last_z = None + + def getTransport(self, trans, compresslevel=9): + '''Wrap a transport , trans, with the TZlibTransport + compressed transport class, returning a new + transport to the caller. + + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Defaults to 9. + @type compresslevel: int + + This method returns a TZlibTransport which wraps the + passed C{trans} TTransport derived instance. + ''' + if trans == self._last_trans: + return self._last_z + ztrans = TZlibTransport(trans, compresslevel) + self._last_trans = trans + self._last_z = ztrans + return ztrans + + +class TZlibTransport(TTransportBase, CReadableTransport): + ''' + Class that wraps a transport with zlib, compressing writes + and decompresses reads, using the python standard + library zlib module. + ''' + + # Read buffer size for the python fastbinary C extension, + # the TBinaryProtocolAccelerated class. + DEFAULT_BUFFSIZE = 4096 + + def __init__(self, trans, compresslevel=9): + ''' + Create a new TZlibTransport, wrapping C{trans}, another + TTransport derived object. + + @param trans: A thrift transport object, i.e. a TSocket() object. + @type trans: TTransport + @param compresslevel: The zlib compression level, ranging + from 0 (no compression) to 9 (best compression). Default is 9. + @type compresslevel: int + ''' + self.__trans = trans + self.compresslevel = compresslevel + self.__rbuf = StringIO() + self.__wbuf = StringIO() + self._init_zlib() + self._init_stats() + + def _reinit_buffers(self): + ''' + Internal method to initialize/reset the internal StringIO objects + for read and write buffers. + ''' + self.__rbuf = StringIO() + self.__wbuf = StringIO() + + def _init_stats(self): + ''' + Internal method to reset the internal statistics counters + for compression ratios and bandwidth savings. + ''' + self.bytes_in = 0 + self.bytes_out = 0 + self.bytes_in_comp = 0 + self.bytes_out_comp = 0 + + def _init_zlib(self): + ''' + Internal method for setting up the zlib compression and + decompression objects. + ''' + self._zcomp_read = zlib.decompressobj() + self._zcomp_write = zlib.compressobj(self.compresslevel) + + def getCompRatio(self): + ''' + Get the current measured compression ratios (in,out) from + this transport. + + Returns a tuple of: + (inbound_compression_ratio, outbound_compression_ratio) + + The compression ratios are computed as: + compressed / uncompressed + + E.g., data that compresses by 10x will have a ratio of: 0.10 + and data that compresses to half of ts original size will + have a ratio of 0.5 + + None is returned if no bytes have yet been processed in + a particular direction. + ''' + r_percent, w_percent = (None, None) + if self.bytes_in > 0: + r_percent = self.bytes_in_comp / self.bytes_in + if self.bytes_out > 0: + w_percent = self.bytes_out_comp / self.bytes_out + return (r_percent, w_percent) + + def getCompSavings(self): + ''' + Get the current count of saved bytes due to data + compression. + + Returns a tuple of: + (inbound_saved_bytes, outbound_saved_bytes) + + Note: if compression is actually expanding your + data (only likely with very tiny thrift objects), then + the values returned will be negative. + ''' + r_saved = self.bytes_in - self.bytes_in_comp + w_saved = self.bytes_out - self.bytes_out_comp + return (r_saved, w_saved) + + def isOpen(self): + '''Return the underlying transport's open status''' + return self.__trans.isOpen() + + def open(self): + """Open the underlying transport""" + self._init_stats() + return self.__trans.open() + + def listen(self): + '''Invoke the underlying transport's listen() method''' + self.__trans.listen() + + def accept(self): + '''Accept connections on the underlying transport''' + return self.__trans.accept() + + def close(self): + '''Close the underlying transport,''' + self._reinit_buffers() + self._init_zlib() + return self.__trans.close() + + def read(self, sz): + ''' + Read up to sz bytes from the decompressed bytes buffer, and + read from the underlying transport if the decompression + buffer is empty. + ''' + ret = self.__rbuf.read(sz) + if len(ret) > 0: + return ret + # keep reading from transport until something comes back + while True: + if self.readComp(sz): + break + ret = self.__rbuf.read(sz) + return ret + + def readComp(self, sz): + ''' + Read compressed data from the underlying transport, then + decompress it and append it to the internal StringIO read buffer + ''' + zbuf = self.__trans.read(sz) + zbuf = self._zcomp_read.unconsumed_tail + zbuf + buf = self._zcomp_read.decompress(zbuf) + self.bytes_in += len(zbuf) + self.bytes_in_comp += len(buf) + old = self.__rbuf.read() + self.__rbuf = StringIO(old + buf) + if len(old) + len(buf) == 0: + return False + return True + + def write(self, buf): + ''' + Write some bytes, putting them into the internal write + buffer for eventual compression. + ''' + self.__wbuf.write(buf) + + def flush(self): + ''' + Flush any queued up data in the write buffer and ensure the + compression buffer is flushed out to the underlying transport + ''' + wout = self.__wbuf.getvalue() + if len(wout) > 0: + zbuf = self._zcomp_write.compress(wout) + self.bytes_out += len(wout) + self.bytes_out_comp += len(zbuf) + else: + zbuf = '' + ztail = self._zcomp_write.flush(zlib.Z_SYNC_FLUSH) + self.bytes_out_comp += len(ztail) + if (len(zbuf) + len(ztail)) > 0: + self.__wbuf = StringIO() + self.__trans.write(zbuf + ztail) + self.__trans.flush() + + @property + def cstringio_buf(self): + '''Implement the CReadableTransport interface''' + return self.__rbuf + + def cstringio_refill(self, partialread, reqlen): + '''Implement the CReadableTransport interface for refill''' + retstring = partialread + if reqlen < self.DEFAULT_BUFFSIZE: + retstring += self.read(self.DEFAULT_BUFFSIZE) + while len(retstring) < reqlen: + retstring += self.read(reqlen - len(retstring)) + self.__rbuf = StringIO(retstring) + return self.__rbuf diff --git a/lib/py/src/transport/__init__.py b/lib/py/src/transport/__init__.py index 02c6048a..46e54fe6 100644 --- a/lib/py/src/transport/__init__.py +++ b/lib/py/src/transport/__init__.py @@ -17,4 +17,4 @@ # under the License. # -__all__ = ['TTransport', 'TSocket', 'THttpClient'] +__all__ = ['TTransport', 'TSocket', 'THttpClient','TZlibTransport'] diff --git a/test/py/RunClientServer.py b/test/py/RunClientServer.py index dced91ad..633856f5 100755 --- a/test/py/RunClientServer.py +++ b/test/py/RunClientServer.py @@ -30,9 +30,18 @@ from optparse import OptionParser parser = OptionParser() parser.add_option("--port", type="int", dest="port", default=9090, help="port number for server to listen on") +parser.add_option('-v', '--verbose', action="store_const", + dest="verbose", const=2, + help="verbose output") +parser.add_option('-q', '--quiet', action="store_const", + dest="verbose", const=0, + help="minimal output") +parser.set_defaults(verbose=1) options, args = parser.parse_args() FRAMED = ["TNonblockingServer"] +SKIP_ZLIB = ['TNonblockingServer', 'THttpServer'] +SKIP_SSL = ['TNonblockingServer', 'THttpServer'] EXTRA_DELAY = ['TProcessPoolServer'] EXTRA_SLEEP = 3.5 @@ -58,6 +67,11 @@ except: print 'Warning: the multiprocessing module is unavailable. Skipping tests for TProcessPoolServer' SERVERS.remove('TProcessPoolServer') +try: + import ssl +except: + print 'Warning, no ssl module available. Skipping all SSL tests.' + SKIP_SSL.extend(SERVERS) # commandline permits a single class name to be specified to override SERVERS=[...] if len(args) == 1: @@ -71,41 +85,68 @@ if len(args) == 1: def relfile(fname): return os.path.join(os.path.dirname(__file__), fname) -def runTest(server_class, proto, port): - server_args = [sys.executable, # /usr/bin/python or similar - relfile('TestServer.py'), # ./TestServer.py - '--proto=%s' % proto, # accel, binary or compact - '--port=%d' % port, # usually 9090, given on cmdline - server_class] # name of class to test, from SERVERS[] or cmdline - print "Testing server %s: %s" % (server_class, ' '.join(server_args)) - serverproc = subprocess.Popen(server_args) - time.sleep(0.25) - try: - argv = [sys.executable, relfile("TestClient.py"), - '--proto=%s' % (proto), '--port=%d' % (port) ] - if server_class in FRAMED: - argv.append('--framed') - if server_class == 'THttpServer': - argv.append('--http=/') - print 'Testing client %s: %s' % (server_class, ' '.join(argv)) - ret = subprocess.call(argv) - if ret != 0: - raise Exception("subprocess %s failed, args: %s" % (server_class, ' '.join(argv))) - finally: - # check that server didn't die - time.sleep(0.05) - serverproc.poll() - if serverproc.returncode is not None: - print 'Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode) - raise Exception('subprocess %s died, args: %s' % (server_class, ' '.join(server_args))) - else: - if server_class in EXTRA_DELAY: - print 'Giving %s (proto=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class, proto, EXTRA_SLEEP) - time.sleep(EXTRA_SLEEP) - os.kill(serverproc.pid, signal.SIGKILL) - # wait for shutdown - time.sleep(0.5) +def runTest(server_class, proto, port, use_zlib, use_ssl): + # Build command line arguments + server_args = [sys.executable, relfile('TestServer.py') ] + cli_args = [sys.executable, relfile('TestClient.py') ] + for which in (server_args, cli_args): + which.append('--proto=%s' % proto) # accel, binary or compact + which.append('--port=%d' % port) # default to 9090 + if use_zlib: + which.append('--zlib') + if use_ssl: + which.append('--ssl') + if options.verbose == 0: + which.append('-q') + if options.verbose == 2: + which.append('-v') + # server-specific option to select server class + server_args.append(server_class) + # client-specific cmdline options + if server_class in FRAMED: + cli_args.append('--framed') + if server_class == 'THttpServer': + cli_args.append('--http=/') + if options.verbose > 0: + print 'Testing server %s: %s' % (server_class, ' '.join(server_args)) + serverproc = subprocess.Popen(server_args) + time.sleep(0.2) + try: + if options.verbose > 0: + print 'Testing client: %s' % (' '.join(cli_args)) + ret = subprocess.call(cli_args) + if ret != 0: + raise Exception("Client subprocess failed, retcode=%d, args: %s" % (ret, ' '.join(cli_args))) + finally: + # check that server didn't die + serverproc.poll() + if serverproc.returncode is not None: + print 'FAIL: Server process (%s) failed with retcode %d' % (' '.join(server_args), serverproc.returncode) + raise Exception('Server subprocess %s died, args: %s' % (server_class, ' '.join(server_args))) + else: + if server_class in EXTRA_DELAY: + if options.verbose > 0: + print 'Giving %s (proto=%s,zlib=%s,ssl=%s) an extra %d seconds for child processes to terminate via alarm' % (server_class, + proto, use_zlib, use_ssl, EXTRA_SLEEP) + time.sleep(EXTRA_SLEEP) + os.kill(serverproc.pid, signal.SIGKILL) + # wait for shutdown + time.sleep(0.1) +test_count = 0 for try_server in SERVERS: for try_proto in PROTOS: - runTest(try_server, try_proto, options.port) + for with_zlib in (False, True): + # skip any servers that don't work with the Zlib transport + if with_zlib and try_server in SKIP_ZLIB: + continue + for with_ssl in (False, True): + # skip any servers that don't work with SSL + if with_ssl and try_server in SKIP_SSL: + continue + test_count += 1 + if options.verbose > 0: + print '\nTest run #%d: Server=%s, Proto=%s, zlib=%s, SSL=%s' % (test_count, try_server, try_proto, with_zlib, with_ssl) + runTest(try_server, try_proto, options.port, with_zlib, with_ssl) + if options.verbose > 0: + print 'OK: Finished %s / %s proto / zlib=%s / SSL=%s. %d combinations tested.' % (try_server, try_proto, with_zlib, with_ssl, test_count) diff --git a/test/py/TestClient.py b/test/py/TestClient.py index eecb8500..6429ec37 100755 --- a/test/py/TestClient.py +++ b/test/py/TestClient.py @@ -28,6 +28,7 @@ from ThriftTest.ttypes import * from thrift.transport import TTransport from thrift.transport import TSocket from thrift.transport import THttpClient +from thrift.transport import TZlibTransport from thrift.protocol import TBinaryProtocol from thrift.protocol import TCompactProtocol import unittest @@ -40,6 +41,10 @@ parser.add_option("--port", type="int", dest="port", help="connect to server at port") parser.add_option("--host", type="string", dest="host", help="connect to server") +parser.add_option("--zlib", action="store_true", dest="zlib", + help="use zlib wrapper for compressed transport") +parser.add_option("--ssl", action="store_true", dest="ssl", + help="use SSL for encrypted transport") parser.add_option("--framed", action="store_true", dest="framed", help="use framed transport") parser.add_option("--http", dest="http_path", @@ -58,19 +63,21 @@ options, args = parser.parse_args() class AbstractTest(unittest.TestCase): def setUp(self): if options.http_path: - self.transport = THttpClient.THttpClient( - options.host, options.port, options.http_path) + self.transport = THttpClient.THttpClient(options.host, port=options.port, path=options.http_path) else: - socket = TSocket.TSocket(options.host, options.port) - + if options.ssl: + from thrift.transport import TSSLSocket + socket = TSSLSocket.TSSLSocket(options.host, options.port, validate=False) + else: + socket = TSocket.TSocket(options.host, options.port) # frame or buffer depending upon args if options.framed: self.transport = TTransport.TFramedTransport(socket) else: self.transport = TTransport.TBufferedTransport(socket) - + if options.zlib: + self.transport = TZlibTransport.TZlibTransport(self.transport, 9) self.transport.open() - protocol = self.protocol_factory.getProtocol(self.transport) self.client = ThriftTest.Client(protocol) @@ -82,7 +89,7 @@ class AbstractTest(unittest.TestCase): self.client.testVoid() def testString(self): - self.assertEqual(self.client.testString('Python'), 'Python') + self.assertEqual(self.client.testString('Python' * 20), 'Python' * 20) self.assertEqual(self.client.testString(''), '') def testByte(self): diff --git a/test/py/TestServer.py b/test/py/TestServer.py index 99d925a2..fa627650 100755 --- a/test/py/TestServer.py +++ b/test/py/TestServer.py @@ -28,64 +28,76 @@ from ThriftTest import ThriftTest from ThriftTest.ttypes import * from thrift.transport import TTransport from thrift.transport import TSocket +from thrift.transport import TZlibTransport from thrift.protocol import TBinaryProtocol from thrift.protocol import TCompactProtocol from thrift.server import TServer, TNonblockingServer, THttpServer +PROT_FACTORIES = {'binary': TBinaryProtocol.TBinaryProtocolFactory, + 'accel': TBinaryProtocol.TBinaryProtocolAcceleratedFactory, + 'compact': TCompactProtocol.TCompactProtocolFactory} + parser = OptionParser() -parser.set_defaults(port=9090, verbose=1, proto='binary') parser.add_option("--port", type="int", dest="port", help="port number for server to listen on") +parser.add_option("--zlib", action="store_true", dest="zlib", + help="use zlib wrapper for compressed transport") +parser.add_option("--ssl", action="store_true", dest="ssl", + help="use SSL for encrypted transport") parser.add_option('-v', '--verbose', action="store_const", dest="verbose", const=2, help="verbose output") +parser.add_option('-q', '--quiet', action="store_const", + dest="verbose", const=0, + help="minimal output") parser.add_option('--proto', dest="proto", type="string", help="protocol to use, one of: accel, binary, compact") +parser.set_defaults(port=9090, verbose=1, proto='binary') options, args = parser.parse_args() class TestHandler: def testVoid(self): - if options.verbose: + if options.verbose > 1: print 'testVoid()' def testString(self, str): - if options.verbose: + if options.verbose > 1: print 'testString(%s)' % str return str def testByte(self, byte): - if options.verbose: + if options.verbose > 1: print 'testByte(%d)' % byte return byte def testI16(self, i16): - if options.verbose: + if options.verbose > 1: print 'testI16(%d)' % i16 return i16 def testI32(self, i32): - if options.verbose: + if options.verbose > 1: print 'testI32(%d)' % i32 return i32 def testI64(self, i64): - if options.verbose: + if options.verbose > 1: print 'testI64(%d)' % i64 return i64 def testDouble(self, dub): - if options.verbose: + if options.verbose > 1: print 'testDouble(%f)' % dub return dub def testStruct(self, thing): - if options.verbose: + if options.verbose > 1: print 'testStruct({%s, %d, %d, %d})' % (thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing) return thing def testException(self, str): - if options.verbose: + if options.verbose > 1: print 'testException(%s)' % str if str == 'Xception': x = Xception() @@ -96,90 +108,111 @@ class TestHandler: raise ValueError("foo") def testOneway(self, seconds): - if options.verbose: + if options.verbose > 1: print 'testOneway(%d) => sleeping...' % seconds time.sleep(seconds / 3) # be quick - if options.verbose: + if options.verbose > 1: print 'done sleeping' def testNest(self, thing): - if options.verbose: + if options.verbose > 1: print 'testNest(%s)' % thing return thing def testMap(self, thing): - if options.verbose: + if options.verbose > 1: print 'testMap(%s)' % thing return thing def testSet(self, thing): - if options.verbose: + if options.verbose > 1: print 'testSet(%s)' % thing return thing def testList(self, thing): - if options.verbose: + if options.verbose > 1: print 'testList(%s)' % thing return thing def testEnum(self, thing): - if options.verbose: + if options.verbose > 1: print 'testEnum(%s)' % thing return thing def testTypedef(self, thing): - if options.verbose: + if options.verbose > 1: print 'testTypedef(%s)' % thing return thing def testMapMap(self, thing): - if options.verbose: + if options.verbose > 1: print 'testMapMap(%s)' % thing return thing def testMulti(self, arg0, arg1, arg2, arg3, arg4, arg5): - if options.verbose: + if options.verbose > 1: print 'testMulti(%s)' % [arg0, arg1, arg2, arg3, arg4, arg5] x = Xtruct(byte_thing=arg0, i32_thing=arg1, i64_thing=arg2) return x -if options.proto == 'binary': - pfactory = TBinaryProtocol.TBinaryProtocolFactory() -elif options.proto == 'accel': - pfactory = TBinaryProtocol.TBinaryProtocolAcceleratedFactory() -elif options.proto == 'compact': - pfactory = TCompactProtocol.TCompactProtocolFactory() -else: +# set up the protocol factory form the --proto option +pfactory_cls = PROT_FACTORIES.get(options.proto, None) +if pfactory_cls is None: raise AssertionError('Unknown --proto option: %s' % options.proto) +pfactory = pfactory_cls() + +# get the server type (TSimpleServer, TNonblockingServer, etc...) +if len(args) > 1: + raise AssertionError('Only one server type may be specified, not multiple types.') +server_type = args[0] + +# Set up the handler and processor objects handler = TestHandler() processor = ThriftTest.Processor(handler) -if args[0] == "THttpServer": - server = THttpServer.THttpServer(processor, ('', options.port), pfactory) +# Handle THttpServer as a special case +if server_type == 'THttpServer': + server =THttpServer.THttpServer(processor, ('', options.port), pfactory) + server.serve() + sys.exit(0) + +# set up server transport and transport factory +host = None +if options.ssl: + from thrift.transport import TSSLSocket + transport = TSSLSocket.TSSLServerSocket(host, options.port, certfile='test_cert.pem') else: - host = None transport = TSocket.TServerSocket(host, options.port) - tfactory = TTransport.TBufferedTransportFactory() - - if args[0] == "TNonblockingServer": - server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory) - elif args[0] == "TProcessPoolServer": - import signal - def set_alarm(): - def clean_shutdown(signum, frame): - for worker in server.workers: +tfactory = TTransport.TBufferedTransportFactory() + +# if --zlib, then wrap server transport, and use a different transport factory +if options.zlib: + transport = TZlibTransport.TZlibTransport(transport) # wrap with zlib + tfactory = TZlibTransport.TZlibTransportFactory() + +# do server-specific setup here: +if server_type == "TNonblockingServer": + server = TNonblockingServer.TNonblockingServer(processor, transport, inputProtocolFactory=pfactory) +elif server_type == "TProcessPoolServer": + import signal + from thrift.server import TProcessPoolServer + server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory) + server.setNumWorkers(5) + def set_alarm(): + def clean_shutdown(signum, frame): + for worker in server.workers: + if options.verbose > 0: print 'Terminating worker: %s' % worker - worker.terminate() + worker.terminate() + if options.verbose > 0: print 'Requesting server to stop()' - server.stop() - signal.signal(signal.SIGALRM, clean_shutdown) - signal.alarm(2) - from thrift.server import TProcessPoolServer - server = TProcessPoolServer.TProcessPoolServer(processor, transport, tfactory, pfactory) - server.setNumWorkers(5) - set_alarm() - else: - ServerClass = getattr(TServer, args[0]) - server = ServerClass(processor, transport, tfactory, pfactory) - + server.stop() + signal.signal(signal.SIGALRM, clean_shutdown) + signal.alarm(2) + set_alarm() +else: + # look up server class dynamically to instantiate server + ServerClass = getattr(TServer, server_type) + server = ServerClass(processor, transport, tfactory, pfactory) +# enter server main loop server.serve() diff --git a/test/py/test_cert.pem b/test/py/test_cert.pem new file mode 100644 index 00000000..9b1a51f4 --- /dev/null +++ b/test/py/test_cert.pem @@ -0,0 +1,28 @@ +-----BEGIN CERTIFICATE----- +MIIB+zCCAWQCCQDyq++o7K0rpTANBgkqhkiG9w0BAQUFADBCMQswCQYDVQQGEwJV +UzEVMBMGA1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBh +bnkgTHRkMB4XDTExMDMxNjEzMTQ1NVoXDTIxMDMxMzEzMTQ1NVowQjELMAkGA1UE +BhMCVVMxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBD +b21wYW55IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA9lmCuVQRqRBR +OYVH+FMChSoF8IjMwfrpnC65J9RR88dUIZbjC2b+JPT5qiUVQft2NzPPwiBnXI2s +j6AmHYVKoWGB24hNX8bj2cjtxdPpT2rvfAlIK0pat1C+kCxgRHIg++S7o6GEJOkw +OQiopsUroAsIbSRT/Ird/A0+KeSqQ0sCAwEAATANBgkqhkiG9w0BAQUFAAOBgQDf +WseEh6/3gWl/G44MyjljBvgRAa0c+eqFL/cVl7Zfh03/KOXMlPV5/snVUYBOJCCI +qPuQwWToT+Q36kNQyMnG4e4gh+DmsiIhgQgA3lVSNDhPPfRrG1vDeeXXtybpEoke +fI6o9a9olzrKWNvW+/8P9xIDlP0SRZxL66464LAQnw== +-----END CERTIFICATE----- +-----BEGIN RSA PRIVATE KEY----- +MIICXwIBAAKBgQD2WYK5VBGpEFE5hUf4UwKFKgXwiMzB+umcLrkn1FHzx1QhluML +Zv4k9PmqJRVB+3Y3M8/CIGdcjayPoCYdhUqhYYHbiE1fxuPZyO3F0+lPau98CUgr +Slq3UL6QLGBEciD75LujoYQk6TA5CKimxSugCwhtJFP8it38DT4p5KpDSwIDAQAB +AoGBAMcnA7Q5T3GifFeI9O6+hLoMh/K1VPq4kmStrQeS8JGoIc5pwbC1GV3dIXy4 +L+BAnofv/dQNCCJdchRGPqn82J/aOA/sMsJJ6VzTSr9NNVl9lgQHdLjEDoZ15yxT +vVSc4nG2xBs7uZ/24fN/SJZVFO3+EdphOvrp7uEXLiXlqvopAkEA/h7XGlrULBIN +ekjAzEJEchlZb4xJdPrH3P4LZs92ZlcO88GFr5wfOz1ytafLiZA9EzYwLIQTPdsk +HHynJeZWtwJBAPgr9PYUJOdkhUeWVSN2PyqvWKrdQVKvM1VwNgRFaSPXgBd0WGIN +Eym1b7wt6ngwNtfLx9FUOR6nl7MklsFLBA0CQQDnSiibqynLxs6PiyI3huUHOH1H +YtcE6q/4Ox0jgRYRhZFtWKkVsbJXV9FM9yDw3uBH2R01lyxwM0GF0ArOGvy3AkEA +7eEcjh/i+9Wzl1n3Q+WdSKoJAMbSTZJYT0Ye0NtDm7J+On0wFtRXkPw0HRmaDRiS +CSlw4CquEb8tPu8Mfj0MpQJBAKwTLSdHsy0vxQQJXm0lTI+Ck9KJUM9vJzFuCL/x +G6fmsqEttxjhyLnze+iIIRAu/IV+A5UrWnI1h728y/wRejw= +-----END RSA PRIVATE KEY----- diff --git a/test/py/test_cert.readme b/test/py/test_cert.readme new file mode 100644 index 00000000..08bbbc9f --- /dev/null +++ b/test/py/test_cert.readme @@ -0,0 +1,7 @@ +NOTE: +The test_cert.pem file in this directory must never be used on production systems. + +The key it represents is a self-signed key intended only for use by the unit +testing framework. It is a publicly exposed private key. + +Do not use test_cert.pem in production environments under any circumstances.