try:
while True:
self.processor.process(input, output)
+ except TTransport.TTransportException, tx:
+ pass
except Exception, x:
print '%s, %s, %s' % (type(x), x, traceback.format_exc())
- print 'Client died.'
input.close()
output.close()
+
+class TThreadedServer(TServer):
+
+ """Threaded server that spawns a new thread per each connection."""
+
+ def __init__(self, processor, serverTransport, transportFactory=None):
+ TServer.__init__(self, processor, serverTransport, transportFactory)
+
+ def serve(self):
+ self.serverTransport.listen()
+ while True:
+ try:
+ client = self.serverTransport.accept()
+ t = threading.Thread(target = self.handle, args=(client,))
+ t.start()
+ except Exception, x:
+ print '%s, %s, %s,' % (type(x), x, traceback.format_exc())
+
+ def handle(self, client):
+ (input, output) = self.transportFactory.getIOTransports(client)
+ try:
+ while True:
+ self.processor.process(input, output)
+ except TTransport.TTransportException, tx:
+ pass
+ except Exception, x:
+ print '%s, %s, %s' % (type(x), x, traceback.format_exc())
self.port = port
self.handle = None
- def set_handle(self, h):
+ def setHandle(self, h):
self.handle = h
def isOpen(self):
def read(self, sz):
buff = self.handle.recv(sz)
if len(buff) == 0:
- raise Exception('TSocket read 0 bytes')
+ raise TTransportException('TSocket read 0 bytes')
return buff
def write(self, buff):
while sent < have:
plus = self.handle.send(buff)
if plus == 0:
- raise Exception('sent 0 bytes')
+ raise TTransportException('sent 0 bytes')
sent += plus
buff = buff[plus:]
def listen(self):
self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ if hasattr(self.handle, 'set_timeout'):
+ self.handle.set_timeout(None)
self.handle.bind(('', self.port))
self.handle.listen(128)
def accept(self):
(client, addr) = self.handle.accept()
result = TSocket()
- result.set_handle(client)
+ result.setHandle(client)
return result
def close(self):
+from cStringIO import StringIO
+
+class TTransportException(Exception):
+
+ """Custom Transport Exception class"""
+
+ pass
+
class TTransportBase:
"""Base class for Thrift transport layer."""
def __init__(self, trans):
self.__trans = trans
- self.__buf = ''
+ self.__buf = StringIO()
def isOpen(self):
return self.__trans.isOpen()
return self.__trans.readAll(sz)
def write(self, buf):
- self.__buf += buf
+ self.__buf.write(buf)
def flush(self):
- self.__trans.write(self.__buf)
- self.__buf = ''
+ self.__trans.write(self.__buf.getvalue())
+ self.__buf = StringIO()