blob: 21a90c867c79e6bbe2f291e9b6d815ef73463db3 [file] [log] [blame]
Mark Slee89e2bb82007-03-01 00:20:36 +00001#!/usr/bin/env python
2#
3# Copyright (c) 2006- Facebook
4# Distributed under the Thrift Software License
5#
6# See accompanying file LICENSE or visit the Thrift site at:
7# http://developers.facebook.com/thrift/
8
Mark Sleec98d0502006-09-06 02:42:25 +00009import sys
David Reiss66536542008-06-10 22:54:49 +000010import os
Mark Sleec98d0502006-09-06 02:42:25 +000011import traceback
Mark Slee3c4d7fd2006-10-02 17:53:20 +000012import threading
Mark Sleeb90aa7c2006-10-24 18:49:45 +000013import Queue
Mark Sleec98d0502006-09-06 02:42:25 +000014
Mark Sleec9676562006-09-05 17:34:52 +000015from thrift.Thrift import TProcessor
16from thrift.transport import TTransport
Mark Slee4ac459f2006-10-25 21:39:01 +000017from thrift.protocol import TBinaryProtocol
Mark Sleec9676562006-09-05 17:34:52 +000018
19class TServer:
20
Mark Slee794993d2006-09-20 01:56:10 +000021 """Base interface for a server, which must have a serve method."""
Mark Sleec9676562006-09-05 17:34:52 +000022
Aditya Agarwal5c468192007-02-06 01:14:33 +000023 """ 3 constructors for all servers:
24 1) (processor, serverTransport)
25 2) (processor, serverTransport, transportFactory, protocolFactory)
26 3) (processor, serverTransport,
27 inputTransportFactory, outputTransportFactory,
28 inputProtocolFactory, outputProtocolFactory)"""
29 def __init__(self, *args):
Aditya Agarwal5c468192007-02-06 01:14:33 +000030 if (len(args) == 2):
31 self.__initArgs__(args[0], args[1],
32 TTransport.TTransportFactoryBase(),
33 TTransport.TTransportFactoryBase(),
34 TBinaryProtocol.TBinaryProtocolFactory(),
35 TBinaryProtocol.TBinaryProtocolFactory())
36 elif (len(args) == 4):
37 self.__initArgs__(args[0], args[1], args[2], args[2], args[3], args[3])
38 elif (len(args) == 6):
39 self.__initArgs__(args[0], args[1], args[2], args[3], args[4], args[5])
40
41 def __initArgs__(self, processor, serverTransport,
42 inputTransportFactory, outputTransportFactory,
43 inputProtocolFactory, outputProtocolFactory):
Mark Sleed788b2e2006-09-07 01:26:35 +000044 self.processor = processor
45 self.serverTransport = serverTransport
Aditya Agarwal5c468192007-02-06 01:14:33 +000046 self.inputTransportFactory = inputTransportFactory
47 self.outputTransportFactory = outputTransportFactory
48 self.inputProtocolFactory = inputProtocolFactory
49 self.outputProtocolFactory = outputProtocolFactory
Mark Sleec9676562006-09-05 17:34:52 +000050
Mark Slee794993d2006-09-20 01:56:10 +000051 def serve(self):
Mark Sleec9676562006-09-05 17:34:52 +000052 pass
53
54class TSimpleServer(TServer):
55
56 """Simple single-threaded server that just pumps around one transport."""
57
Aditya Agarwal5c468192007-02-06 01:14:33 +000058 def __init__(self, *args):
59 TServer.__init__(self, *args)
Mark Sleec9676562006-09-05 17:34:52 +000060
Mark Slee794993d2006-09-20 01:56:10 +000061 def serve(self):
Mark Sleed788b2e2006-09-07 01:26:35 +000062 self.serverTransport.listen()
Mark Sleec9676562006-09-05 17:34:52 +000063 while True:
Mark Sleed788b2e2006-09-07 01:26:35 +000064 client = self.serverTransport.accept()
Aditya Agarwal5c468192007-02-06 01:14:33 +000065 itrans = self.inputTransportFactory.getTransport(client)
66 otrans = self.outputTransportFactory.getTransport(client)
67 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +000068 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleec9676562006-09-05 17:34:52 +000069 try:
70 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +000071 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +000072 except TTransport.TTransportException, tx:
73 pass
Mark Sleec9676562006-09-05 17:34:52 +000074 except Exception, x:
Mark Sleec98d0502006-09-06 02:42:25 +000075 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleed788b2e2006-09-07 01:26:35 +000076
Mark Slee4ac459f2006-10-25 21:39:01 +000077 itrans.close()
78 otrans.close()
Mark Slee4f0fed62006-10-02 17:50:08 +000079
80class TThreadedServer(TServer):
81
82 """Threaded server that spawns a new thread per each connection."""
83
Aditya Agarwal5c468192007-02-06 01:14:33 +000084 def __init__(self, *args):
85 TServer.__init__(self, *args)
Mark Slee4f0fed62006-10-02 17:50:08 +000086
87 def serve(self):
88 self.serverTransport.listen()
89 while True:
90 try:
91 client = self.serverTransport.accept()
92 t = threading.Thread(target = self.handle, args=(client,))
93 t.start()
Mark Slee5299a952007-10-05 00:13:24 +000094 except KeyboardInterrupt:
95 raise
Mark Slee4f0fed62006-10-02 17:50:08 +000096 except Exception, x:
97 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
98
99 def handle(self, client):
Aditya Agarwal5c468192007-02-06 01:14:33 +0000100 itrans = self.inputTransportFactory.getTransport(client)
101 otrans = self.outputTransportFactory.getTransport(client)
102 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Sleefb84b2b2007-02-20 03:37:28 +0000103 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Slee4f0fed62006-10-02 17:50:08 +0000104 try:
105 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000106 self.processor.process(iprot, oprot)
Mark Slee4f0fed62006-10-02 17:50:08 +0000107 except TTransport.TTransportException, tx:
108 pass
109 except Exception, x:
110 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000111
Mark Slee4ac459f2006-10-25 21:39:01 +0000112 itrans.close()
113 otrans.close()
114
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000115class TThreadPoolServer(TServer):
116
117 """Server with a fixed size pool of threads which service requests."""
118
Aditya Agarwal5c468192007-02-06 01:14:33 +0000119 def __init__(self, *args):
120 TServer.__init__(self, *args)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000121 self.clients = Queue.Queue()
122 self.threads = 10
123
Mark Slee4ce787f2006-10-24 18:54:06 +0000124 def setNumThreads(self, num):
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000125 """Set the number of worker threads that should be created"""
126 self.threads = num
127
128 def serveThread(self):
129 """Loop around getting clients from the shared queue and process them."""
130 while True:
131 try:
Mark Slee9a695ba2006-10-24 18:55:36 +0000132 client = self.clients.get()
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000133 self.serveClient(client)
134 except Exception, x:
135 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
David Reiss0c90f6f2008-02-06 22:18:40 +0000136
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000137 def serveClient(self, client):
138 """Process input/output from a client for as long as possible"""
Aditya Agarwal5c468192007-02-06 01:14:33 +0000139 itrans = self.inputTransportFactory.getTransport(client)
140 otrans = self.outputTransportFactory.getTransport(client)
141 iprot = self.inputProtocolFactory.getProtocol(itrans)
Mark Slee04342d82007-02-20 03:41:35 +0000142 oprot = self.outputProtocolFactory.getProtocol(otrans)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000143 try:
144 while True:
Mark Slee4ac459f2006-10-25 21:39:01 +0000145 self.processor.process(iprot, oprot)
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000146 except TTransport.TTransportException, tx:
147 pass
148 except Exception, x:
149 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
150
Mark Slee4ac459f2006-10-25 21:39:01 +0000151 itrans.close()
152 otrans.close()
153
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000154 def serve(self):
155 """Start a fixed number of worker threads and put client into a queue"""
156 for i in range(self.threads):
157 try:
158 t = threading.Thread(target = self.serveThread)
159 t.start()
160 except Exception, x:
161 print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
David Reiss0c90f6f2008-02-06 22:18:40 +0000162
Mark Sleeb90aa7c2006-10-24 18:49:45 +0000163 # Pump the socket for clients
164 self.serverTransport.listen()
165 while True:
166 try:
167 client = self.serverTransport.accept()
168 self.clients.put(client)
169 except Exception, x:
170 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
David Reiss66536542008-06-10 22:54:49 +0000171
172
David Reiss66536542008-06-10 22:54:49 +0000173class TForkingServer(TServer):
174
175 """A Thrift server that forks a new process for each request"""
176 """
177 This is more scalable than the threaded server as it does not cause
178 GIL contention.
179
180 Note that this has different semantics from the threading server.
181 Specifically, updates to shared variables will no longer be shared.
182 It will also not work on windows.
183
184 This code is heavily inspired by SocketServer.ForkingMixIn in the
185 Python stdlib.
186 """
187
188 def __init__(self, *args):
189 TServer.__init__(self, *args)
190 self.children = []
191
192 def serve(self):
David Reissbcaa2ad2008-06-10 22:55:26 +0000193 def try_close(file):
194 try:
195 file.close()
196 except IOError, e:
197 print '%s, %s, %s' % (type(e), e, traceback.format_exc())
198
199
David Reiss66536542008-06-10 22:54:49 +0000200 self.serverTransport.listen()
201 while True:
202 client = self.serverTransport.accept()
203 try:
204 pid = os.fork()
205
206 if pid: # parent
207 # add before collect, otherwise you race w/ waitpid
208 self.children.append(pid)
209 self.collect_children()
210
David Reissbcaa2ad2008-06-10 22:55:26 +0000211 # Parent must close socket or the connection may not get
212 # closed promptly
213 itrans = self.inputTransportFactory.getTransport(client)
214 otrans = self.outputTransportFactory.getTransport(client)
215 try_close(itrans)
216 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000217 else:
218 itrans = self.inputTransportFactory.getTransport(client)
219 otrans = self.outputTransportFactory.getTransport(client)
220
221 iprot = self.inputProtocolFactory.getProtocol(itrans)
222 oprot = self.outputProtocolFactory.getProtocol(otrans)
223
David Reissbcaa2ad2008-06-10 22:55:26 +0000224 ecode = 0
David Reiss66536542008-06-10 22:54:49 +0000225 try:
226 while True:
227 self.processor.process(iprot, oprot)
228 except TTransport.TTransportException, tx:
229 pass
230 except Exception, e:
David Reissbcaa2ad2008-06-10 22:55:26 +0000231 print '%s, %s, %s' % (type(e), e, traceback.format_exc())
232 ecode = 1
233 finally:
234 try_close(itrans)
235 try_close(otrans)
David Reiss66536542008-06-10 22:54:49 +0000236
David Reissbcaa2ad2008-06-10 22:55:26 +0000237 os._exit(ecode)
David Reiss66536542008-06-10 22:54:49 +0000238
239 except TTransport.TTransportException, tx:
240 pass
241 except Exception, x:
242 print '%s, %s, %s' % (type(x), x, traceback.format_exc())
243
244
David Reiss66536542008-06-10 22:54:49 +0000245 def collect_children(self):
246 while self.children:
247 try:
248 pid, status = os.waitpid(0, os.WNOHANG)
249 except os.error:
250 pid = None
251
252 if pid:
253 self.children.remove(pid)
254 else:
255 break
256
257