From: Chris Piro Date: Thu, 7 Mar 2013 16:32:48 +0000 (-0500) Subject: THRIFT-1704: Tornado support (Python) X-Git-Tag: 0.9.1~185 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=20c81ad74c53c102692adec0e3c68d413899cabd;p=common%2Fthrift.git THRIFT-1704: Tornado support (Python) --- diff --git a/.gitignore b/.gitignore index eb2ae4d4..000a95cc 100644 --- a/.gitignore +++ b/.gitignore @@ -241,6 +241,9 @@ gen-* /test/py.twisted/Makefile.in /test/py.twisted/_trial_temp/ /test/py.twisted/test_suite.pyc +/test/py.tornado/Makefile +/test/py.tornado/Makefile.in +/test/py.tornado/*.pyc /test/rb/Makefile /test/rb/Makefile.in /tutorial/Makefile @@ -257,6 +260,8 @@ gen-* /tutorial/js/build/ /tutorial/py.twisted/Makefile /tutorial/py.twisted/Makefile.in +/tutorial/py.tornado/Makefile +/tutorial/py.tornado/Makefile.in /tutorial/py/Makefile /tutorial/py/Makefile.in /ylwrap diff --git a/compiler/cpp/src/generate/t_py_generator.cc b/compiler/cpp/src/generate/t_py_generator.cc index 60856703..fd954c51 100644 --- a/compiler/cpp/src/generate/t_py_generator.cc +++ b/compiler/cpp/src/generate/t_py_generator.cc @@ -91,13 +91,22 @@ class t_py_generator : public t_generator { iter = parsed_options.find("twisted"); gen_twisted_ = (iter != parsed_options.end()); + iter = parsed_options.find("tornado"); + gen_tornado_ = (iter != parsed_options.end()); + + if (gen_twisted_ && gen_tornado_) { + throw "at most one of 'twisted' and 'tornado' are allowed"; + } + iter = parsed_options.find("utf8strings"); gen_utf8strings_ = (iter != parsed_options.end()); copy_options_ = option_string; - if (gen_twisted_){ + if (gen_twisted_) { out_dir_base_ = "gen-py.twisted"; + } else if (gen_tornado_) { + out_dir_base_ = "gen-py.tornado"; } else { out_dir_base_ = "gen-py"; } @@ -213,6 +222,17 @@ class t_py_generator : public t_generator { void generate_python_docstring (std::ofstream& out, t_doc* tdoc); + /** + * a type for specifying to function_signature what type of Tornado callback + * parameter to add + */ + + enum tornado_callback_t { + NONE = 0, + MANDATORY_FOR_ONEWAY_ELSE_NONE = 1, + OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY = 2, + }; + /** * Helper rendering functions */ @@ -224,9 +244,12 @@ class t_py_generator : public t_generator { std::string declare_argument(t_field* tfield); std::string render_field_default_value(t_field* tfield); std::string type_name(t_type* ttype); - std::string function_signature(t_function* tfunction, std::string prefix=""); - std::string function_signature_if(t_function* tfunction, std::string prefix=""); - std::string argument_list(t_struct* tstruct); + std::string function_signature(t_function* tfunction, + bool interface=false, + tornado_callback_t callback=NONE); + std::string argument_list(t_struct* tstruct, + std::vector *pre=NULL, + std::vector *post=NULL); std::string type_to_enum(t_type* ttype); std::string type_to_spec_args(t_type* ttype); @@ -276,6 +299,11 @@ class t_py_generator : public t_generator { */ bool gen_twisted_; + /** + * True if we should generate code for use with Tornado + */ + bool gen_tornado_; + /** * True if strings should be encoded using utf-8. */ @@ -1027,6 +1055,9 @@ void t_py_generator::generate_service(t_service* tservice) { "from zope.interface import Interface, implements" << endl << "from twisted.internet import defer" << endl << "from thrift.transport import TTwisted" << endl; + } else if (gen_tornado_) { + f_service_ << "from tornado import gen" << endl; + f_service_ << "from tornado import stack_context" << endl; } f_service_ << endl; @@ -1098,7 +1129,7 @@ void t_py_generator::generate_service_interface(t_service* tservice) { } else { if (gen_twisted_) { extends_if = "(Interface)"; - } else if (gen_newstyle_ || gen_dynamic_) { + } else if (gen_newstyle_ || gen_dynamic_ || gen_tornado_) { extends_if = "(object)"; } } @@ -1115,7 +1146,7 @@ void t_py_generator::generate_service_interface(t_service* tservice) { vector::iterator f_iter; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { f_service_ << - indent() << "def " << function_signature_if(*f_iter) << ":" << endl; + indent() << "def " << function_signature(*f_iter, true, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl; indent_up(); generate_python_docstring(f_service_, (*f_iter)); f_service_ << @@ -1165,6 +1196,9 @@ void t_py_generator::generate_service_client(t_service* tservice) { if (gen_twisted_) { f_service_ << indent() << "def __init__(self, transport, oprot_factory):" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << "def __init__(self, transport, iprot_factory, oprot_factory=None):" << endl; } else { f_service_ << indent() << "def __init__(self, iprot, oprot=None):" << endl; @@ -1177,6 +1211,15 @@ void t_py_generator::generate_service_client(t_service* tservice) { indent() << " self._seqid = 0" << endl << indent() << " self._reqs = {}" << endl << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " self._transport = transport" << endl << + indent() << " self._iprot_factory = iprot_factory" << endl << + indent() << " self._oprot_factory = (oprot_factory if oprot_factory is not None" << endl << + indent() << " else iprot_factory)" << endl << + indent() << " self._seqid = 0" << endl << + indent() << " self._reqs = {}" << endl << + endl; } else { f_service_ << indent() << " self._iprot = self._oprot = iprot" << endl << @@ -1190,6 +1233,10 @@ void t_py_generator::generate_service_client(t_service* tservice) { f_service_ << indent() << " " << extends << ".Client.__init__(self, transport, oprot_factory)" << endl << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " " << extends << ".Client.__init__(self, transport, iprot_factory, oprot_factory)" << endl << + endl; } else { f_service_ << indent() << " " << extends << ".Client.__init__(self, iprot, oprot)" << endl << @@ -1197,6 +1244,24 @@ void t_py_generator::generate_service_client(t_service* tservice) { } } + if (gen_tornado_ && extends.empty()) { + f_service_ << + indent() << "@gen.engine" << endl << + indent() << "def recv_dispatch(self):" << endl << + indent() << " \"\"\"read a response from the wire. schedule exactly one per send that" << endl << + indent() << " expects a response, but it doesn't matter which callee gets which" << endl << + indent() << " response; they're dispatched here properly\"\"\"" << endl << + endl << + indent() << " # wait for a frame header" << endl << + indent() << " frame = yield gen.Task(self._transport.readFrame)" << endl << + indent() << " tr = TTransport.TMemoryBuffer(frame)" << endl << + indent() << " iprot = self._iprot_factory.getProtocol(tr)" << endl << + indent() << " (fname, mtype, rseqid) = iprot.readMessageBegin()" << endl << + indent() << " method = getattr(self, 'recv_' + fname)" << endl << + indent() << " method(iprot, mtype, rseqid)" << endl << + endl; + } + // Generate client method implementations vector functions = tservice->get_functions(); vector::const_iterator f_iter; @@ -1208,7 +1273,7 @@ void t_py_generator::generate_service_client(t_service* tservice) { // Open function indent(f_service_) << - "def " << function_signature(*f_iter) << ":" << endl; + "def " << function_signature(*f_iter, false, OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) << ":" << endl; indent_up(); generate_python_docstring(f_service_, (*f_iter)); if (gen_twisted_) { @@ -1217,6 +1282,12 @@ void t_py_generator::generate_service_client(t_service* tservice) { indent(f_service_) << "d = self._reqs[self._seqid] = defer.Deferred()" << endl; } + } else if (gen_tornado_) { + indent(f_service_) << "self._seqid += 1" << endl; + if (!(*f_iter)->is_oneway()) { + indent(f_service_) << + "self._reqs[self._seqid] = callback" << endl; + } } indent(f_service_) << @@ -1231,12 +1302,24 @@ void t_py_generator::generate_service_client(t_service* tservice) { } f_service_ << (*fld_iter)->get_name(); } + + if (gen_tornado_ && (*f_iter)->is_oneway()) { + if (first) { + first = false; + } else { + f_service_ << ", "; + } + f_service_ << "callback"; + } + f_service_ << ")" << endl; if (!(*f_iter)->is_oneway()) { f_service_ << indent(); if (gen_twisted_) { f_service_ << "return d" << endl; + } else if (gen_tornado_) { + f_service_ << "self.recv_dispatch()" << endl; } else { if (!(*f_iter)->get_returntype()->is_void()) { f_service_ << "return "; @@ -1254,14 +1337,14 @@ void t_py_generator::generate_service_client(t_service* tservice) { f_service_ << endl; indent(f_service_) << - "def send_" << function_signature(*f_iter) << ":" << endl; + "def send_" << function_signature(*f_iter, false, MANDATORY_FOR_ONEWAY_ELSE_NONE) << ":" << endl; indent_up(); std::string argsname = (*f_iter)->get_name() + "_args"; // Serialize the request header - if (gen_twisted_) { + if (gen_twisted_ || gen_tornado_) { f_service_ << indent() << "oprot = self._oprot_factory.getProtocol(self._transport)" << endl << indent() << @@ -1286,6 +1369,19 @@ void t_py_generator::generate_service_client(t_service* tservice) { indent() << "args.write(oprot)" << endl << indent() << "oprot.writeMessageEnd()" << endl << indent() << "oprot.trans.flush()" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << "args.write(oprot)" << endl << + indent() << "oprot.writeMessageEnd()" << endl; + if ((*f_iter)->is_oneway()) { + // send_* carry the callback so you can block on the write's flush + // (rather than on receipt of the response) + f_service_ << + indent() << "oprot.trans.flush(callback=callback)" << endl; + } else { + f_service_ << + indent() << "oprot.trans.flush()" << endl; + } } else { f_service_ << indent() << "args.write(self._oprot)" << endl << @@ -1300,7 +1396,7 @@ void t_py_generator::generate_service_client(t_service* tservice) { // Open function f_service_ << endl; - if (gen_twisted_) { + if (gen_twisted_ || gen_tornado_) { f_service_ << indent() << "def recv_" << (*f_iter)->get_name() << "(self, iprot, mtype, rseqid):" << endl; @@ -1319,6 +1415,9 @@ void t_py_generator::generate_service_client(t_service* tservice) { if (gen_twisted_) { f_service_ << indent() << "d = self._reqs.pop(rseqid)" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << "callback = self._reqs.pop(rseqid)" << endl; } else { f_service_ << indent() << "(fname, mtype, rseqid) = self._iprot.readMessageBegin()" << endl; @@ -1336,6 +1435,15 @@ void t_py_generator::generate_service_client(t_service* tservice) { indent() << "result = " << resultname << "()" << endl << indent() << "result.read(iprot)" << endl << indent() << "iprot.readMessageEnd()" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " x.read(iprot)" << endl << + indent() << " iprot.readMessageEnd()" << endl << + indent() << " callback(x)" << endl << + indent() << " return" << endl << + indent() << "result = " << resultname << "()" << endl << + indent() << "result.read(iprot)" << endl << + indent() << "iprot.readMessageEnd()" << endl; } else { f_service_ << indent() << " x.read(self._iprot)" << endl << @@ -1353,6 +1461,10 @@ void t_py_generator::generate_service_client(t_service* tservice) { if (gen_twisted_) { f_service_ << indent() << " return d.callback(result.success)" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " callback(result.success)" << endl << + indent() << " return" << endl; } else { f_service_ << indent() << " return result.success" << endl; @@ -1369,6 +1481,10 @@ void t_py_generator::generate_service_client(t_service* tservice) { f_service_ << indent() << " return d.errback(result." << (*x_iter)->get_name() << ")" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " callback(result." << (*x_iter)->get_name() << ")" << endl << + indent() << " return" << endl; } else { f_service_ << indent() << " raise result." << (*x_iter)->get_name() << "" << endl; @@ -1378,16 +1494,24 @@ void t_py_generator::generate_service_client(t_service* tservice) { // Careful, only return _result if not a void function if ((*f_iter)->get_returntype()->is_void()) { if (gen_twisted_) { - indent(f_service_) << - "return d.callback(None)" << endl; + f_service_ << + indent() << "return d.callback(None)" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << "callback(None)" << endl << + indent() << "return" << endl; } else { - indent(f_service_) << - "return" << endl; + f_service_ << + indent() << "return" << endl; } } else { if (gen_twisted_) { f_service_ << indent() << "return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << "callback(TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\"))" << endl << + indent() << "return" << endl; } else { f_service_ << indent() << "raise TApplicationException(TApplicationException.MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl; @@ -1645,9 +1769,22 @@ void t_py_generator::generate_service_server(t_service* tservice) { f_service_ << endl; // Generate the server implementation - indent(f_service_) << - "def process(self, iprot, oprot):" << endl; - indent_up(); + if (gen_tornado_) { + f_service_ << + indent() << "@gen.engine" << endl << + indent() << "def process(self, transport, iprot_factory, oprot, callback):" << endl; + indent_up(); + f_service_ << + indent() << "# wait for a frame header" << endl << + indent() << "frame = yield gen.Task(transport.readFrame)" << endl << + indent() << "tr = TTransport.TMemoryBuffer(frame)" << endl << + indent() << "iprot = iprot_factory.getProtocol(tr)" << endl << + endl; + } else { + f_service_ << + indent() << "def process(self, iprot, oprot):" << endl; + indent_up(); + } f_service_ << indent() << "(name, type, seqid) = iprot.readMessageBegin()" << endl; @@ -1668,6 +1805,8 @@ void t_py_generator::generate_service_server(t_service* tservice) { if (gen_twisted_) { f_service_ << indent() << " return defer.succeed(None)" << endl; + } else if (gen_tornado_) { + // nothing } else { f_service_ << indent() << " return" << endl; @@ -1679,6 +1818,10 @@ void t_py_generator::generate_service_server(t_service* tservice) { if (gen_twisted_) { f_service_ << indent() << " return self._processMap[name](self, seqid, iprot, oprot)" << endl; + } else if (gen_tornado_) { + f_service_ << + indent() << " yield gen.Task(self._processMap[name], self, seqid, iprot, oprot)" << endl << + indent() << "callback()" << endl; } else { f_service_ << indent() << " self._processMap[name](self, seqid, iprot, oprot)" << endl; @@ -1709,9 +1852,17 @@ void t_py_generator::generate_process_function(t_service* tservice, t_function* tfunction) { (void) tservice; // Open function - indent(f_service_) << - "def process_" << tfunction->get_name() << - "(self, seqid, iprot, oprot):" << endl; + if (gen_tornado_) { + f_service_ << + indent() << "@gen.engine" << endl << + indent() << "def process_" << tfunction->get_name() << + "(self, seqid, iprot, oprot, callback):" << endl; + } else { + f_service_ << + indent() << "def process_" << tfunction->get_name() << + "(self, seqid, iprot, oprot):" << endl; + } + indent_up(); string argsname = tfunction->get_name() + "_args"; @@ -1827,8 +1978,81 @@ void t_py_generator::generate_process_function(t_service* tservice, indent_down(); f_service_ << endl; } - } else { + } else if (gen_tornado_) { + if (!tfunction->is_oneway() && xceptions.size() > 0) { + f_service_ << + endl << + indent() << "def handle_exception(xtype, value, traceback):" << endl; + + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { + f_service_ << + indent() << " if xtype == " << type_name((*x_iter)->get_type()) << ":" << endl; + if (!tfunction->is_oneway()) { + f_service_ << + indent() << " result." << (*x_iter)->get_name() << " = value" << endl; + } + f_service_ << + indent() << " return True" << endl; + } + + f_service_ << + endl << + indent() << "with stack_context.ExceptionStackContext(handle_exception):" << endl; + indent_up(); + } + + // Generate the function call + t_struct* arg_struct = tfunction->get_arglist(); + const std::vector& fields = arg_struct->get_members(); + vector::const_iterator f_iter; + + f_service_ << indent(); + if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) { + f_service_ << "result.success = "; + } + f_service_ << + "yield gen.Task(self._handler." << tfunction->get_name() << ", "; + bool first = true; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + if (first) { + first = false; + } else { + f_service_ << ", "; + } + f_service_ << "args." << (*f_iter)->get_name(); + } + f_service_ << ")" << endl; + + if (xceptions.size() > 0) { + f_service_ << endl; + } + + if (!tfunction->is_oneway() && xceptions.size() > 0) { + indent_down(); + } + + // Shortcut out here for oneway functions + if (tfunction->is_oneway()) { + f_service_ << + indent() << "callback()" << endl; + indent_down(); + f_service_ << endl; + return; + } + + f_service_ << + indent() << "oprot.writeMessageBegin(\"" << tfunction->get_name() << "\", TMessageType.REPLY, seqid)" << endl << + indent() << "result.write(oprot)" << endl << + indent() << "oprot.writeMessageEnd()" << endl << + indent() << "oprot.trans.flush()" << endl << + indent() << "callback()" << endl; + + // Close function + indent_down(); + f_service_ << endl; + + } else { // py // Try block for a function with exceptions if (xceptions.size() > 0) { f_service_ << @@ -2001,7 +2225,7 @@ void t_py_generator::generate_deserialize_container(ofstream &out, if (ttype->is_map()) { out << indent() << prefix << " = {}" << endl << - indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin() " << endl; + indent() << "(" << ktype << ", " << vtype << ", " << size << " ) = iprot.readMapBegin()" << endl; } else if (ttype->is_set()) { out << indent() << prefix << " = set()" << endl << @@ -2382,46 +2606,56 @@ string t_py_generator::render_field_default_value(t_field* tfield) { * @return String of rendered function definition */ string t_py_generator::function_signature(t_function* tfunction, - string prefix) { - string argument_list_result = argument_list(tfunction->get_arglist()); - if (!argument_list_result.empty()) { - argument_list_result = "self, " + argument_list_result; - } else { - argument_list_result = "self"; - } - - return prefix + tfunction->get_name() + "(" + argument_list_result + ")"; -} - -/** - * Renders an interface function signature of the form 'type name(args)' - * - * @param tfunction Function definition - * @return String of rendered function definition - */ -string t_py_generator::function_signature_if(t_function* tfunction, - string prefix) { - string argument_list_result = argument_list(tfunction->get_arglist()); - if (!gen_twisted_) { - if (!argument_list_result.empty()) { - argument_list_result = "self, " + argument_list_result; - } else { - argument_list_result = "self"; + bool interface, + tornado_callback_t callback) { + vector pre; + vector post; + string signature = tfunction->get_name() + "("; + + if (!(gen_twisted_ && interface)) { + pre.push_back("self"); + } + + if (gen_tornado_) { + if (callback == NONE) { + } else if (callback == MANDATORY_FOR_ONEWAY_ELSE_NONE) { + if (tfunction->is_oneway()) { + // Tornado send_* carry the callback so you can block on the write's flush + // (rather than on receipt of the response) + post.push_back("callback"); + } + } else if (callback == OPTIONAL_FOR_ONEWAY_ELSE_MANDATORY) { + if (tfunction->is_oneway()) { + post.push_back("callback=None"); + } else { + post.push_back("callback"); + } } } - return prefix + tfunction->get_name() + "(" + argument_list_result + ")"; + signature += argument_list(tfunction->get_arglist(), &pre, &post) + ")"; + return signature; } - /** * Renders a field list */ -string t_py_generator::argument_list(t_struct* tstruct) { +string t_py_generator::argument_list(t_struct* tstruct, vector *pre, vector *post) { string result = ""; const vector& fields = tstruct->get_members(); vector::const_iterator f_iter; + vector::const_iterator s_iter; bool first = true; + if (pre) { + for (s_iter = pre->begin(); s_iter != pre->end(); ++s_iter) { + if (first) { + first = false; + } else { + result += ", "; + } + result += *s_iter; + } + } for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { if (first) { first = false; @@ -2430,6 +2664,16 @@ string t_py_generator::argument_list(t_struct* tstruct) { } result += (*f_iter)->get_name(); } + if (post) { + for (s_iter = post->begin(); s_iter != post->end(); ++s_iter) { + if (first) { + first = false; + } else { + result += ", "; + } + result += *s_iter; + } + } return result; } @@ -2523,6 +2767,7 @@ string t_py_generator::type_to_spec_args(t_type* ttype) { THRIFT_REGISTER_GENERATOR(py, "Python", " new_style: Generate new-style classes.\n" \ " twisted: Generate Twisted-friendly RPC services.\n" \ +" tornado: Generate code for use with Tornado.\n" \ " utf8strings: Encode/decode strings using utf8 in the generated code.\n" \ " slots: Generate code using slots for instance members.\n" \ " dynamic: Generate dynamic code, less code generated but slower.\n" \ diff --git a/configure.ac b/configure.ac index 123c52b1..54979efe 100755 --- a/configure.ac +++ b/configure.ac @@ -125,7 +125,7 @@ if test "$with_cpp" = "yes"; then AX_LIB_ZLIB([1.2.3]) have_zlib=$success - + AX_THRIFT_LIB(qt4, [Qt], yes) have_qt=no if test "$with_qt4" = "yes"; then @@ -610,6 +610,7 @@ AC_CONFIG_FILES([ test/perl/Makefile test/py/Makefile test/py.twisted/Makefile + test/py.tornado/Makefile test/rb/Makefile tutorial/Makefile tutorial/cpp/Makefile @@ -617,6 +618,7 @@ AC_CONFIG_FILES([ tutorial/js/Makefile tutorial/py/Makefile tutorial/py.twisted/Makefile + tutorial/py.tornado/Makefile ]) AC_OUTPUT diff --git a/lib/py/src/TTornado.py b/lib/py/src/TTornado.py new file mode 100644 index 00000000..af309c3d --- /dev/null +++ b/lib/py/src/TTornado.py @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from cStringIO import StringIO +import logging +import socket +import struct + +from thrift.transport import TTransport +from thrift.transport.TTransport import TTransportException + +from tornado import gen +from tornado import iostream +from tornado import netutil + + +class TTornadoStreamTransport(TTransport.TTransportBase): + """a framed, buffered transport over a Tornado stream""" + def __init__(self, host, port, stream=None): + self.host = host + self.port = port + self.is_queuing_reads = False + self.read_queue = [] + self.__wbuf = StringIO() + + # servers provide a ready-to-go stream + self.stream = stream + if self.stream is not None: + self._set_close_callback() + + # not the same number of parameters as TTransportBase.open + def open(self, callback): + logging.debug('socket connecting') + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + self.stream = iostream.IOStream(sock) + + def on_close_in_connect(*_): + message = 'could not connect to {}:{}'.format(self.host, self.port) + raise TTransportException( + type=TTransportException.NOT_OPEN, + message=message) + self.stream.set_close_callback(on_close_in_connect) + + def finish(*_): + self._set_close_callback() + callback() + + self.stream.connect((self.host, self.port), callback=finish) + + def _set_close_callback(self): + def on_close(): + raise TTransportException( + type=TTransportException.END_OF_FILE, + message='socket closed') + self.stream.set_close_callback(self.close) + + def close(self): + # don't raise if we intend to close + self.stream.set_close_callback(None) + self.stream.close() + + def read(self, _): + # The generated code for Tornado shouldn't do individual reads -- only + # frames at a time + assert "you're doing it wrong" is True + + @gen.engine + def readFrame(self, callback): + self.read_queue.append(callback) + logging.debug('read queue: %s', self.read_queue) + + if self.is_queuing_reads: + # If a read is already in flight, then the while loop below should + # pull it from self.read_queue + return + + self.is_queuing_reads = True + while self.read_queue: + next_callback = self.read_queue.pop() + result = yield gen.Task(self._readFrameFromStream) + next_callback(result) + self.is_queuing_reads = False + + @gen.engine + def _readFrameFromStream(self, callback): + logging.debug('_readFrameFromStream') + frame_header = yield gen.Task(self.stream.read_bytes, 4) + frame_length, = struct.unpack('!i', frame_header) + logging.debug('received frame header, frame length = %i', frame_length) + frame = yield gen.Task(self.stream.read_bytes, frame_length) + logging.debug('received frame payload') + callback(frame) + + def write(self, buf): + self.__wbuf.write(buf) + + def flush(self, callback=None): + wout = self.__wbuf.getvalue() + wsz = len(wout) + # reset wbuf before write/flush to preserve state on underlying failure + self.__wbuf = StringIO() + # N.B.: Doing this string concatenation is WAY cheaper than making + # two separate calls to the underlying socket object. Socket writes in + # Python turn out to be REALLY expensive, but it seems to do a pretty + # good job of managing string buffer operations without excessive copies + buf = struct.pack("!i", wsz) + wout + + logging.debug('writing frame length = %i', wsz) + self.stream.write(buf, callback) + + +class TTornadoServer(netutil.TCPServer): + def __init__(self, processor, iprot_factory, oprot_factory=None, + *args, **kwargs): + super(TTornadoServer, self).__init__(*args, **kwargs) + + self._processor = processor + self._iprot_factory = iprot_factory + self._oprot_factory = (oprot_factory if oprot_factory is not None + else iprot_factory) + + def handle_stream(self, stream, address): + try: + host, port = address + trans = TTornadoStreamTransport(host=host, port=port, stream=stream) + oprot = self._oprot_factory.getProtocol(trans) + + def next_pass(): + if not trans.stream.closed(): + self._processor.process(trans, self._iprot_factory, oprot, + callback=next_pass) + + next_pass() + + except Exception: + logging.exception('thrift exception in handle_stream') + trans.close() diff --git a/test/Makefile.am b/test/Makefile.am index aaa497fa..7ebe51cb 100755 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -34,6 +34,7 @@ endif if WITH_PYTHON SUBDIRS += py SUBDIRS += py.twisted +SUBDIRS += py.tornado endif if WITH_RUBY @@ -61,6 +62,7 @@ EXTRA_DIST = \ php \ py \ py.twisted \ + py.tornado \ rb \ threads \ AnnotationTest.thrift \ diff --git a/test/py.tornado/Makefile.am b/test/py.tornado/Makefile.am new file mode 100644 index 00000000..a8e680a9 --- /dev/null +++ b/test/py.tornado/Makefile.am @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +THRIFT = $(top_srcdir)/compiler/cpp/thrift + +thrift_gen: ../ThriftTest.thrift ../SmallTest.thrift + $(THRIFT) --gen py:tornado ../ThriftTest.thrift + $(THRIFT) --gen py:tornado ../SmallTest.thrift + +check: thrift_gen + ./test_suite.py + +clean-local: + $(RM) -r gen-py.tornado diff --git a/test/py.tornado/test_suite.py b/test/py.tornado/test_suite.py new file mode 100755 index 00000000..f04ba040 --- /dev/null +++ b/test/py.tornado/test_suite.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import datetime +import glob +import sys +import time +import unittest + +sys.path.insert(0, './gen-py.tornado') +sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0]) + +try: + __import__('tornado') +except ImportError: + print "module `tornado` not found, skipping test" + sys.exit(0) + +from tornado import gen, ioloop, stack_context +from tornado.testing import AsyncTestCase, get_unused_port + +from thrift import TTornado +from thrift.protocol import TBinaryProtocol + +from ThriftTest import ThriftTest +from ThriftTest.ttypes import * + + +class TestHandler(object): + def __init__(self, test_instance): + self.test_instance = test_instance + + def testVoid(self, callback): + callback() + + def testString(self, s, callback): + callback(s) + + def testByte(self, b, callback): + callback(b) + + def testI16(self, i16, callback): + callback(i16) + + def testI32(self, i32, callback): + callback(i32) + + def testI64(self, i64, callback): + callback(i64) + + def testDouble(self, dub, callback): + callback(dub) + + def testStruct(self, thing, callback): + callback(thing) + + def testException(self, s, callback): + if s == 'Xception': + x = Xception() + x.errorCode = 1001 + x.message = s + raise x + elif s == 'throw_undeclared': + raise ValueError("foo") + callback() + + def testOneway(self, seconds, callback=None): + start = time.time() + def fire_oneway(): + end = time.time() + self.test_instance.stop((start, end, seconds)) + + ioloop.IOLoop.instance().add_timeout( + datetime.timedelta(seconds=seconds), + fire_oneway) + + if callback: + callback() + + def testNest(self, thing, callback): + callback(thing) + + def testMap(self, thing, callback): + callback(thing) + + def testSet(self, thing, callback): + callback(thing) + + def testList(self, thing, callback): + callback(thing) + + def testEnum(self, thing, callback): + callback(thing) + + def testTypedef(self, thing, callback): + callback(thing) + + +class ThriftTestCase(AsyncTestCase): + def get_new_ioloop(self): + return ioloop.IOLoop.instance() + + def setUp(self): + self.port = get_unused_port() + self.io_loop = self.get_new_ioloop() + + # server + self.handler = TestHandler(self) + self.processor = ThriftTest.Processor(self.handler) + self.pfactory = TBinaryProtocol.TBinaryProtocolFactory() + + self.server = TTornado.TTornadoServer(self.processor, self.pfactory) + self.server.bind(self.port) + self.server.start(1) + + # client + transport = TTornado.TTornadoStreamTransport('localhost', self.port) + pfactory = TBinaryProtocol.TBinaryProtocolFactory() + self.client = ThriftTest.Client(transport, pfactory) + transport.open(callback=self.stop) + self.wait(timeout=1) + + def test_void(self): + self.client.testVoid(callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, None) + + def test_string(self): + self.client.testString('Python', callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, 'Python') + + def test_byte(self): + self.client.testByte(63, callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, 63) + + def test_i32(self): + self.client.testI32(-1, callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, -1) + + self.client.testI32(0, callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, 0) + + def test_i64(self): + self.client.testI64(-34359738368, callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, -34359738368) + + def test_double(self): + self.client.testDouble(-5.235098235, callback=self.stop) + v = self.wait(timeout=1) + self.assertEquals(v, -5.235098235) + + def test_struct(self): + x = Xtruct() + x.string_thing = "Zero" + x.byte_thing = 1 + x.i32_thing = -3 + x.i64_thing = -5 + self.client.testStruct(x, callback=self.stop) + + y = self.wait(timeout=1) + self.assertEquals(y.string_thing, "Zero") + self.assertEquals(y.byte_thing, 1) + self.assertEquals(y.i32_thing, -3) + self.assertEquals(y.i64_thing, -5) + + def test_exception(self): + self.client.testException('Safe', callback=self.stop) + v = self.wait(timeout=1) + + self.client.testException('Xception', callback=self.stop) + ex = self.wait(timeout=1) + if type(ex) == Xception: + self.assertEquals(ex.errorCode, 1001) + self.assertEquals(ex.message, 'Xception') + else: + self.fail("should have gotten exception") + + def test_oneway(self): + def return_from_send(): + self.stop('done with send') + self.client.testOneway(0.5, callback=return_from_send) + self.assertEquals(self.wait(timeout=1), 'done with send') + + start, end, seconds = self.wait(timeout=1) + self.assertAlmostEquals(seconds, (end - start), places=3) + + +def suite(): + suite = unittest.TestSuite() + loader = unittest.TestLoader() + suite.addTest(loader.loadTestsFromTestCase(ThriftTestCase)) + return suite + + +if __name__ == '__main__': + unittest.TestProgram(defaultTest='suite', + testRunner=unittest.TextTestRunner(verbosity=1)) diff --git a/tutorial/Makefile.am b/tutorial/Makefile.am index 169a2c1e..86c08c06 100755 --- a/tutorial/Makefile.am +++ b/tutorial/Makefile.am @@ -17,7 +17,7 @@ # under the License. # -SUBDIRS = +SUBDIRS = if MINGW # do nothing, just build the compiler @@ -43,6 +43,7 @@ endif if WITH_PYTHON SUBDIRS += py SUBDIRS += py.twisted +SUBDIRS += py.tornado endif if WITH_RUBY diff --git a/tutorial/py.tornado/Makefile.am b/tutorial/py.tornado/Makefile.am new file mode 100755 index 00000000..6ac60234 --- /dev/null +++ b/tutorial/py.tornado/Makefile.am @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +THRIFT = $(top_builddir)/compiler/cpp/thrift + +gen-py.tornado/tutorial/Calculator.py gen-py.tornado/shared/SharedService.py: $(top_srcdir)/tutorial/tutorial.thrift + $(THRIFT) --gen py:tornado -r $< + +all-local: gen-py.tornado/tutorial/Calculator.py + +tutorialserver: all + ${PYTHON} PythonServer.py + +tutorialclient: all + ${PYTHON} PythonClient.py + +clean-local: + $(RM) -r gen-* + +EXTRA_DIST = \ + PythonServer.py \ + PythonClient.py diff --git a/tutorial/py.tornado/PythonClient.py b/tutorial/py.tornado/PythonClient.py new file mode 100755 index 00000000..95d78b86 --- /dev/null +++ b/tutorial/py.tornado/PythonClient.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import glob +sys.path.append('gen-py.tornado') +sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0]) + +import logging + +from tutorial import Calculator +from tutorial.ttypes import Operation, Work, InvalidOperation + +from thrift import TTornado +from thrift.transport import TSocket +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol + +from tornado import gen +from tornado import ioloop + + +@gen.engine +def communicate(callback=None): + # create client + transport = TTornado.TTornadoStreamTransport('localhost', 9090) + pfactory = TBinaryProtocol.TBinaryProtocolFactory() + client = Calculator.Client(transport, pfactory) + + # open the transport, bail on error + try: + yield gen.Task(transport.open) + except TTransport.TTransportException as ex: + logging.error(ex) + if callback: + callback() + return + + # ping + yield gen.Task(client.ping) + print "ping()" + + # add + sum_ = yield gen.Task(client.add, 1, 1) + print "1 + 1 = {}".format(sum_) + + # make a oneway call without a callback (schedule the write and continue + # without blocking) + client.zip() + print "zip() without callback" + + # make a oneway call with a callback (we'll wait for the stream write to + # complete before continuing) + yield gen.Task(client.zip) + print "zip() with callback" + + # calculate 1/0 + work = Work() + work.op = Operation.DIVIDE + work.num1 = 1 + work.num2 = 0 + + try: + quotient = yield gen.Task(client.calculate, 1, work) + print "Whoa? You know how to divide by zero?" + except InvalidOperation as io: + print "InvalidOperation: {}".format(io) + + # calculate 15-10 + work.op = Operation.SUBTRACT + work.num1 = 15 + work.num2 = 10 + + diff = yield gen.Task(client.calculate, 1, work) + print "15 - 10 = {}".format(diff) + + # getStruct + log = yield gen.Task(client.getStruct, 1) + print "Check log: {}".format(log.value) + + # close the transport + client._transport.close() + + if callback: + callback() + + +def main(): + # create an ioloop, do the above, then stop + io_loop = ioloop.IOLoop.instance() + def this_joint(): + communicate(callback=io_loop.stop) + io_loop.add_callback(this_joint) + io_loop.start() + + +if __name__ == "__main__": + main() diff --git a/tutorial/py.tornado/PythonServer.py b/tutorial/py.tornado/PythonServer.py new file mode 100755 index 00000000..52932ff8 --- /dev/null +++ b/tutorial/py.tornado/PythonServer.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +import glob +sys.path.append('gen-py.tornado') +sys.path.insert(0, glob.glob('../../lib/py/build/lib.*')[0]) + +from tutorial import Calculator +from tutorial.ttypes import Operation, InvalidOperation + +from shared.ttypes import SharedStruct + +from thrift import TTornado +from thrift.transport import TSocket +from thrift.transport import TTransport +from thrift.protocol import TBinaryProtocol +from thrift.server import TServer + +from tornado import ioloop + + +class CalculatorHandler(object): + def __init__(self): + self.log = {} + + def ping(self, callback): + print "ping()" + callback() + + def add(self, n1, n2, callback): + print "add({}, {})".format(n1, n2) + callback(n1 + n2) + + def calculate(self, logid, work, callback): + print "calculate({}, {})".format(logid, work) + + if work.op == Operation.ADD: + val = work.num1 + work.num2 + elif work.op == Operation.SUBTRACT: + val = work.num1 - work.num2 + elif work.op == Operation.MULTIPLY: + val = work.num1 * work.num2 + elif work.op == Operation.DIVIDE: + if work.num2 == 0: + x = InvalidOperation() + x.what = work.op + x.why = "Cannot divide by 0" + raise x + val = work.num1 / work.num2 + else: + x = InvalidOperation() + x.what = work.op + x.why = "Invalid operation" + raise x + + log = SharedStruct() + log.key = logid + log.value = '%d' % (val) + self.log[logid] = log + callback(val) + + def getStruct(self, key, callback): + print "getStruct({})".format(key) + callback(self.log[key]) + + def zip(self, callback): + print "zip()" + callback() + + +def main(): + handler = CalculatorHandler() + processor = Calculator.Processor(handler) + pfactory = TBinaryProtocol.TBinaryProtocolFactory() + server = TTornado.TTornadoServer(processor, pfactory) + + print "Starting the server..." + server.bind(9090) + server.start(1) + ioloop.IOLoop.instance().start() + print "done." + + +if __name__ == "__main__": + main()