From c9676569adfba12a1378eec1c75c6036e7912d9e Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Tue, 5 Sep 2006 17:34:52 +0000 Subject: [PATCH] Thrift Python server code generation Summary: Yep, it's up and running. We now have full client/server support in all of C++ Java PHP and Python. Well, not quite... there's no PHP server, but honestly who wants one? Actually, if we do want one the framework will support writing is as a PHP file that can be served in apache like a web service (i.e. restserver.php would be thriftserver.php). But now that's rambling and nothing to do with this commit. Notes: cheever, let's chat about porting your multithreaded Pillar Python server over to Thrift git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664783 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_java_generator.cc | 7 +- compiler/cpp/src/generate/t_py_generator.cc | 177 +++++++++++++++++- compiler/cpp/src/generate/t_py_generator.h | 3 + lib/py/setup.py | 2 +- lib/py/src/Thrift.py | 6 + lib/py/src/protocol/TBinaryProtocol.py | 4 +- lib/py/src/server/TServer.py | 32 ++++ lib/py/src/server/__init__.py | 1 + lib/py/src/transport/TSocket.py | 43 ++++- lib/py/src/transport/TTransport.py | 44 ++++- test/py/TestClient.py | 35 +++- test/py/TestServer.py | 37 ++++ 12 files changed, 369 insertions(+), 22 deletions(-) create mode 100644 lib/py/src/server/TServer.py create mode 100644 lib/py/src/server/__init__.py create mode 100755 test/py/TestServer.py diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc index 268b9d5f..059e6b05 100644 --- a/compiler/cpp/src/generate/t_java_generator.cc +++ b/compiler/cpp/src/generate/t_java_generator.cc @@ -547,8 +547,7 @@ void t_java_generator::generate_service_client(t_service* tservice) { indent() << "TMessage _msg = _iprot.readMessageBegin(_itrans);" << endl << indent() << resultname << " __result = new " << resultname << "();" << endl << indent() << "__result.read(_iprot, _itrans);" << endl << - indent() << "_iprot.readMessageEnd(_itrans);" << endl << - endl; + indent() << "_iprot.readMessageEnd(_itrans);" << endl; // Careful, only return _result if not a void function if (!(*f_iter)->get_returntype()->is_void()) { @@ -734,8 +733,7 @@ void t_java_generator::generate_process_function(t_service* tservice, // Declare result for non async function if (!tfunction->is_async()) { f_service_ << - indent() << resultname << " __result = new " << resultname << "();" << endl << - endl; + indent() << resultname << " __result = new " << resultname << "();" << endl; } // Try block for a function with exceptions @@ -803,7 +801,6 @@ void t_java_generator::generate_process_function(t_service* tservice, } f_service_ << - endl << indent() << "_oprot.writeMessageBegin(_otrans, new TMessage(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid));" << endl << indent() << "__result.write(_oprot, _otrans);" << endl << indent() << "_oprot.writeMessageEnd(_otrans);" << endl << diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc index c53fe420..53e5b3bf 100644 --- a/compiler/cpp/src/generate/t_py_generator.cc +++ b/compiler/cpp/src/generate/t_py_generator.cc @@ -331,13 +331,15 @@ void t_py_generator::generate_service(t_service* tservice) { py_imports() << endl; f_service_ << - "from " << program_name_ << "_types import *" << endl << endl; + "from " << program_name_ << "_types import *" << endl << + "from thrift.Thrift import TProcessor" << endl << + endl; // Generate the three main parts of the service (well, two for now in PHP) generate_service_interface(tservice); generate_service_client(tservice); generate_service_helpers(tservice); - // generate_service_server(tservice); + generate_service_server(tservice); // Close service file f_service_ << endl; @@ -557,6 +559,177 @@ void t_py_generator::generate_service_client(t_service* tservice) { endl; } +/** + * Generates a service server definition. + * + * @param tservice The service to generate a server for. + */ +void t_py_generator::generate_service_server(t_service* tservice) { + // Generate the dispatch methods + vector functions = tservice->get_functions(); + vector::iterator f_iter; + + // Generate the header portion + f_service_ << + "class Server(Iface, TProcessor):" << endl; + indent_up(); + + indent(f_service_) << + "def __init__(self, handler, iprot, oprot=None):" << endl; + indent_up(); + f_service_ << + indent() << "self.__handler = handler" << endl << + indent() << "self.__iprot = iprot" << endl << + indent() << "if oprot == None:" << endl << + indent() << " self.__oprot = iprot" << endl << + indent() << "else:" << endl << + indent() << " self.__oprot = oprot" << endl; + indent_down(); + f_service_ << endl; + + // Generate the server implementation + indent(f_service_) << + "def process(self, itrans, otrans):" << endl; + indent_up(); + + f_service_ << + indent() << "(name, type, seqid) = self.__iprot.readMessageBegin(itrans)" << endl; + + // TODO(mcslee): validate message + + bool first = true; + for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + if (!first) { + f_service_ << indent() << "el"; + } else { + f_service_ << indent(); + first = false; + } + f_service_ << + "if name == \"" << (*f_iter)->get_name() << "\":" << endl; + indent_up(); + indent(f_service_) << + "self.process_" << (*f_iter)->get_name() << "(seqid, itrans, otrans)" << endl; + indent_down(); + } + f_service_ << + indent() << "else:" << endl << + indent() << " print 'Unknown function %s' % (name)" << endl; + f_service_ << endl; + + // Read end of args field, the T_STOP, and the struct close + f_service_ << + indent() << "return True" << endl; + + indent_down(); + f_service_ << endl; + + // Generate the process subfunctions + for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + generate_process_function(tservice, *f_iter); + } + + indent_down(); + f_service_ << endl; +} + +/** + * Generates a process function definition. + * + * @param tfunction The function to write a dispatcher for + */ +void t_py_generator::generate_process_function(t_service* tservice, + t_function* tfunction) { + // Open function + indent(f_service_) << + "def process_" << tfunction->get_name() << + "(self, seqid, itrans, otrans):" << endl; + indent_up(); + + string argsname = tfunction->get_name() + "_args"; + string resultname = tfunction->get_name() + "_result"; + + f_service_ << + indent() << "__args = " << argsname << "()" << endl << + indent() << "__args.read(self.__iprot, itrans)" << endl << + indent() << "self.__iprot.readMessageEnd(itrans)" << endl; + + t_struct* xs = tfunction->get_xceptions(); + const std::vector& xceptions = xs->get_members(); + vector::const_iterator x_iter; + + // Declare result for non async function + if (!tfunction->is_async()) { + f_service_ << + indent() << "__result = " << resultname << "()" << endl; + } + + // Try block for a function with exceptions + if (xceptions.size() > 0) { + f_service_ << + indent() << "try:" << endl; + indent_up(); + } + + // Generate the function call + t_struct* arg_struct = tfunction->get_arglist(); + const std::vector& fields = arg_struct->get_members(); + vector::const_iterator f_iter; + + f_service_ << indent(); + if (!tfunction->is_async() && !tfunction->get_returntype()->is_void()) { + f_service_ << "__result.success = "; + } + f_service_ << + "self.__handler." << tfunction->get_name() << "("; + bool first = true; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + if (first) { + first = false; + } else { + f_service_ << ", "; + } + f_service_ << "__args." << (*f_iter)->get_name(); + } + f_service_ << ")" << endl; + + if (!tfunction->is_async() && xceptions.size() > 0) { + indent_down(); + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { + f_service_ << + indent() << "except " << (*x_iter)->get_type()->get_name() << ", " << (*x_iter)->get_name() << ":" << endl; + if (!tfunction->is_async()) { + indent_up(); + f_service_ << + indent() << "__result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << endl; + indent_down(); + } else { + f_service_ << + indent() << "pass" << endl; + } + } + } + + // Shortcut out here for async functions + if (tfunction->is_async()) { + f_service_ << + indent() << "return" << endl; + indent_down(); + f_service_ << endl; + return; + } + + f_service_ << + indent() << "self.__oprot.writeMessageBegin(otrans, \"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl << + indent() << "__result.write(self.__oprot, otrans)" << endl << + indent() << "self.__oprot.writeMessageEnd(otrans)" << endl << + indent() << "otrans.flush()" << endl; + + // Close function + indent_down(); + f_service_ << endl; +} + /** * Deserializes a field of any type. */ diff --git a/compiler/cpp/src/generate/t_py_generator.h b/compiler/cpp/src/generate/t_py_generator.h index 2703df38..f68a1218 100644 --- a/compiler/cpp/src/generate/t_py_generator.h +++ b/compiler/cpp/src/generate/t_py_generator.h @@ -48,6 +48,9 @@ class t_py_generator : public t_oop_generator { void generate_service_interface (t_service* tservice); void generate_service_client (t_service* tservice); + void generate_service_server (t_service* tservice); + void generate_process_function (t_service* tservice, t_function* tfunction); + /** Serialization constructs */ void generate_deserialize_field (std::ofstream &out, diff --git a/lib/py/setup.py b/lib/py/setup.py index 1d5d0234..ea6ddaaa 100644 --- a/lib/py/setup.py +++ b/lib/py/setup.py @@ -6,7 +6,7 @@ setup(name = 'Thrift', author = ['Mark Slee'], author_email = ['mcslee@facebook.com'], url = 'http://code.facebook.com/thrift', - packages = ['thrift', 'thrift.protocol', 'thrift.transport'], + packages = ['thrift', 'thrift.protocol', 'thrift.transport', 'thrift.server'], package_dir = {'thrift' : 'src'}, ) diff --git a/lib/py/src/Thrift.py b/lib/py/src/Thrift.py index e69de29b..0c4a458d 100644 --- a/lib/py/src/Thrift.py +++ b/lib/py/src/Thrift.py @@ -0,0 +1,6 @@ +class TProcessor: + + """Base class for procsessor, which works on two streams.""" + + def process(itrans, otrans): + pass diff --git a/lib/py/src/protocol/TBinaryProtocol.py b/lib/py/src/protocol/TBinaryProtocol.py index c089ac57..860f4611 100644 --- a/lib/py/src/protocol/TBinaryProtocol.py +++ b/lib/py/src/protocol/TBinaryProtocol.py @@ -70,7 +70,7 @@ class TBinaryProtocol(TProtocolBase): otrans.write(buff) def writeI64(self, otrans, i64): - buff = pack("!l", i64) + buff = pack("!q", i64) otrans.write(buff) def writeString(self, otrans, str): @@ -150,7 +150,7 @@ class TBinaryProtocol(TProtocolBase): def readI64(self, itrans): buff = itrans.readAll(8) - val, = unpack('!l', buff) + val, = unpack('!q', buff) return val def readString(self, itrans): diff --git a/lib/py/src/server/TServer.py b/lib/py/src/server/TServer.py new file mode 100644 index 00000000..69be2606 --- /dev/null +++ b/lib/py/src/server/TServer.py @@ -0,0 +1,32 @@ +from thrift.Thrift import TProcessor +from thrift.transport import TTransport + +class TServer: + + """Base interface for a server, which must have a run method.""" + + def __init__(self, proc): + self.processor = proc + + def run(self): + pass + +class TSimpleServer(TServer): + + """Simple single-threaded server that just pumps around one transport.""" + + def __init__(self, proc, trans): + TServer.__init__(self, proc) + self.transport = trans + + def run(self): + self.transport.listen() + while True: + client = TTransport.TBufferedTransport(self.transport.accept()) + try: + while True: + self.processor.process(client, client) + except Exception, x: + print x + print 'Client died.' + client.close() diff --git a/lib/py/src/server/__init__.py b/lib/py/src/server/__init__.py new file mode 100644 index 00000000..f7e08be1 --- /dev/null +++ b/lib/py/src/server/__init__.py @@ -0,0 +1 @@ +__all__ = ["TServer"] diff --git a/lib/py/src/transport/TSocket.py b/lib/py/src/transport/TSocket.py index 52c6118a..4ef35d9b 100644 --- a/lib/py/src/transport/TSocket.py +++ b/lib/py/src/transport/TSocket.py @@ -5,15 +5,14 @@ class TSocket(TTransportBase): """Socket implementation of TTransport base.""" - handle = None - host = "localhost" - port = 9090 - - def __init__(self, host, port): + def __init__(self, host='localhost', port=9090): self.host = host self.port = port self.handle = None + def set_handle(self, h): + self.handle = h + def isOpen(self): return handle != None @@ -36,10 +35,42 @@ class TSocket(TTransportBase): def read(self, sz): buff = self.handle.recv(sz) + if len(buff) == 0: + raise Exception('TScket read 0 bytes') return buff def write(self, buff): - self.handle.sendall(buff) + sent = 0 + have = len(buff) + while sent < have: + plus = self.handle.send(buff) + if plus == 0: + raise Exception('sent 0 bytes') + sent += plus + buff = buff[plus:] def flush(self): pass + +class TServerSocket(TServerTransportBase): + + """Socket implementation of TServerTransport base.""" + + def __init__(self, port): + self.port = port + self.handle = None + + def listen(self): + self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.handle.bind(('', self.port)) + self.handle.listen(128) + + def accept(self): + (client, addr) = self.handle.accept() + result = TSocket() + result.set_handle(client) + return result + + def close(self): + self.handle.close() + self.handle = None diff --git a/lib/py/src/transport/TTransport.py b/lib/py/src/transport/TTransport.py index e9e36d72..1e8b6c60 100644 --- a/lib/py/src/transport/TTransport.py +++ b/lib/py/src/transport/TTransport.py @@ -20,6 +20,48 @@ class TTransportBase: def write(self, buf): pass - def flush(): + def flush(self): pass +class TServerTransportBase: + + """Base class for Thrift server transports.""" + + def listen(self): + pass + + def accept(self): + pass + + def close(self): + pass + +class TBufferedTransport(TTransportBase): + + """Class that wraps another transport and buffers its I/O.""" + + def __init__(self, trans): + self.__trans = trans + self.__buf = '' + + def isOpen(self): + return self.__trans.isOpen() + + def open(self): + return self.__trans.open() + + def close(self): + return self.__trans.close() + + def read(self, sz): + return self.__trans.read(sz) + + def readAll(self, sz): + return self.__trans.readAll(sz) + + def write(self, buf): + self.__buf += buf + + def flush(self): + self.__trans.write(self.__buf) + self.__buf = '' diff --git a/test/py/TestClient.py b/test/py/TestClient.py index a64b8d70..21d19908 100755 --- a/test/py/TestClient.py +++ b/test/py/TestClient.py @@ -5,31 +5,56 @@ sys.path.append('./gen-py') import ThriftTest from ThriftTest_types import * +from thrift.transport import TTransport from thrift.transport import TSocket from thrift.protocol import TBinaryProtocol -transport = TSocket.TSocket('localhost', 9090) +import timing + +transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 9090)) protocol = TBinaryProtocol.TBinaryProtocol() client = ThriftTest.Client(transport, protocol) transport.open() +# Start debug timing +timing.start() + print "testVoid()" print client.testVoid() -print "testString('PythonTest')" -print client.testString('PythonTest') +print "testString('Python')" +print client.testString('Python') print "testByte(63)" print client.testByte(63) +print "testI32(-1)" +print client.testI32(-1) + +print "testI64(-34359738368)" +print client.testI64(-34359738368) + +print "testStruct({Zero, 1, -3, -5})" +x = Xtruct() +x.string_thing = "Zero" +x.byte_thing = 1 +x.i32_thing = -3 +x.i64_thing = -5 +x = client.testStruct(x) +print "{%s, %d, %d, %d}" % (x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing) + print "testException('Safe')" print client.testException('Safe') -print "textException('Xception')" try: + print "textException('Xception')" print client.testException('Xception') + except Xception, x: - print 'Xception (%d, %s)' % (x.errorCode, x.message) + print "Xception (%d, %s)" % (x.errorCode, x.message) + +timing.finish() +print "Total time: %d microsecs" % timing.micro() transport.close() diff --git a/test/py/TestServer.py b/test/py/TestServer.py new file mode 100755 index 00000000..4b571c7a --- /dev/null +++ b/test/py/TestServer.py @@ -0,0 +1,37 @@ +#!/usr/bin/python + +import sys +sys.path.append('./gen-py') + +import ThriftTest +from ThriftTest_types import * +from thrift.transport import TSocket +from thrift.protocol import TBinaryProtocol +from thrift.server import TServer + +class TestHandler: + + def testVoid(self): + print 'testVoid()' + + def testString(self, str): + print 'testString(%s)' % str + return str + + def testByte(self, byte): + print 'testByte(%d)' % byte + return byte + + def testException(self, str): + print 'testException(%s)' % str + x = Xception() + x.errorCode = 1001 + x.message = str + raise x + +transport = TSocket.TServerSocket(9090) +protocol = TBinaryProtocol.TBinaryProtocol() +handler = TestHandler() +iface = ThriftTest.Server(handler, protocol) +server = TServer.TSimpleServer(iface, transport) +server.run() -- 2.17.1