From 4f0fed6c760988ecc41c6c30895fef5a5c8cef96 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Mon, 2 Oct 2006 17:50:08 +0000 Subject: [PATCH] Python basic threaded server Reviewed By: ccheever-pillar git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664812 13f79535-47bb-0310-9956-ffa450edef68 --- lib/py/src/server/TServer.py | 30 +++++++++++++++++++++++++++++- lib/py/src/transport/TSocket.py | 11 +++++++---- lib/py/src/transport/TTransport.py | 16 ++++++++++++---- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py index c9a701ea..53260b87 100644 --- a/lib/py/src/server/TServer.py +++ b/lib/py/src/server/TServer.py @@ -34,9 +34,37 @@ class TSimpleServer(TServer): try: while True: self.processor.process(input, output) + except TTransport.TTransportException, tx: + pass except Exception, x: print '%s, %s, %s' % (type(x), x, traceback.format_exc()) - print 'Client died.' input.close() output.close() + +class TThreadedServer(TServer): + + """Threaded server that spawns a new thread per each connection.""" + + def __init__(self, processor, serverTransport, transportFactory=None): + TServer.__init__(self, processor, serverTransport, transportFactory) + + def serve(self): + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + t = threading.Thread(target = self.handle, args=(client,)) + t.start() + except Exception, x: + print '%s, %s, %s,' % (type(x), x, traceback.format_exc()) + + def handle(self, client): + (input, output) = self.transportFactory.getIOTransports(client) + try: + while True: + self.processor.process(input, output) + except TTransport.TTransportException, tx: + pass + except Exception, x: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py index 2c7dd3ee..dd4a1666 100644 --- a/lib/py/src/transport/TSocket.py +++ b/lib/py/src/transport/TSocket.py @@ -10,7 +10,7 @@ class TSocket(TTransportBase): self.port = port self.handle = None - def set_handle(self, h): + def setHandle(self, h): self.handle = h def isOpen(self): @@ -37,7 +37,7 @@ class TSocket(TTransportBase): def read(self, sz): buff = self.handle.recv(sz) if len(buff) == 0: - raise Exception('TSocket read 0 bytes') + raise TTransportException('TSocket read 0 bytes') return buff def write(self, buff): @@ -46,7 +46,7 @@ class TSocket(TTransportBase): while sent < have: plus = self.handle.send(buff) if plus == 0: - raise Exception('sent 0 bytes') + raise TTransportException('sent 0 bytes') sent += plus buff = buff[plus:] @@ -63,13 +63,16 @@ class TServerSocket(TServerTransportBase): def listen(self): self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(self.handle, 'set_timeout'): + self.handle.set_timeout(None) self.handle.bind(('', self.port)) self.handle.listen(128) def accept(self): (client, addr) = self.handle.accept() result = TSocket() - result.set_handle(client) + result.setHandle(client) return result def close(self): diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py index a7eb3b02..7117aa0b 100644 --- a/lib/py/src/transport/TTransport.py +++ b/lib/py/src/transport/TTransport.py @@ -1,3 +1,11 @@ +from cStringIO import StringIO + +class TTransportException(Exception): + + """Custom Transport Exception class""" + + pass + class TTransportBase: """Base class for Thrift transport layer.""" @@ -58,7 +66,7 @@ class TBufferedTransport(TTransportBase): def __init__(self, trans): self.__trans = trans - self.__buf = '' + self.__buf = StringIO() def isOpen(self): return self.__trans.isOpen() @@ -76,8 +84,8 @@ class TBufferedTransport(TTransportBase): return self.__trans.readAll(sz) def write(self, buf): - self.__buf += buf + self.__buf.write(buf) def flush(self): - self.__trans.write(self.__buf) - self.__buf = '' + self.__trans.write(self.__buf.getvalue()) + self.__buf = StringIO() -- 2.17.1