From: David Reiss Date: Tue, 10 Jun 2008 22:55:26 +0000 (+0000) Subject: Python forking server should close connection in parent. X-Git-Tag: 0.2.0~781 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=bcaa2ad0d6799066ed680f53051c469e4f58771b;p=common%2Fthrift.git Python forking server should close connection in parent. When a function called by the forking python thrift server throws an exception the client will hang. This happens because the parent of the forked process does not properly close the socket fd. Under normal operation the server operation completes and returns a value to the client. However, when an exception occurs the 'end' message is never sent to the client so the client relies on a connection close to abort the call (this is how the threading server works I believe). This fails with the forking server because the parent process never closes the socket fd. The child has closed the fd at this point, but the connection will not actually be closed until all open instances of the fd are closed. Since the client is waiting for a message and the server never closes it the client is forced to wait until a read timeout occurs many minutes later. This diff closes the parent's copy of the socket fd immediately after the fork occurs. I modified my load test server to throw an exception. The client did not hang. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@666363 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py index a44ab521..21a90c86 100644 --- a/lib/py/src/server/TServer.py +++ b/lib/py/src/server/TServer.py @@ -170,7 +170,6 @@ class TThreadPoolServer(TServer): print '%s, %s, %s' % (type(x), x, traceback.format_exc()) - class TForkingServer(TServer): """A Thrift server that forks a new process for each request""" @@ -191,6 +190,13 @@ class TForkingServer(TServer): self.children = [] def serve(self): + def try_close(file): + try: + file.close() + except IOError, e: + print '%s, %s, %s' % (type(e), e, traceback.format_exc()) + + self.serverTransport.listen() while True: client = self.serverTransport.accept() @@ -202,6 +208,12 @@ class TForkingServer(TServer): self.children.append(pid) self.collect_children() + # Parent must close socket or the connection may not get + # closed promptly + itrans = self.inputTransportFactory.getTransport(client) + otrans = self.outputTransportFactory.getTransport(client) + try_close(itrans) + try_close(otrans) else: itrans = self.inputTransportFactory.getTransport(client) otrans = self.outputTransportFactory.getTransport(client) @@ -209,24 +221,20 @@ class TForkingServer(TServer): iprot = self.inputProtocolFactory.getProtocol(itrans) oprot = self.outputProtocolFactory.getProtocol(otrans) + ecode = 0 try: while True: self.processor.process(iprot, oprot) except TTransport.TTransportException, tx: pass except Exception, e: - print '%s, %s, %s' % (type(x), x, traceback.format_exc()) - os._exit(1) - - def try_close(file): - try: - file.close() - except IOError, e: - print '%s, %s, %s' % (type(x), x, traceback.format_exc()) + print '%s, %s, %s' % (type(e), e, traceback.format_exc()) + ecode = 1 + finally: + try_close(itrans) + try_close(otrans) - try_close(itrans) - try_close(otrans) - os._exit(0) + os._exit(ecode) except TTransport.TTransportException, tx: pass @@ -234,7 +242,6 @@ class TForkingServer(TServer): print '%s, %s, %s' % (type(x), x, traceback.format_exc()) - def collect_children(self): while self.children: try: diff --git a/test/py/RunClientServer.py b/test/py/RunClientServer.py index f05dc5d0..cbff3729 100755 --- a/test/py/RunClientServer.py +++ b/test/py/RunClientServer.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +import time import subprocess import sys import os @@ -8,12 +9,20 @@ import signal def relfile(fname): return os.path.join(os.path.dirname(__file__), fname) -serverproc = subprocess.Popen([sys.executable, relfile("TestServer.py")]) -try: +def runTest(server_class): + print "Testing ", server_class + serverproc = subprocess.Popen([sys.executable, relfile("TestServer.py"), server_class]) + try: - ret = subprocess.call([sys.executable, relfile("TestClient.py")]) - if ret != 0: - raise Exception("subprocess failed") -finally: - # fixme: should check that server didn't die - os.kill(serverproc.pid, signal.SIGKILL) + ret = subprocess.call([sys.executable, relfile("TestClient.py")]) + if ret != 0: + raise Exception("subprocess failed") + finally: + # fixme: should check that server didn't die + os.kill(serverproc.pid, signal.SIGKILL) + + # wait for shutdown + time.sleep(5) + +map(runTest, ["TForkingServer", "TThreadPoolServer", + "TThreadedServer", "TSimpleServer"]) diff --git a/test/py/TestClient.py b/test/py/TestClient.py index d7b65b74..fb0133a5 100755 --- a/test/py/TestClient.py +++ b/test/py/TestClient.py @@ -86,6 +86,12 @@ class AbstractTest(unittest.TestCase): self.assertEqual(x.errorCode, 1001) self.assertEqual(x.message, 'Xception') + try: + self.client.testException("throw_undeclared") + self.fail("should have thrown exception") + except Exception: # type is undefined + pass + def testAsync(self): start = time.time() self.client.testAsync(2) diff --git a/test/py/TestServer.py b/test/py/TestServer.py index 4c6cb8a3..0247bc27 100755 --- a/test/py/TestServer.py +++ b/test/py/TestServer.py @@ -51,6 +51,8 @@ class TestHandler: x.errorCode = 1001 x.message = str raise x + elif str == "throw_undeclared": + raise ValueError("foo") def testAsync(self, seconds): print 'testAsync(%d) => sleeping...' % seconds @@ -62,5 +64,8 @@ processor = ThriftTest.Processor(handler) transport = TSocket.TServerSocket(9090) tfactory = TTransport.TBufferedTransportFactory() pfactory = TBinaryProtocol.TBinaryProtocolFactory() -server = TServer.TThreadedServer(processor, transport, tfactory, pfactory) + +ServerClass = getattr(TServer, sys.argv[1]) + +server = ServerClass(processor, transport, tfactory, pfactory) server.serve()