From 665365490f3e14cf3e40205f16d9dc9b5a833f1e Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 10 Jun 2008 22:54:49 +0000 Subject: [PATCH] Forking Python server. The python threading model does not provide concurrency for CPU-bound processes. Process forking is the current recommended way of writing scalable Python servers. Harry Wang ran the [elided] backend with this change for 3 days and observed no errors. The threaded backend caused unexplained lockups under this load after 24 hours. I also ran a CPU-bound load test against this server with 32 concurrent clients. It completed 5X faster than the threaded implementation. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666359 13f79535-47bb-0310-9956-ffa450edef68 --- lib/py/src/server/TServer.py | 81 ++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py index b6738f76..a44ab521 100644 --- a/lib/py/src/server/TServer.py +++ b/lib/py/src/server/TServer.py @@ -7,6 +7,7 @@ # http://developers.facebook.com/thrift/ import sys +import os import traceback import threading import Queue @@ -167,3 +168,83 @@ class TThreadPoolServer(TServer): self.clients.put(client) except Exception, x: print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + + + +class TForkingServer(TServer): + + """A Thrift server that forks a new process for each request""" + """ + This is more scalable than the threaded server as it does not cause + GIL contention. + + Note that this has different semantics from the threading server. + Specifically, updates to shared variables will no longer be shared. + It will also not work on windows. + + This code is heavily inspired by SocketServer.ForkingMixIn in the + Python stdlib. + """ + + def __init__(self, *args): + TServer.__init__(self, *args) + self.children = [] + + def serve(self): + self.serverTransport.listen() + while True: + client = self.serverTransport.accept() + try: + pid = os.fork() + + if pid: # parent + # add before collect, otherwise you race w/ waitpid + self.children.append(pid) + self.collect_children() + + else: + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + + iprot = self.inputProtocolFactory.getProtocol(itrans) + oprot = self.outputProtocolFactory.getProtocol(otrans) + + try: + while True: + self.processor.process(iprot, oprot) + except TTransport.TTransportException, tx: + pass + except Exception, e: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + os._exit(1) + + def try_close(file): + try: + file.close() + except IOError, e: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + + try_close(itrans) + try_close(otrans) + os._exit(0) + + except TTransport.TTransportException, tx: + pass + except Exception, x: + print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + + + + def collect_children(self): + while self.children: + try: + pid, status = os.waitpid(0, os.WNOHANG) + except os.error: + pid = None + + if pid: + self.children.remove(pid) + else: + break + + -- 2.17.1