From: Mark Slee Date: Tue, 24 Oct 2006 18:49:45 +0000 (+0000) Subject: Python threadpool server for Thrift X-Git-Tag: 0.2.0~1640 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=b90aa7c104e01580b64b37521dd2698904744c64;p=common%2Fthrift.git Python threadpool server for Thrift Summary: Fixed number of threads that work from a shared queue Reviewed By: cheever git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664832 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py index eb2ec709..c3a978da 100644 --- a/lib/py/src/server/TServer.py +++ b/lib/py/src/server/TServer.py @@ -1,6 +1,7 @@ import sys import traceback import threading +import Queue from thrift.Thrift import TProcessor from thrift.transport import TTransport @@ -69,3 +70,54 @@ class TThreadedServer(TServer): pass except Exception, x: print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + +class TThreadPoolServer(TServer): + + """Server with a fixed size pool of threads which service requests.""" + + def __init__(self, processor, serverTransport, transportFactory=None): + TServer.__init__(self, processor, serverTransport, transportFactory) + self.clients = Queue.Queue() + self.threads = 10 + + def setNumThreads(num): + """Set the number of worker threads that should be created""" + self.threads = num + + def serveThread(self): + """Loop around getting clients from the shared queue and process them.""" + while True: + try: + client = self.client.get() + self.serveClient(client) + except Exception, x: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + + def serveClient(self, client): + """Process input/output from a client for as long as possible""" + (input, output) = self.transportFactory.getIOTransports(client) + try: + while True: + self.processor.process(input, output) + except TTransport.TTransportException, tx: + pass + except Exception, x: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + + def serve(self): + """Start a fixed number of worker threads and put client into a queue""" + for i in range(self.threads): + try: + t = threading.Thread(target = self.serveThread) + t.start() + except Exception, x: + print '%s, %s, %s,' % (type(x), x, traceback.format_exc()) + + # Pump the socket for clients + self.serverTransport.listen() + while True: + try: + client = self.serverTransport.accept() + self.clients.put(client) + except Exception, x: + print '%s, %s, %s' % (type(x), x, traceback.format_exc())