From 5ddabb8e3f63a15874e436c9a650dc17f7dd7028 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:09:37 +0000 Subject: [PATCH] THRIFT-923. cpp: Implement a fully nonblocking server and client There are three major parts of this: 1/ New callback-style interfaces for for a few key Thrift components: TAsyncProcessor for servers and TAsyncChannel for clients. 2/ Concrete implementations of TAsyncChannel and a server for TAsyncProcessor based on evhttp. 3/ Async-style code generation for C++ git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005127 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 1136 ++++++++++++----- configure.ac | 8 + contrib/async-test/Makefile | 33 + contrib/async-test/aggr.thrift | 8 + contrib/async-test/test-leaf.py | 23 + contrib/async-test/test-server.cpp | 97 ++ lib/cpp/Makefile.am | 15 +- lib/cpp/src/TProcessor.h | 2 +- lib/cpp/src/Thrift.h | 23 + lib/cpp/src/async/SimpleCallback.h | 98 ++ lib/cpp/src/async/TAsyncBufferProcessor.h | 45 + lib/cpp/src/async/TAsyncChannel.cpp | 34 + lib/cpp/src/async/TAsyncChannel.h | 73 ++ lib/cpp/src/async/TAsyncProcessor.h | 58 + lib/cpp/src/async/TAsyncProtocolProcessor.cpp | 50 + lib/cpp/src/async/TAsyncProtocolProcessor.h | 55 + lib/cpp/src/async/TEvhttpClientChannel.cpp | 124 ++ lib/cpp/src/async/TEvhttpClientChannel.h | 76 ++ lib/cpp/src/async/TEvhttpServer.cpp | 154 +++ lib/cpp/src/async/TEvhttpServer.h | 71 ++ 20 files changed, 1830 insertions(+), 353 deletions(-) create mode 100644 contrib/async-test/Makefile create mode 100644 contrib/async-test/aggr.thrift create mode 100755 contrib/async-test/test-leaf.py create mode 100644 contrib/async-test/test-server.cpp create mode 100644 lib/cpp/src/async/SimpleCallback.h create mode 100644 lib/cpp/src/async/TAsyncBufferProcessor.h create mode 100644 lib/cpp/src/async/TAsyncChannel.cpp create mode 100644 lib/cpp/src/async/TAsyncChannel.h create mode 100644 lib/cpp/src/async/TAsyncProcessor.h create mode 100644 lib/cpp/src/async/TAsyncProtocolProcessor.cpp create mode 100644 lib/cpp/src/async/TAsyncProtocolProcessor.h create mode 100644 lib/cpp/src/async/TEvhttpClientChannel.cpp create mode 100644 lib/cpp/src/async/TEvhttpClientChannel.h create mode 100644 lib/cpp/src/async/TEvhttpServer.cpp create mode 100644 lib/cpp/src/async/TEvhttpServer.h diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index ef0dd366..0ba4540f 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -59,6 +59,9 @@ class t_cpp_generator : public t_oop_generator { iter = parsed_options.find("include_prefix"); use_include_prefix_ = (iter != parsed_options.end()); + iter = parsed_options.find("cob_style"); + gen_cob_style_ = (iter != parsed_options.end()); + out_dir_base_ = "gen-cpp"; } @@ -100,15 +103,16 @@ class t_cpp_generator : public t_oop_generator { * Service-level generation functions */ - void generate_service_interface (t_service* tservice); - void generate_service_null (t_service* tservice); + void generate_service_interface (t_service* tservice, string style); + void generate_service_null (t_service* tservice, string style); void generate_service_multiface (t_service* tservice); void generate_service_helpers (t_service* tservice); - void generate_service_client (t_service* tservice); - void generate_service_processor (t_service* tservice); + void generate_service_client (t_service* tservice, string style); + void generate_service_processor (t_service* tservice, string style); void generate_service_skeleton (t_service* tservice); - void generate_process_function (t_service* tservice, t_function* tfunction); + void generate_process_function (t_service* tservice, t_function* tfunction, string style); void generate_function_helpers (t_service* tservice, t_function* tfunction); + void generate_service_async_skeleton (t_service* tservice); /** * Serialization constructs @@ -166,7 +170,12 @@ class t_cpp_generator : public t_oop_generator { t_list* tlist, std::string iter); - /** + void generate_function_call (ostream& out, + t_function* tfunction, + string target, + string iface, + string arg_prefix); + /* * Helper rendering functions */ @@ -176,8 +185,9 @@ class t_cpp_generator : public t_oop_generator { std::string type_name(t_type* ttype, bool in_typedef=false, bool arg=false); std::string base_type_name(t_base_type::t_base tbase); std::string declare_field(t_field* tfield, bool init=false, bool pointer=false, bool constant=false, bool reference=false); - std::string function_signature(t_function* tfunction, std::string prefix="", bool name_params=true); - std::string argument_list(t_struct* tstruct, bool name_params=true); + std::string function_signature(t_function* tfunction, std::string style, std::string prefix="", bool name_params=true); + std::string cob_function_signature(t_function* tfunction, std::string prefix="", bool name_params=true); + std::string argument_list(t_struct* tstruct, bool name_params=true, bool start_comma=false); std::string type_to_enum(t_type* ttype); std::string local_reflection_name(const char*, t_type* ttype, bool external=false); @@ -222,6 +232,11 @@ class t_cpp_generator : public t_oop_generator { */ bool use_include_prefix_; + /** + * True iff we should generate "Continuation OBject"-style classes as well. + */ + bool gen_cob_style_; + /** * Strings for namespace, computed once up front then used directly */ @@ -1172,7 +1187,7 @@ void t_cpp_generator::generate_struct_writer(ofstream& out, type_to_enum((*f_iter)->get_type()) << ", " << (*f_iter)->get_key() << ");" << endl; // Write field contents - if (pointers) { + if (pointers && !(*f_iter)->get_type()->is_xception()) { generate_serialize_field(out, *f_iter, "(*(this->", "))"); } else { generate_serialize_field(out, *f_iter, "this->"); @@ -1294,8 +1309,23 @@ void t_cpp_generator::generate_service(t_service* tservice) { f_header_ << "#ifndef " << svcname << "_H" << endl << "#define " << svcname << "_H" << endl << - endl << - "#include " << endl << + endl; + if (gen_cob_style_) { + f_header_ << + "#include " << endl << + // TODO(dreiss): Libify the base client so we don't have to include this. + "#include " << endl << + "namespace apache { namespace thrift { namespace async {" << endl << + "class TAsyncChannel;" << endl << + "}}}" << endl; + } + f_header_ << + "#include " << endl; + if (gen_cob_style_) { + f_header_ << + "#include " << endl; + } + f_header_ << "#include \"" << get_include_prefix(*get_program()) << program_name_ << "_types.h\"" << endl; @@ -1317,21 +1347,34 @@ void t_cpp_generator::generate_service(t_service* tservice) { f_service_ << autogen_comment(); f_service_ << - "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << - endl << - endl << + "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl; + if (gen_cob_style_) { + f_service_ << + "#include \"async/TAsyncChannel.h\"" << endl; + } + f_service_ << endl << ns_open_ << endl << endl; // Generate all the components - generate_service_interface(tservice); - generate_service_null(tservice); + generate_service_interface(tservice, ""); + generate_service_null(tservice, ""); generate_service_helpers(tservice); - generate_service_client(tservice); - generate_service_processor(tservice); + generate_service_client(tservice, ""); + generate_service_processor(tservice, ""); generate_service_multiface(tservice); generate_service_skeleton(tservice); + // Generate all the cob components + if (gen_cob_style_) { + generate_service_interface(tservice, "CobCl"); + generate_service_interface(tservice, "CobSv"); + generate_service_null(tservice, "CobSv"); + generate_service_client(tservice, "Cob"); + generate_service_processor(tservice, "Cob"); + generate_service_async_skeleton(tservice); + } + // Close the namespace f_service_ << ns_close_ << endl << @@ -1360,6 +1403,7 @@ void t_cpp_generator::generate_service_helpers(t_service* tservice) { t_struct* ts = (*f_iter)->get_arglist(); string name_orig = ts->get_name(); + // TODO(dreiss): Why is this stuff not in generate_function_helpers? ts->set_name(tservice->get_name() + "_" + (*f_iter)->get_name() + "_args"); generate_struct_definition(f_header_, ts, false); generate_struct_reader(f_service_, ts); @@ -1378,23 +1422,29 @@ void t_cpp_generator::generate_service_helpers(t_service* tservice) { * * @param tservice The service to generate a header definition for */ -void t_cpp_generator::generate_service_interface(t_service* tservice) { +void t_cpp_generator::generate_service_interface(t_service* tservice, string style) { + + if (style == "CobCl") { + // Forward declare the client. + indent(f_header_) << "class " << service_name_ << "CobClient;" << endl << endl; + } + string extends = ""; if (tservice->get_extends() != NULL) { - extends = " : virtual public " + type_name(tservice->get_extends()) + "If"; + extends = " : virtual public " + type_name(tservice->get_extends()) + style + "If"; } f_header_ << - "class " << service_name_ << "If" << extends << " {" << endl << + "class " << service_name_ << style << "If" << extends << " {" << endl << " public:" << endl; indent_up(); f_header_ << - indent() << "virtual ~" << service_name_ << "If() {}" << endl; + indent() << "virtual ~" << service_name_ << style << "If" << "() {}" << endl; vector functions = tservice->get_functions(); vector::iterator f_iter; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { f_header_ << - indent() << "virtual " << function_signature(*f_iter) << " = 0;" << endl; + indent() << "virtual " << function_signature(*f_iter, style) << " = 0;" << endl; } indent_down(); f_header_ << @@ -1406,36 +1456,49 @@ void t_cpp_generator::generate_service_interface(t_service* tservice) { * * @param tservice The service to generate a header definition for */ -void t_cpp_generator::generate_service_null(t_service* tservice) { +void t_cpp_generator::generate_service_null(t_service* tservice, string style) { string extends = ""; if (tservice->get_extends() != NULL) { - extends = " , virtual public " + type_name(tservice->get_extends()) + "Null"; + extends = " , virtual public " + type_name(tservice->get_extends()) + style + "Null"; } f_header_ << - "class " << service_name_ << "Null : virtual public " << service_name_ << "If" << extends << " {" << endl << + "class " << service_name_ << style << "Null : virtual public " << service_name_ << style << "If" << extends << " {" << endl << " public:" << endl; indent_up(); f_header_ << - indent() << "virtual ~" << service_name_ << "Null() {}" << endl; + indent() << "virtual ~" << service_name_ << style << "Null() {}" << endl; vector functions = tservice->get_functions(); vector::iterator f_iter; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { f_header_ << - indent() << function_signature(*f_iter, "", false) << " {" << endl; + indent() << function_signature(*f_iter, style, "", false) << " {" << endl; indent_up(); + t_type* returntype = (*f_iter)->get_returntype(); - if (returntype->is_void()) { - f_header_ << - indent() << "return;" << endl; - } else if (is_complex_type(returntype)) { - f_header_ << - indent() << "return;" << endl; + t_field returnfield(returntype, "_return"); + + if (style == "") { + if (returntype->is_void() || is_complex_type(returntype)) { + f_header_ << indent() << "return;" << endl; + } else { + f_header_ << + indent() << declare_field(&returnfield, true) << endl << + indent() << "return _return;" << endl; + } + } else if (style == "CobSv") { + if (returntype->is_void()) { + f_header_ << indent() << "return cob();" << endl; } else { t_field returnfield(returntype, "_return"); f_header_ << indent() << declare_field(&returnfield, true) << endl << - indent() << "return _return;" << endl; + indent() << "return cob(_return);" << endl; + } + + } else { + throw "UNKNOWN STYLE"; } + indent_down(); f_header_ << indent() << "}" << endl; @@ -1445,6 +1508,109 @@ void t_cpp_generator::generate_service_null(t_service* tservice) { "};" << endl << endl; } +void t_cpp_generator::generate_function_call(ostream& out, t_function* tfunction, string target, string iface, string arg_prefix) { + bool first = true; + t_type* ret_type = get_true_type(tfunction->get_returntype()); + out << indent(); + if (!tfunction->is_oneway() && !ret_type->is_void()) { + if (is_complex_type(ret_type)) { + first = false; + out << iface << "->" << tfunction->get_name() << "(" << target; + } else { + out << target << " = " << iface << "->" << tfunction->get_name() << "("; + } + } else { + out << iface << "->" << tfunction->get_name() << "("; + } + const std::vector& fields = tfunction->get_arglist()->get_members(); + vector::const_iterator f_iter; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + if (first) { + first = false; + } else { + out << ", "; + } + out << arg_prefix << (*f_iter)->get_name(); + } + out << ");" << endl; +} + +void t_cpp_generator::generate_service_async_skeleton(t_service* tservice) { + string svcname = tservice->get_name(); + + // Service implementation file includes + string f_skeleton_name = get_out_dir()+svcname+"_async_server.skeleton.cpp"; + + string ns = namespace_prefix(tservice->get_program()->get_namespace("cpp")); + + ofstream f_skeleton; + f_skeleton.open(f_skeleton_name.c_str()); + f_skeleton << + "// This autogenerated skeleton file illustrates one way to adapt a synchronous" << endl << + "// interface into an asynchronous interface. You should copy it to another" << endl << + "// filename to avoid overwriting it and rewrite as asynchronous any functions" << endl << + "// that would otherwise introduce unwanted latency." << endl << + endl << + "#include \"" << get_include_prefix(*get_program()) << svcname << ".h\"" << endl << + "#include " << endl << + "#include " << endl << + endl << + "using namespace ::apache::thrift;" << endl << + "using namespace ::apache::thrift::protocol;" << endl << + "using namespace ::apache::thrift::transport;" << endl << + "using namespace ::apache::thrift::async;" << endl << + endl << + "using boost::shared_ptr;" << endl << + endl; + + if (!ns.empty()) { + f_skeleton << + "using namespace " << string(ns, 0, ns.size()-2) << ";" << endl << + endl; + } + + f_skeleton << + "class " << svcname << "AsyncHandler : " << + "public " << svcname << "CobSvIf {" << endl << + " public:" << endl; + indent_up(); + f_skeleton << + indent() << svcname << "AsyncHandler() {" << endl << + indent() << " syncHandler_ = std::auto_ptr<" << svcname << + "Handler>(new " << svcname << "Handler);" << endl << + indent() << " // Your initialization goes here" << endl << + indent() << "}" << endl; + f_skeleton << + indent() << "virtual ~" << service_name_ << "AsyncHandler();" << endl; + + vector functions = tservice->get_functions(); + vector::iterator f_iter; + for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + f_skeleton << + endl << + indent() << function_signature(*f_iter, "CobSv", "", true) << " {" << endl; + indent_up(); + + t_type* returntype = (*f_iter)->get_returntype(); + t_field returnfield(returntype, "_return"); + + string target = returntype->is_void() ? "" : "_return"; + if (!returntype->is_void()) { + f_skeleton << + indent() << declare_field(&returnfield, true) << endl; + } + generate_function_call(f_skeleton, *f_iter, target, "syncHandler_", ""); + f_skeleton << indent() << "return cob(" << target << ");" << endl; + + scope_down(f_skeleton); + } + f_skeleton << endl << + " protected:" << endl << + indent() << "std::auto_ptr<" << svcname << "Handler> syncHandler_;" << endl; + indent_down(); + f_skeleton << + "};" << endl << endl; +} /** * Generates a multiface, which is a single server that just takes a set @@ -1531,7 +1697,7 @@ void t_cpp_generator::generate_service_multiface(t_service* tservice) { call += ")"; f_header_ << - indent() << function_signature(*f_iter) << " {" << endl; + indent() << function_signature(*f_iter, "") << " {" << endl; indent_up(); f_header_ << indent() << "uint32_t sz = ifaces_.size();" << endl << @@ -1576,75 +1742,115 @@ void t_cpp_generator::generate_service_multiface(t_service* tservice) { * * @param tservice The service to generate a server for. */ -void t_cpp_generator::generate_service_client(t_service* tservice) { +void t_cpp_generator::generate_service_client(t_service* tservice, string style) { + string ifstyle; + if (style == "Cob") { + ifstyle = "CobCl"; + } + string extends = ""; string extends_client = ""; if (tservice->get_extends() != NULL) { extends = type_name(tservice->get_extends()); - extends_client = ", public " + extends + "Client"; + extends_client = ", public " + extends + style + "Client"; } // Generate the header portion f_header_ << - "class " << service_name_ << "Client : " << - "virtual public " << service_name_ << "If" << + "class " << service_name_ << style << "Client : " << + "virtual public " << service_name_ << ifstyle << "If" << extends_client << " {" << endl << " public:" << endl; indent_up(); - f_header_ << - indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl; - if (extends.empty()) { - f_header_ << - indent() << " piprot_(prot)," << endl << - indent() << " poprot_(prot) {" << endl << - indent() << " iprot_ = prot.get();" << endl << - indent() << " oprot_ = prot.get();" << endl << - indent() << "}" << endl; - } else { + if (style != "Cob") { f_header_ << - indent() << " " << extends << "Client(prot, prot) {}" << endl; - } + indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) :" << endl; + if (extends.empty()) { + f_header_ << + indent() << " piprot_(prot)," << endl << + indent() << " poprot_(prot) {" << endl << + indent() << " iprot_ = prot.get();" << endl << + indent() << " oprot_ = prot.get();" << endl << + indent() << "}" << endl; + } else { + f_header_ << + indent() << " " << extends << style << "Client(prot, prot) {}" << endl; + } - f_header_ << - indent() << service_name_ << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl; - if (extends.empty()) { f_header_ << - indent() << " piprot_(iprot)," << endl << - indent() << " poprot_(oprot) {" << endl << - indent() << " iprot_ = iprot.get();" << endl << - indent() << " oprot_ = oprot.get();" << endl << + indent() << service_name_ << style << "Client(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) :" << endl; + if (extends.empty()) { + if (style == "Cob") { + f_header_ << + indent() << " rpc_ctx_(ctx)," << endl; + } + f_header_ << + indent() << " piprot_(iprot)," << endl << + indent() << " poprot_(oprot) {" << endl << + indent() << " iprot_ = iprot.get();" << endl << + indent() << " oprot_ = oprot.get();" << endl << + indent() << "}" << endl; + } else { + f_header_ << + indent() << " " << extends << style << "Client(iprot, oprot) {}" << endl; + } + + // Generate getters for the protocols. + f_header_ << + indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl << + indent() << " return piprot_;" << endl << indent() << "}" << endl; - } else { + f_header_ << - indent() << " " << extends << "Client(iprot, oprot) {}" << endl; - } + indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl << + indent() << " return poprot_;" << endl << + indent() << "}" << endl; + } else /* if (style == "Cob") */ { // Generate getters for the protocols. - f_header_ << - indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {" << endl << - indent() << " return piprot_;" << endl << - indent() << "}" << endl; + f_header_ << + indent() << service_name_ << style << "Client(" + << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, " + << "::apache::thrift::protocol::TProtocolFactory* protocolFactory) :" << endl; + if (extends.empty()) { + f_header_ << + indent() << " channel_(channel)," << endl << + indent() << " itrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl << + indent() << " otrans_(new ::apache::thrift::transport::TMemoryBuffer())," << endl << + indent() << " piprot_(protocolFactory->getProtocol(itrans_))," << endl << + indent() << " poprot_(protocolFactory->getProtocol(otrans_)) {" << endl << + indent() << " iprot_ = piprot_.get();" << endl << + indent() << " oprot_ = poprot_.get();" << endl << + indent() << "}" << endl; + } else { + f_header_ << + indent() << " " << extends << style << "Client(channel, protocolFactory) {}" << endl; + } + } - f_header_ << - indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {" << endl << - indent() << " return poprot_;" << endl << - indent() << "}" << endl; + if (style == "Cob") { + f_header_ << + indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {" << endl << + indent() << " return channel_;" << endl << + indent() << "}" << endl; + } vector functions = tservice->get_functions(); vector::const_iterator f_iter; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + indent(f_header_) << function_signature(*f_iter, ifstyle) << ";" << endl; + // TODO(dreiss): Use private inheritance to avoid generating thise in cob-style. t_function send_function(g_type_void, - string("send_") + (*f_iter)->get_name(), - (*f_iter)->get_arglist()); - indent(f_header_) << function_signature(*f_iter) << ";" << endl; - indent(f_header_) << function_signature(&send_function) << ";" << endl; + string("send_") + (*f_iter)->get_name(), + (*f_iter)->get_arglist()); + indent(f_header_) << function_signature(&send_function, "") << ";" << endl; if (!(*f_iter)->is_oneway()) { t_struct noargs(program_); t_function recv_function((*f_iter)->get_returntype(), - string("recv_") + (*f_iter)->get_name(), - &noargs); - indent(f_header_) << function_signature(&recv_function) << ";" << endl; + string("recv_") + (*f_iter)->get_name(), + &noargs); + indent(f_header_) << function_signature(&recv_function, "") << ";" << endl; } } indent_down(); @@ -1653,11 +1859,19 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { f_header_ << " protected:" << endl; indent_up(); + + if (style == "Cob") { + f_header_ << + indent() << "boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;" << endl << + indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;" << endl << + indent() << "boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;" << endl; + } f_header_ << indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;" << endl << indent() << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;" << endl << indent() << "::apache::thrift::protocol::TProtocol* iprot_;" << endl << indent() << "::apache::thrift::protocol::TProtocol* oprot_;" << endl; + indent_down(); } @@ -1665,7 +1879,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { "};" << endl << endl; - string scope = service_name_ + "Client::"; + string scope = service_name_ + style + "Client::"; // Generate client method implementations for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { @@ -1673,7 +1887,7 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { // Open function indent(f_service_) << - function_signature(*f_iter, scope) << endl; + function_signature(*f_iter, ifstyle, scope) << endl; scope_up(f_service_); indent(f_service_) << "send_" << funname << "("; @@ -1695,155 +1909,171 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { } f_service_ << ");" << endl; - if (!(*f_iter)->is_oneway()) { - f_service_ << indent(); - if (!(*f_iter)->get_returntype()->is_void()) { - if (is_complex_type((*f_iter)->get_returntype())) { - f_service_ << "recv_" << funname << "(_return);" << endl; + if (style != "Cob") { + if (!(*f_iter)->is_oneway()) { + f_service_ << indent(); + if (!(*f_iter)->get_returntype()->is_void()) { + if (is_complex_type((*f_iter)->get_returntype())) { + f_service_ << "recv_" << funname << "(_return);" << endl; + } else { + f_service_ << "return recv_" << funname << "();" << endl; + } } else { - f_service_ << "return recv_" << funname << "();" << endl; + f_service_ << + "recv_" << funname << "();" << endl; } + } + } else { + if (!(*f_iter)->is_oneway()) { + f_service_ << + indent() << _this << "channel_->sendAndRecvMessage(" << + "std::tr1::bind(cob, this), " << _this << "otrans_.get(), " << + _this << "itrans_.get());" << endl; } else { f_service_ << - "recv_" << funname << "();" << endl; + indent() << _this << "channel_->sendMessage(" << + "std::tr1::bind(cob, this), " << _this << "otrans_.get());" << endl; } } scope_down(f_service_); f_service_ << endl; - // Function for sending - t_function send_function(g_type_void, - string("send_") + (*f_iter)->get_name(), - (*f_iter)->get_arglist()); - - // Open the send function - indent(f_service_) << - function_signature(&send_function, scope) << endl; - scope_up(f_service_); - - // Function arguments and results - string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs"; - string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult"; - - // Serialize the request - f_service_ << - indent() << "int32_t cseqid = 0;" << endl << - indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl << - endl << - indent() << argsname << " args;" << endl; - - for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) { - f_service_ << - indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl; - } - - f_service_ << - indent() << "args.write(oprot_);" << endl << - endl << - indent() << "oprot_->writeMessageEnd();" << endl << - indent() << "oprot_->getTransport()->flush();" << endl << - indent() << "oprot_->getTransport()->writeEnd();" << endl; - - scope_down(f_service_); - f_service_ << endl; + //if (style != "Cob") // TODO(dreiss): Libify the client and don't generate this for cob-style + if (true) { + // Function for sending + t_function send_function(g_type_void, + string("send_") + (*f_iter)->get_name(), + (*f_iter)->get_arglist()); - // Generate recv function only if not an oneway function - if (!(*f_iter)->is_oneway()) { - t_struct noargs(program_); - t_function recv_function((*f_iter)->get_returntype(), - string("recv_") + (*f_iter)->get_name(), - &noargs); - // Open function + // Open the send function indent(f_service_) << - function_signature(&recv_function, scope) << endl; + function_signature(&send_function, "", scope) << endl; scope_up(f_service_); + // Function arguments and results + string argsname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_pargs"; + string resultname = tservice->get_name() + "_" + (*f_iter)->get_name() + "_presult"; + + // Serialize the request f_service_ << + indent() << "int32_t cseqid = 0;" << endl << + indent() << "oprot_->writeMessageBegin(\"" << (*f_iter)->get_name() << "\", ::apache::thrift::protocol::T_CALL, cseqid);" << endl << endl << - indent() << "int32_t rseqid = 0;" << endl << - indent() << "std::string fname;" << endl << - indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl << - endl << - indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl << - indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl << - indent() << " ::apache::thrift::TApplicationException x;" << endl << - indent() << " x.read(iprot_);" << endl << - indent() << " iprot_->readMessageEnd();" << endl << - indent() << " iprot_->getTransport()->readEnd();" << endl << - indent() << " throw x;" << endl << - indent() << "}" << endl << - indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl << - indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl << - indent() << " iprot_->readMessageEnd();" << endl << - indent() << " iprot_->getTransport()->readEnd();" << endl << - indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl << - indent() << "}" << endl << - indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl << - indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl << - indent() << " iprot_->readMessageEnd();" << endl << - indent() << " iprot_->getTransport()->readEnd();" << endl << - indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl << - indent() << "}" << endl; + indent() << argsname << " args;" << endl; - if (!(*f_iter)->get_returntype()->is_void() && - !is_complex_type((*f_iter)->get_returntype())) { - t_field returnfield((*f_iter)->get_returntype(), "_return"); + for (fld_iter = fields.begin(); fld_iter != fields.end(); ++fld_iter) { f_service_ << - indent() << declare_field(&returnfield) << endl; + indent() << "args." << (*fld_iter)->get_name() << " = &" << (*fld_iter)->get_name() << ";" << endl; } f_service_ << - indent() << resultname << " result;" << endl; + indent() << "args.write(oprot_);" << endl << + endl << + indent() << "oprot_->writeMessageEnd();" << endl << + indent() << "oprot_->getTransport()->flush();" << endl << + indent() << "oprot_->getTransport()->writeEnd();" << endl; + + scope_down(f_service_); + f_service_ << endl; + + // Generate recv function only if not an oneway function + if (!(*f_iter)->is_oneway()) { + t_struct noargs(program_); + t_function recv_function((*f_iter)->get_returntype(), + string("recv_") + (*f_iter)->get_name(), + &noargs); + // Open function + indent(f_service_) << + function_signature(&recv_function, "", scope) << endl; + scope_up(f_service_); - if (!(*f_iter)->get_returntype()->is_void()) { f_service_ << - indent() << "result.success = &_return;" << endl; - } + endl << + indent() << "int32_t rseqid = 0;" << endl << + indent() << "std::string fname;" << endl << + indent() << "::apache::thrift::protocol::TMessageType mtype;" << endl << + endl << + indent() << "iprot_->readMessageBegin(fname, mtype, rseqid);" << endl << + indent() << "if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {" << endl << + indent() << " ::apache::thrift::TApplicationException x;" << endl << + indent() << " x.read(iprot_);" << endl << + indent() << " iprot_->readMessageEnd();" << endl << + indent() << " iprot_->getTransport()->readEnd();" << endl << + indent() << " throw x;" << endl << + indent() << "}" << endl << + indent() << "if (mtype != ::apache::thrift::protocol::T_REPLY) {" << endl << + indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl << + indent() << " iprot_->readMessageEnd();" << endl << + indent() << " iprot_->getTransport()->readEnd();" << endl << + indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);" << endl << + indent() << "}" << endl << + indent() << "if (fname.compare(\"" << (*f_iter)->get_name() << "\") != 0) {" << endl << + indent() << " iprot_->skip(::apache::thrift::protocol::T_STRUCT);" << endl << + indent() << " iprot_->readMessageEnd();" << endl << + indent() << " iprot_->getTransport()->readEnd();" << endl << + indent() << " throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::WRONG_METHOD_NAME);" << endl << + indent() << "}" << endl; - f_service_ << - indent() << "result.read(iprot_);" << endl << - indent() << "iprot_->readMessageEnd();" << endl << - indent() << "iprot_->getTransport()->readEnd();" << endl << - endl; + if (!(*f_iter)->get_returntype()->is_void() && + !is_complex_type((*f_iter)->get_returntype())) { + t_field returnfield((*f_iter)->get_returntype(), "_return"); + f_service_ << + indent() << declare_field(&returnfield) << endl; + } + + f_service_ << + indent() << resultname << " result;" << endl; - // Careful, only look for _result if not a void function - if (!(*f_iter)->get_returntype()->is_void()) { - if (is_complex_type((*f_iter)->get_returntype())) { + if (!(*f_iter)->get_returntype()->is_void()) { f_service_ << - indent() << "if (result.__isset.success) {" << endl << - indent() << " // _return pointer has now been filled" << endl << - indent() << " return;" << endl << - indent() << "}" << endl; - } else { + indent() << "result.success = &_return;" << endl; + } + + f_service_ << + indent() << "result.read(iprot_);" << endl << + indent() << "iprot_->readMessageEnd();" << endl << + indent() << "iprot_->getTransport()->readEnd();" << endl << + endl; + + // Careful, only look for _result if not a void function + if (!(*f_iter)->get_returntype()->is_void()) { + if (is_complex_type((*f_iter)->get_returntype())) { + f_service_ << + indent() << "if (result.__isset.success) {" << endl << + indent() << " // _return pointer has now been filled" << endl << + indent() << " return;" << endl << + indent() << "}" << endl; + } else { + f_service_ << + indent() << "if (result.__isset.success) {" << endl << + indent() << " return _return;" << endl << + indent() << "}" << endl; + } + } + + t_struct* xs = (*f_iter)->get_xceptions(); + const std::vector& xceptions = xs->get_members(); + vector::const_iterator x_iter; + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { f_service_ << - indent() << "if (result.__isset.success) {" << endl << - indent() << " return _return;" << endl << + indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl << + indent() << " throw result." << (*x_iter)->get_name() << ";" << endl << indent() << "}" << endl; } - } - t_struct* xs = (*f_iter)->get_xceptions(); - const std::vector& xceptions = xs->get_members(); - vector::const_iterator x_iter; - for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { - f_service_ << - indent() << "if (result.__isset." << (*x_iter)->get_name() << ") {" << endl << - indent() << " throw result." << (*x_iter)->get_name() << ";" << endl << - indent() << "}" << endl; - } + // We only get here if we are a void function + if ((*f_iter)->get_returntype()->is_void()) { + indent(f_service_) << + "return;" << endl; + } else { + f_service_ << + indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl; + } - // We only get here if we are a void function - if ((*f_iter)->get_returntype()->is_void()) { - indent(f_service_) << - "return;" << endl; - } else { - f_service_ << - indent() << "throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, \"" << (*f_iter)->get_name() << " failed: unknown result\");" << endl; + // Close function + scope_down(f_service_); + f_service_ << endl; } - - // Close function - scope_down(f_service_); - f_service_ << endl; } } } @@ -1853,7 +2083,22 @@ void t_cpp_generator::generate_service_client(t_service* tservice) { * * @param tservice The service to generate a server for. */ -void t_cpp_generator::generate_service_processor(t_service* tservice) { +void t_cpp_generator::generate_service_processor(t_service* tservice, string style) { + string ifstyle; + string pstyle; + string finish_cob; + string finish_cob_decl; + string cob_arg; + string ret_type = "bool "; + if (style == "Cob") { + ifstyle = "CobSv"; + pstyle = "Async"; + finish_cob = "std::tr1::function cob, "; + finish_cob_decl = "std::tr1::function, "; + cob_arg = "cob, "; + ret_type = "void "; + } + // Generate the dispatch methods vector functions = tservice->get_functions(); vector::iterator f_iter; @@ -1862,13 +2107,13 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { string extends_processor = ""; if (tservice->get_extends() != NULL) { extends = type_name(tservice->get_extends()); - extends_processor = ", public " + extends + "Processor"; + extends_processor = ", public " + extends + pstyle + "Processor"; } // Generate the header portion f_header_ << - "class " << service_name_ << "Processor : " << - "virtual public ::apache::thrift::TProcessor" << + "class " << service_name_ << pstyle << "Processor : " << + "virtual public ::apache::thrift::T" << pstyle << "Processor" << extends_processor << " {" << endl; // Protected data members @@ -1876,9 +2121,9 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { " protected:" << endl; indent_up(); f_header_ << - indent() << "boost::shared_ptr<" << service_name_ << "If> iface_;" << endl; + indent() << "boost::shared_ptr<" << service_name_ << ifstyle << "If> iface_;" << endl; f_header_ << - indent() << "virtual bool process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl; + indent() << "virtual " << ret_type << "process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid);" << endl; indent_down(); // Process function declarations @@ -1886,10 +2131,20 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { " private:" << endl; indent_up(); f_header_ << - indent() << "std::map processMap_;" << endl; + indent() << "std::map processMap_;" << endl; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { indent(f_header_) << - "void process_" << (*f_iter)->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl; + "void process_" << (*f_iter)->get_name() << "(" << finish_cob << "int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot);" << endl; + if (style == "Cob") { + // XXX Factor this out, even if it is a pain. + string ret_arg = ((*f_iter)->get_returntype()->is_void() + ? "" + : ", const " + type_name((*f_iter)->get_returntype()) + "& _return"); + // XXX Don't declare throw if it doesn't exist + f_header_ << + "void return_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl << + "void throw_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl; + } } indent_down(); @@ -1903,6 +2158,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { declare_map += (*f_iter)->get_name(); declare_map += "\"] = &"; declare_map += service_name_; + declare_map += pstyle; declare_map += "Processor::process_"; declare_map += (*f_iter)->get_name(); declare_map += ";\n"; @@ -1911,28 +2167,28 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { f_header_ << " public:" << endl << - indent() << service_name_ << "Processor(boost::shared_ptr<" << service_name_ << "If> iface) :" << endl; + indent() << service_name_ << pstyle << "Processor(boost::shared_ptr<" << service_name_ << ifstyle << "If> iface) :" << endl; if (extends.empty()) { f_header_ << indent() << " iface_(iface) {" << endl; } else { f_header_ << - indent() << " " << extends << "Processor(iface)," << endl << + indent() << " " << extends << pstyle << "Processor(iface)," << endl << indent() << " iface_(iface) {" << endl; } f_header_ << declare_map << indent() << "}" << endl << endl << - indent() << "virtual bool process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl << - indent() << "virtual ~" << service_name_ << "Processor() {}" << endl; + indent() << "virtual " << ret_type << "process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot);" << endl << + indent() << "virtual ~" << service_name_ << pstyle << "Processor() {}" << endl; indent_down(); f_header_ << "};" << endl << endl; // Generate the server implementation f_service_ << - "bool " << service_name_ << "Processor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl; + ret_type << service_name_ << pstyle << "Processor::process(" << finish_cob << "boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot) {" << endl; indent_up(); f_service_ << @@ -1955,10 +2211,11 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { indent() << " oprot->writeMessageEnd();" << endl << indent() << " oprot->getTransport()->flush();" << endl << indent() << " oprot->getTransport()->writeEnd();" << endl << - indent() << " return true;" << endl << + indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl << indent() << "}" << endl << endl << - indent() << "return process_fn(iprot, oprot, fname, seqid);" << + indent() << "return process_fn(" << (style == "Cob" ? "cob, " : "") + << "iprot, oprot, fname, seqid);" << endl; indent_down(); @@ -1967,12 +2224,12 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { endl; f_service_ << - "bool " << service_name_ << "Processor::process_fn(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl; + ret_type << service_name_ << pstyle << "Processor::process_fn(" << finish_cob << "::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid) {" << endl; indent_up(); // HOT: member function pointer map f_service_ << - indent() << "std::map::iterator pfn;" << endl << + indent() << "std::map::iterator pfn;" << endl << indent() << "pfn = processMap_.find(fname);" << endl << indent() << "if (pfn == processMap_.end()) {" << endl; if (extends.empty()) { @@ -1986,15 +2243,26 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { indent() << " oprot->writeMessageEnd();" << endl << indent() << " oprot->getTransport()->flush();" << endl << indent() << " oprot->getTransport()->writeEnd();" << endl << - indent() << " return true;" << endl; + indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl; } else { f_service_ << - indent() << " return " << extends << "Processor::process_fn(iprot, oprot, fname, seqid);" << endl; + indent() << " return " + << extends << pstyle << "Processor::process_fn(" + << (style == "Cob" ? "cob, " : "") + << "iprot, oprot, fname, seqid);" << endl; } f_service_ << indent() << "}" << endl << - indent() << "(this->*(pfn->second))(seqid, iprot, oprot);" << endl << - indent() << "return true;" << endl; + indent() << "(this->*(pfn->second))(" << cob_arg << "seqid, iprot, oprot);" << endl; + + // TODO(dreiss): return pfn ret? + if (style == "Cob") { + f_service_ << + indent() << "return;" << endl; + } else { + f_service_ << + indent() << "return true;" << endl; + } indent_down(); f_service_ << @@ -2003,7 +2271,7 @@ void t_cpp_generator::generate_service_processor(t_service* tservice) { // Generate the process subfunctions for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { - generate_process_function(tservice, *f_iter); + generate_process_function(tservice, *f_iter, style); } } @@ -2036,8 +2304,11 @@ void t_cpp_generator::generate_function_helpers(t_service* tservice, generate_struct_result_writer(f_service_, &result); result.set_name(tservice->get_name() + "_" + tfunction->get_name() + "_presult"); - generate_struct_definition(f_header_, &result, false, true, true, false); + generate_struct_definition(f_header_, &result, false, true, true, gen_cob_style_); generate_struct_reader(f_service_, &result, true); + if (gen_cob_style_) { + generate_struct_writer(f_service_, &result, true); + } } @@ -2047,165 +2318,303 @@ void t_cpp_generator::generate_function_helpers(t_service* tservice, * @param tfunction The function to write a dispatcher for */ void t_cpp_generator::generate_process_function(t_service* tservice, - t_function* tfunction) { - // Open function - f_service_ << - "void " << tservice->get_name() << "Processor::" << - "process_" << tfunction->get_name() << - "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl; - scope_up(f_service_); - - string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args"; - string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result"; - - f_service_ << - indent() << "void* ctx = NULL;" << endl << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl << - indent() << "// A glorified finally block since ctx is a void*" << endl << - indent() << "class ContextFreer {" << endl << - indent() << " public:" << endl << - indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl << - indent() << " handler_(handler), context_(context) {}" << endl << - indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl << - indent() << " private:" << endl << - indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl << - indent() << " void* context_;" << endl << - indent() << "};" << endl << - indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl << endl << - indent() << argsname << " args;" << endl << - indent() << "args.read(iprot);" << endl << - indent() << "iprot->readMessageEnd();" << endl << - indent() << "iprot->getTransport()->readEnd();" << endl << endl << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl << - endl; + t_function* tfunction, + string style) { + t_struct* arg_struct = tfunction->get_arglist(); + const std::vector& fields = arg_struct->get_members(); + vector::const_iterator f_iter; t_struct* xs = tfunction->get_xceptions(); const std::vector& xceptions = xs->get_members(); vector::const_iterator x_iter; - // Declare result - if (!tfunction->is_oneway()) { + // I tried to do this as one function. I really did. But it was too hard. + if (style != "Cob") { + // Open function f_service_ << - indent() << resultname << " result;" << endl; - } + "void " << tservice->get_name() << "Processor::" << + "process_" << tfunction->get_name() << "(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl; + scope_up(f_service_); - // Try block for functions with exceptions - f_service_ << - indent() << "try {" << endl; - indent_up(); + string argsname = tservice->get_name() + "_" + tfunction->get_name() + "_args"; + string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result"; - // Generate the function call - t_struct* arg_struct = tfunction->get_arglist(); - const std::vector& fields = arg_struct->get_members(); - vector::const_iterator f_iter; + f_service_ << + indent() << "void* ctx = NULL;" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + indent() << "// A glorified finally block since ctx is a void*" << endl << + indent() << "class ContextFreer {" << endl << + indent() << " public:" << endl << + indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl << + indent() << " handler_(handler), context_(context) {}" << endl << + indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl << + indent() << " private:" << endl << + indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl << + indent() << " void* context_;" << endl << + indent() << "};" << endl << + indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << + indent() << argsname << " args;" << endl << + indent() << "args.read(iprot);" << endl << + indent() << "iprot->readMessageEnd();" << endl << + indent() << "iprot->getTransport()->readEnd();" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + endl; - bool first = true; - f_service_ << indent(); - if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) { - if (is_complex_type(tfunction->get_returntype())) { - first = false; - f_service_ << "iface_->" << tfunction->get_name() << "(result.success"; - } else { - f_service_ << "result.success = iface_->" << tfunction->get_name() << "("; + // Declare result + if (!tfunction->is_oneway()) { + f_service_ << + indent() << resultname << " result;" << endl; } - } else { + + // Try block for functions with exceptions f_service_ << - "iface_->" << tfunction->get_name() << "("; - } - for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { - if (first) { - first = false; + indent() << "try {" << endl; + indent_up(); + + // Generate the function call + bool first = true; + f_service_ << indent(); + if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) { + if (is_complex_type(tfunction->get_returntype())) { + first = false; + f_service_ << "iface_->" << tfunction->get_name() << "(result.success"; + } else { + f_service_ << "result.success = iface_->" << tfunction->get_name() << "("; + } } else { - f_service_ << ", "; + f_service_ << + "iface_->" << tfunction->get_name() << "("; } - f_service_ << "args." << (*f_iter)->get_name(); - } - f_service_ << ");" << endl; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + if (first) { + first = false; + } else { + f_service_ << ", "; + } + f_service_ << "args." << (*f_iter)->get_name(); + } + f_service_ << ");" << endl; - // Set isset on success field - if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) { - f_service_ << - indent() << "result.__isset.success = true;" << endl; - } + // Set isset on success field + if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) { + f_service_ << + indent() << "result.__isset.success = true;" << endl; + } - indent_down(); - f_service_ << indent() << "}"; + indent_down(); + f_service_ << indent() << "}"; - if (!tfunction->is_oneway()) { - for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { - f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl; - if (!tfunction->is_oneway()) { - indent_up(); - f_service_ << - indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl << - indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl; - indent_down(); - f_service_ << indent() << "}"; - } else { - f_service_ << "}"; + if (!tfunction->is_oneway()) { + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { + f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl; + if (!tfunction->is_oneway()) { + indent_up(); + f_service_ << + indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl << + indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl; + indent_down(); + f_service_ << indent() << "}"; + } else { + f_service_ << "}"; + } } } - } - f_service_ << " catch (const std::exception& e) {" << endl; + f_service_ << " catch (const std::exception& e) {" << endl; - indent_up(); - f_service_ << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl; + indent_up(); + f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl; if (!tfunction->is_oneway()) { f_service_ << endl << - indent() << "::apache::thrift::TApplicationException x(e.what());" << endl << - indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << - indent() << "x.write(oprot);" << endl << - indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl; + indent() << "::apache::thrift::TApplicationException x(e.what());" << endl << + indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << + indent() << "x.write(oprot);" << endl << + indent() << "oprot->writeMessageEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl << + indent() << "oprot->getTransport()->writeEnd();" << endl; } f_service_ << indent() << "return;" << endl; indent_down(); f_service_ << indent() << "}" << endl << endl; - // Shortcut out here for oneway functions - if (tfunction->is_oneway()) { + // Shortcut out here for oneway functions + if (tfunction->is_oneway()) { + f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << + indent() << "return;" << endl; + indent_down(); + f_service_ << "}" << endl << + endl; + return; + } + + // Serialize the result into a struct f_service_ << indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << indent() << "}" << endl << endl << - indent() << "return;" << endl; + indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << + indent() << "result.write(oprot);" << endl << + indent() << "oprot->writeMessageEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl << + indent() << "oprot->getTransport()->writeEnd();" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl; + // Close function scope_down(f_service_); f_service_ << endl; - return; } - // Serialize the result into a struct - f_service_ << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl << endl << - indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << - indent() << "result.write(oprot);" << endl << - indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << endl << - indent() << "if (eventHandler_.get() != NULL) {" << endl << - indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << - indent() << "}" << endl; + // Cob style. + else { + // Processor entry point. + f_service_ << + "void " << tservice->get_name() << "AsyncProcessor::" << + "process_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)" << endl; + scope_up(f_service_); + + f_service_ << + indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl << + indent() << "try {" << endl; + indent_up(); + f_service_ << + indent() << "args.read(iprot);" << endl << + indent() << "iprot->readMessageEnd();" << endl << + indent() << "iprot->getTransport()->readEnd();" << endl; + scope_down(f_service_); + + // TODO(dreiss): Handle TExceptions? Expose to server? + f_service_ << + indent() << "catch (const std::exception& exn) {" << endl << + indent() << " return cob(false);" << endl << + indent() << "}" << endl; - // Close function - scope_down(f_service_); - f_service_ << endl; + // TODO(dreiss): Figure out a strategy for exceptions in async handlers. + f_service_ << + indent() << "iface_->" << tfunction->get_name() << "("; + indent_up(); indent_up(); + if (tfunction->is_oneway()) { + // No return. Just hand off our cob. + // TODO(dreiss): Call the cob immediately? + f_service_ << + "std::tr1::bind(cob, true)" << endl; + } else { + f_service_ << endl; + string ret_placeholder = ", std::tr1::placeholders::_1"; + string comma = ""; + if (tfunction->get_returntype()->is_void()) { + ret_placeholder = ""; + } + f_service_ << + indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::" + << "return_" << tfunction->get_name() + << ", this, cob, seqid, oprot" << ret_placeholder << ")"; + if (!xceptions.empty()) { + f_service_ + << ',' << endl << + indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::" + << "throw_" << tfunction->get_name() + << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)"; + } + } + + // XXX Whitespace cleanup. + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + f_service_ + << ',' << endl << + indent() << "args." << (*f_iter)->get_name(); + } + f_service_ << ");" << endl; + indent_down(); indent_down(); + scope_down(f_service_); + f_service_ << endl; + + // Normal return. + if (!tfunction->is_oneway()) { + string ret_arg = (tfunction->get_returntype()->is_void() + ? "" + : ", const " + type_name(tfunction->get_returntype()) + "& _return"); + f_service_ << + "void " << tservice->get_name() << "AsyncProcessor::" << + "return_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl; + scope_up(f_service_); + f_service_ << + indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl; + if (!tfunction->get_returntype()->is_void()) { + // The const_cast here is unfortunate, but it would be a pain to avoid, + // and we only do a write with this struct, which is const-safe. + f_service_ << + indent() << "result.success = const_cast<" << type_name(tfunction->get_returntype()) << "*>(&_return);" << endl << + indent() << "result.__isset.success = true;" << endl; + } + // Serialize the result into a struct + f_service_ << + endl << + indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << + indent() << "result.write(oprot);" << endl << + indent() << "oprot->writeMessageEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl << + indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "return cob(true);" << endl; + scope_down(f_service_); + f_service_ << endl; + } + + // Exception return. + if (!tfunction->is_oneway() && !xceptions.empty()) { + f_service_ << + "void " << tservice->get_name() << "AsyncProcessor::" << + "throw_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl; + scope_up(f_service_); + f_service_ << + indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl << + indent() << "try {" << endl; + indent_up(); + f_service_ << + indent() << "_throw->throw_it();" << endl << + indent() << "return cob(false);" << endl; // Is this possible? TBD. + indent_down(); + f_service_ << + indent() << '}'; + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { + f_service_ << " catch (" << type_name((*x_iter)->get_type()) << " &" << (*x_iter)->get_name() << ") {" << endl; + indent_up(); + f_service_ << + indent() << "result." << (*x_iter)->get_name() << " = " << (*x_iter)->get_name() << ";" << endl << + indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl; + scope_down(f_service_); + } + + // Serialize the result into a struct + f_service_ << + endl << + indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << + indent() << "result.write(oprot);" << endl << + indent() << "oprot->writeMessageEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl << + indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "return cob(true);" << endl; + + scope_down(f_service_); + f_service_ << endl; + } // for each function + } // cob style } /** @@ -2261,7 +2670,7 @@ void t_cpp_generator::generate_service_skeleton(t_service* tservice) { vector::iterator f_iter; for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { f_skeleton << - indent() << function_signature(*f_iter) << " {" << endl << + indent() << function_signature(*f_iter, "") << " {" << endl << indent() << " // Your implementation goes here" << endl << indent() << " printf(\"" << (*f_iter)->get_name() << "\\n\");" << endl << indent() << "}" << endl << @@ -2924,21 +3333,46 @@ string t_cpp_generator::declare_field(t_field* tfield, bool init, bool pointer, * @return String of rendered function definition */ string t_cpp_generator::function_signature(t_function* tfunction, + string style, string prefix, bool name_params) { t_type* ttype = tfunction->get_returntype(); t_struct* arglist = tfunction->get_arglist(); + bool has_xceptions = !tfunction->get_xceptions()->get_members().empty(); + + if (style == "") { + if (is_complex_type(ttype)) { + return + "void " + prefix + tfunction->get_name() + + "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") + + argument_list(arglist, name_params, true) + ")"; + } else { + return + type_name(ttype) + " " + prefix + tfunction->get_name() + + "(" + argument_list(arglist, name_params) + ")"; + } + } else if (style.substr(0,3) == "Cob") { + string cob_type; + string exn_cob; + if (style == "CobCl") { + cob_type = "(" + service_name_ + "CobClient* client)"; + } else if (style =="CobSv") { + cob_type = (ttype->is_void() + ? "()" + : ("(" + type_name(ttype) + " const& _return)")); + if (has_xceptions) { + exn_cob = ", std::tr1::function exn_cob"; + } + } else { + throw "UNKNOWN STYLE"; + } - if (is_complex_type(ttype)) { - bool empty = arglist->get_members().size() == 0; return "void " + prefix + tfunction->get_name() + - "(" + type_name(ttype) + (name_params ? "& _return" : "& /* _return */") + - (empty ? "" : (", " + argument_list(arglist, name_params))) + ")"; + "(std::tr1::function cob" + exn_cob + + argument_list(arglist, name_params, true) + ")"; } else { - return - type_name(ttype) + " " + prefix + tfunction->get_name() + - "(" + argument_list(arglist, name_params) + ")"; + throw "UNKNOWN STYLE"; } } @@ -2948,12 +3382,12 @@ string t_cpp_generator::function_signature(t_function* tfunction, * @param tstruct The struct definition * @return Comma sepearated list of all field names in that struct */ -string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params) { +string t_cpp_generator::argument_list(t_struct* tstruct, bool name_params, bool start_comma) { string result = ""; const vector& fields = tstruct->get_members(); vector::const_iterator f_iter; - bool first = true; + bool first = !start_comma; for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { if (first) { first = false; diff --git a/configure.ac b/configure.ac index bf8ef6d7..320c2507 100644 --- a/configure.ac +++ b/configure.ac @@ -311,6 +311,14 @@ if test "$cross_compiling" = "no" ; then AX_SIGNED_RIGHT_SHIFT fi +dnl autoscan thinks we need this macro because we have a member function +dnl called "error". Invoke the macro but don't run the check so autoscan +dnl thinks we are in the clear. It's highly unlikely that we will ever +dnl actually use the function that this checks for. +if false ; then + AC_FUNC_ERROR_AT_LINE +fi + AX_THRIFT_GEN(cpp, [C++], yes) AM_CONDITIONAL([THRIFT_GEN_cpp], [test "$ax_thrift_gen_cpp" = "yes"]) AX_THRIFT_GEN(java, [Java], yes) diff --git a/contrib/async-test/Makefile b/contrib/async-test/Makefile new file mode 100644 index 00000000..33e7f8ad --- /dev/null +++ b/contrib/async-test/Makefile @@ -0,0 +1,33 @@ +THRIFT = thrift +CXXFLAGS = `pkg-config --cflags thrift thrift-nb` -levent +LDLIBS = `pkg-config --libs thrift thrift-nb` -levent +CXXFLAGS += -g -O0 + +GENNAMES = Aggr aggr_types +GENHEADERS = $(addsuffix .h, $(GENNAMES)) +GENSRCS = $(addsuffix .cpp, $(GENNAMES)) +GENOBJS = $(addsuffix .o, $(GENNAMES)) + +PYLIBS = aggr/__init__.py + +PROGS = test-server + +all: $(PYLIBS) $(PROGS) + +test-server: test-server.o $(GENOBJS) + +test-server.o: $(GENSRCS) + +aggr/__init__.py: aggr.thrift + $(RM) $(dir $@) + $(THRIFT) --gen py:newstyle $< + mv gen-py/$(dir $@) . + +$(GENSRCS): aggr.thrift + $(THRIFT) --gen cpp:cob_style $< + mv $(addprefix gen-cpp/, $(GENSRCS) $(GENHEADERS)) . + +clean: + $(RM) -r *.o $(PROGS) $(GENSRCS) $(GENHEADERS) gen-* aggr + +.PHONY: clean diff --git a/contrib/async-test/aggr.thrift b/contrib/async-test/aggr.thrift new file mode 100644 index 00000000..c016a65d --- /dev/null +++ b/contrib/async-test/aggr.thrift @@ -0,0 +1,8 @@ +exception Error { + 1: string desc; +} + +service Aggr { + void addValue(1: i32 value); + list getValues() throws (1: Error err); +} diff --git a/contrib/async-test/test-leaf.py b/contrib/async-test/test-leaf.py new file mode 100755 index 00000000..8b7c3e3f --- /dev/null +++ b/contrib/async-test/test-leaf.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +import sys +import time +from thrift.transport import TTransport +from thrift.transport import TSocket +from thrift.protocol import TBinaryProtocol +from thrift.server import THttpServer +from aggr import Aggr + +class AggrHandler(Aggr.Iface): + def __init__(self): + self.values = [] + + def addValue(self, value): + self.values.append(value) + + def getValues(self, ): + time.sleep(1) + return self.values + +processor = Aggr.Processor(AggrHandler()) +pfactory = TBinaryProtocol.TBinaryProtocolFactory() +THttpServer.THttpServer(processor, ('', int(sys.argv[1])), pfactory).serve() diff --git a/contrib/async-test/test-server.cpp b/contrib/async-test/test-server.cpp new file mode 100644 index 00000000..a55c3484 --- /dev/null +++ b/contrib/async-test/test-server.cpp @@ -0,0 +1,97 @@ +#include +#include "protocol/TBinaryProtocol.h" +#include "async/TAsyncProtocolProcessor.h" +#include "async/TEvhttpServer.h" +#include "async/TEvhttpClientChannel.h" +#include "Aggr.h" + +using std::tr1::bind; +using std::tr1::placeholders::_1; + +using apache::thrift::TException; +using apache::thrift::protocol::TBinaryProtocolFactory; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::async::TEvhttpServer; +using apache::thrift::async::TAsyncProcessor; +using apache::thrift::async::TAsyncBufferProcessor; +using apache::thrift::async::TAsyncProtocolProcessor; +using apache::thrift::async::TAsyncChannel; +using apache::thrift::async::TEvhttpClientChannel; + +class AggrAsyncHandler : public AggrCobSvIf { + protected: + struct RequestContext { + std::tr1::function const& _return)> cob; + std::vector ret; + int pending_calls; + }; + + public: + AggrAsyncHandler() + : eb_(NULL) + , pfact_(new TBinaryProtocolFactory()) + { + leaf_ports_.push_back(8081); + leaf_ports_.push_back(8082); + } + + void addValue(std::tr1::function cob, const int32_t value) { + // Silently drop writes to the aggrgator. + return cob(); + } + + void getValues(std::tr1::function const& _return)> cob, + std::tr1::function exn_cob) { + RequestContext* ctx = new RequestContext(); + ctx->cob = cob; + ctx->pending_calls = leaf_ports_.size(); + for (std::vector::iterator it = leaf_ports_.begin(); + it != leaf_ports_.end(); ++it) { + boost::shared_ptr channel( + new TEvhttpClientChannel( + "localhost", "/", "127.0.0.1", *it, eb_)); + AggrCobClient* client = new AggrCobClient(channel, pfact_.get()); + client->getValues(std::tr1::bind(&AggrAsyncHandler::clientReturn, this, ctx, _1)); + } + } + + void setEventBase(struct event_base* eb) { + eb_ = eb; + } + + void clientReturn(RequestContext* ctx, AggrCobClient* client) { + ctx->pending_calls -= 1; + + try { + std::vector subret; + client->recv_getValues(subret); + ctx->ret.insert(ctx->ret.end(), subret.begin(), subret.end()); + } catch (TException& exn) { + // TODO: Log error + } + + delete client; + + if (ctx->pending_calls == 0) { + ctx->cob(ctx->ret); + delete ctx; + } + } + + protected: + struct event_base* eb_; + std::vector leaf_ports_; + boost::shared_ptr pfact_; +}; + + +int main() { + boost::shared_ptr handler(new AggrAsyncHandler()); + boost::shared_ptr proc(new AggrAsyncProcessor(handler)); + boost::shared_ptr pfact(new TBinaryProtocolFactory()); + boost::shared_ptr bufproc(new TAsyncProtocolProcessor(proc, pfact)); + boost::shared_ptr server(new TEvhttpServer(bufproc, 8080)); + handler->setEventBase(server->getEventBase()); + server->serve(); +} diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 664a58a4..39382c8e 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -70,9 +70,11 @@ libthrift_la_SOURCES = src/Thrift.cpp \ src/server/TSimpleServer.cpp \ src/server/TThreadPoolServer.cpp \ src/server/TThreadedServer.cpp \ + src/async/TAsyncChannel.cpp \ src/processor/PeekProcessor.cpp -libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp +libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \ + src/async/TAsyncProtocolProcessor.cpp libthriftz_la_SOURCES = src/transport/TZlibTransport.cpp @@ -148,6 +150,17 @@ include_processor_HEADERS = \ src/processor/PeekProcessor.h \ src/processor/StatsProcessor.h +include_asyncdir = $(include_thriftdir)/async +include_async_HEADERS = \ + src/async/TAsyncChannel.h \ + src/async/TAsyncProcessor.h \ + src/async/TAsyncBufferProcessor.h \ + src/async/TAsyncProtocolProcessor.h \ + src/async/TEvhttpClientChannel.h \ + src/async/TEvhttpServer.h \ + src/async/SimpleCallback.h + + noinst_PROGRAMS = concurrency_test concurrency_test_SOURCES = src/concurrency/test/Tests.cpp \ diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index 22c10f18..f71a50b6 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -119,4 +119,4 @@ class TProcessor { }} // apache::thrift -#endif // #ifndef _THRIFT_PROCESSOR_H_ +#endif // #ifndef _THRIFT_TPROCESSOR_H_ diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h index 1bce23ff..cb7d55a0 100644 --- a/lib/cpp/src/Thrift.h +++ b/lib/cpp/src/Thrift.h @@ -110,6 +110,29 @@ namespace reflection { namespace local { struct TypeSpec; }} +class TDelayedException { + public: + template static TDelayedException* delayException(const E& e); + virtual void throw_it() = 0; + virtual ~TDelayedException() {}; +}; + +template class TExceptionWrapper : public TDelayedException { + public: + TExceptionWrapper(const E& e) : e_(e) {} + virtual void throw_it() { + E temp(e_); + delete this; + throw temp; + } + private: + E e_; +}; + +template +TDelayedException* TDelayedException::delayException(const E& e) { + return new TExceptionWrapper(e); +} }} // apache::thrift diff --git a/lib/cpp/src/async/SimpleCallback.h b/lib/cpp/src/async/SimpleCallback.h new file mode 100644 index 00000000..4218328d --- /dev/null +++ b/lib/cpp/src/async/SimpleCallback.h @@ -0,0 +1,98 @@ +#ifndef _THRIFT_ASYNC_SIMPLECALLBACK_H_ +#define _THRIFT_ASYNC_SIMPLECALLBACK_H_ 1 + +#include +namespace apache { namespace thrift { + +/** + * A template class for forming simple method callbacks with either an empty + * argument list or one argument of known type. + * + * For more efficiency where tr1::function is overkill. + */ + +template ///< type of return value +class SimpleCallback { + typedef R (C::*cfptr_t)(A); ///< pointer-to-member-function type + cfptr_t fptr_; ///< the embedded function pointer + C* obj_; ///< object whose function we're wrapping + public: + /** + * Constructor for empty callback object. + */ + SimpleCallback() : + fptr_(NULL), obj_(NULL) {} + /** + * Construct callback wrapper for member function. + * + * @param fptr pointer-to-member-function + * @param "this" for object associated with callback + */ + SimpleCallback(cfptr_t fptr, const C* obj) : + fptr_(fptr), obj_(const_cast(obj)) + {} + + /** + * Make a call to the member function we've wrapped. + * + * @param i argument for the wrapped member function + * @return value from that function + */ + R operator()(A i) const { + (obj_->*fptr_)(i); + } + + operator bool() const { + return obj_ != NULL && fptr_ != NULL; + } + + ~SimpleCallback() {} +}; + +/** + * Specialization of SimpleCallback for empty argument list. + */ +template ///< type of return value +class SimpleCallback { + typedef R (C::*cfptr_t)(); ///< pointer-to-member-function type + cfptr_t fptr_; ///< the embedded function pointer + C* obj_; ///< object whose function we're wrapping + public: + /** + * Constructor for empty callback object. + */ + SimpleCallback() : + fptr_(NULL), obj_(NULL) {} + + /** + * Construct callback wrapper for member function. + * + * @param fptr pointer-to-member-function + * @param obj "this" for object associated with callback + */ + SimpleCallback(cfptr_t fptr, const C* obj) : + fptr_(fptr), obj_(const_cast(obj)) + {} + + /** + * Make a call to the member function we've wrapped. + * + * @return value from that function + */ + R operator()() const { + (obj_->*fptr_)(); + } + + operator bool() const { + return obj_ != NULL && fptr_ != NULL; + } + + ~SimpleCallback() {} +}; + +}} // apache::thrift + +#endif /* !_THRIFT_ASYNC_SIMPLECALLBACK_H_ */ diff --git a/lib/cpp/src/async/TAsyncBufferProcessor.h b/lib/cpp/src/async/TAsyncBufferProcessor.h new file mode 100644 index 00000000..06a503e5 --- /dev/null +++ b/lib/cpp/src/async/TAsyncBufferProcessor.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ +#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1 + +#include +#include + +#include "transport/TBufferTransports.h" + +namespace apache { namespace thrift { namespace async { + +class TAsyncBufferProcessor { + public: + // Process data in "in", putting the result in "out". + // Call _return(true) when done, or _return(false) to + // forcefully close the connection (if applicable). + // "in" and "out" should be TMemoryBuffer or similar, + // not a wrapper around a socket. + virtual void process( + std::tr1::function _return, + boost::shared_ptr ibuf, + boost::shared_ptr obuf) = 0; +}; + +}}} // apache::thrift::async + +#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ diff --git a/lib/cpp/src/async/TAsyncChannel.cpp b/lib/cpp/src/async/TAsyncChannel.cpp new file mode 100644 index 00000000..2bf02fe3 --- /dev/null +++ b/lib/cpp/src/async/TAsyncChannel.cpp @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +namespace apache { namespace thrift { namespace async { + +bool TAsyncChannel::sendAndRecvMessage(const VoidCallback& cob, + TMemoryBuffer* sendBuf, + TMemoryBuffer* recvBuf) { + std::tr1::function send_done = + std::tr1::bind(&TAsyncChannel::recvMessage, this, cob, recvBuf); + + return sendMessage(send_done, sendBuf); +} + +}}} // apache::thrift::async diff --git a/lib/cpp/src/async/TAsyncChannel.h b/lib/cpp/src/async/TAsyncChannel.h new file mode 100644 index 00000000..d5cd419f --- /dev/null +++ b/lib/cpp/src/async/TAsyncChannel.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_ +#define _THRIFT_ASYNC_TASYNCCHANNEL_H_ 1 + +#include +#include +#include + +namespace apache { namespace thrift { namespace transport { +class TMemoryBuffer; +}}} + +namespace apache { namespace thrift { namespace async { +using apache::thrift::transport::TMemoryBuffer; + +class TAsyncTransport; + +class TAsyncChannel { + public: + typedef std::tr1::function VoidCallback; + + virtual ~TAsyncChannel() {} + + // is the channel in a good state? + virtual bool good() const = 0; + virtual bool error() const = 0; + virtual bool timedOut() const = 0; + + /** + * Send a message over the channel. + * + * @return true iff the cob has been or will be called, else false + */ + virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0; + + /** + * Receive a message from the channel. + * + * @return true iff the cob has been or will be called, else false + */ + virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) = 0; + + /** + * Send a message over the channel and receive a response. + * + * @return true iff the cob has been or will be called, else false + */ + virtual bool sendAndRecvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf); +}; + +}}} // apache::thrift::async + +#endif // #ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_ diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h new file mode 100644 index 00000000..abf58161 --- /dev/null +++ b/lib/cpp/src/async/TAsyncProcessor.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TASYNCPROCESSOR_H_ +#define _THRIFT_TASYNCPROCESSOR_H_ 1 + +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace async { + +/** + * Async version of a TProcessor. It is not expected to complete by the time + * the call to process returns. Instead, it calls a cob to signal completion. + */ +class TAsyncProcessor { + public: + virtual ~TAsyncProcessor() {} + + virtual void process(std::tr1::function _return, + boost::shared_ptr in, + boost::shared_ptr out) = 0; + + void process(std::tr1::function _return, + boost::shared_ptr io) { + return process(_return, io, io); + } + + protected: + TAsyncProcessor() {} +}; + +}}} // apache::thrift::async + +// XXX I'm lazy for now +namespace apache { namespace thrift { +using apache::thrift::async::TAsyncProcessor; +}} + +#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_ diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.cpp b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp new file mode 100644 index 00000000..05d504b5 --- /dev/null +++ b/lib/cpp/src/async/TAsyncProtocolProcessor.cpp @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TAsyncProtocolProcessor.h" + +using apache::thrift::transport::TBufferBase; +using apache::thrift::protocol::TProtocol; + +namespace apache { namespace thrift { namespace async { + +void TAsyncProtocolProcessor::process( + std::tr1::function _return, + boost::shared_ptr ibuf, + boost::shared_ptr obuf) { + boost::shared_ptr iprot(pfact_->getProtocol(ibuf)); + boost::shared_ptr oprot(pfact_->getProtocol(obuf)); + return underlying_->process( + std::tr1::bind( + &TAsyncProtocolProcessor::finish, + _return, + oprot, + std::tr1::placeholders::_1), + iprot, oprot); +} + +/* static */ void TAsyncProtocolProcessor::finish( + std::tr1::function _return, + boost::shared_ptr oprot, + bool healthy) { + // This is a stub function to hold a reference to oprot. + return _return(healthy); +} + +}}} // apache::thrift::async diff --git a/lib/cpp/src/async/TAsyncProtocolProcessor.h b/lib/cpp/src/async/TAsyncProtocolProcessor.h new file mode 100644 index 00000000..7ec718bd --- /dev/null +++ b/lib/cpp/src/async/TAsyncProtocolProcessor.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TNAME_ME_H_ +#define _THRIFT_TNAME_ME_H_ 1 + +#include "TAsyncProcessor.h" +#include "TAsyncBufferProcessor.h" +#include "protocol/TProtocol.h" + +namespace apache { namespace thrift { namespace async { + +class TAsyncProtocolProcessor : public TAsyncBufferProcessor { + public: + TAsyncProtocolProcessor( + boost::shared_ptr underlying, + boost::shared_ptr pfact) + : underlying_(underlying) + , pfact_(pfact) + {} + + virtual void process( + std::tr1::function _return, + boost::shared_ptr ibuf, + boost::shared_ptr obuf); + + private: + static void finish( + std::tr1::function _return, + boost::shared_ptr oprot, + bool healthy); + + boost::shared_ptr underlying_; + boost::shared_ptr pfact_; +}; + +}}} // apache::thrift::async + +#endif // #ifndef _THRIFT_TNAME_ME_H_ diff --git a/lib/cpp/src/async/TEvhttpClientChannel.cpp b/lib/cpp/src/async/TEvhttpClientChannel.cpp new file mode 100644 index 00000000..54676a11 --- /dev/null +++ b/lib/cpp/src/async/TEvhttpClientChannel.cpp @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TEvhttpClientChannel.h" +#include + +namespace apache { namespace thrift { namespace async { + + +TEvhttpClientChannel::TEvhttpClientChannel( + const std::string& host, + const std::string& path, + const char* address, + int port, + struct event_base* eb) + : host_(host) + , path_(path) + , recvBuf_(NULL) + , conn_(NULL) +{ + conn_ = evhttp_connection_new(address, port); + if (conn_ == NULL) { + abort(); // XXX + } + evhttp_connection_set_base(conn_, eb); +} + + +TEvhttpClientChannel::~TEvhttpClientChannel() { + if (conn_ != NULL) { + evhttp_connection_free(conn_); + } +} + + +bool TEvhttpClientChannel::sendAndRecvMessage( + const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf) { + cob_ = cob; + recvBuf_ = recvBuf; + + struct evhttp_request* req = evhttp_request_new(response, this); + if (req == NULL) { + abort(); // XXX + } + + int rv; + + rv = evhttp_add_header(req->output_headers, "Host", host_.c_str()); + if (rv != 0) { + abort(); // XXX + } + + rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift"); + if (rv != 0) { + abort(); // XXX + } + + uint8_t* obuf; + uint32_t sz; + sendBuf->getBuffer(&obuf, &sz); + rv = evbuffer_add(req->output_buffer, obuf, sz); + if (rv != 0) { + abort(); // XXX + } + + rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str()); + if (rv != 0) { + abort(); // XXX + } + + return true; +} + + +bool TEvhttpClientChannel::sendMessage( + const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) { + abort(); // XXX +} + + +bool TEvhttpClientChannel::recvMessage( + const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message) { + abort(); // XXX +} + + +void TEvhttpClientChannel::finish(struct evhttp_request* req) { + if (req == NULL) { + return cob_(); + } else if (req->response_code != 200) { + return cob_(); + } + recvBuf_->resetBuffer( + EVBUFFER_DATA(req->input_buffer), + EVBUFFER_LENGTH(req->input_buffer)); + return cob_(); +} + + +/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) { + TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg; + self->finish(req); +} + + +}}} // apache::thrift::async diff --git a/lib/cpp/src/async/TEvhttpClientChannel.h b/lib/cpp/src/async/TEvhttpClientChannel.h new file mode 100644 index 00000000..d2bc4b30 --- /dev/null +++ b/lib/cpp/src/async/TEvhttpClientChannel.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ +#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1 + +#include +#include +#include "TAsyncChannel.h" + +struct event_base; +struct evhttp_connection; +struct evhttp_request; + +namespace apache { namespace thrift { namespace transport { +class TMemoryBuffer; +}}} + +namespace apache { namespace thrift { namespace async { + +class TEvhttpClientChannel : public TAsyncChannel { + public: + using TAsyncChannel::VoidCallback; + + TEvhttpClientChannel( + const std::string& host, + const std::string& path, + const char* address, + int port, + struct event_base* eb); + ~TEvhttpClientChannel(); + + virtual bool sendAndRecvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf); + + virtual bool sendMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message); + virtual bool recvMessage(const VoidCallback& cob, apache::thrift::transport::TMemoryBuffer* message); + + void finish(struct evhttp_request* req); + + //XXX + virtual bool good() const { return true; } + virtual bool error() const { return false; } + virtual bool timedOut() const { return false; } + + private: + static void response(struct evhttp_request* req, void* arg); + + std::string host_; + std::string path_; + VoidCallback cob_; + apache::thrift::transport::TMemoryBuffer* recvBuf_; + struct evhttp_connection* conn_; + +}; + +}}} // apache::thrift::async + +#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ diff --git a/lib/cpp/src/async/TEvhttpServer.cpp b/lib/cpp/src/async/TEvhttpServer.cpp new file mode 100644 index 00000000..29975971 --- /dev/null +++ b/lib/cpp/src/async/TEvhttpServer.cpp @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "TEvhttpServer.h" +#include "TAsyncBufferProcessor.h" +#include "transport/TBufferTransports.h" +#include + +using apache::thrift::transport::TMemoryBuffer; + +namespace apache { namespace thrift { namespace async { + + +struct TEvhttpServer::RequestContext { + struct evhttp_request* req; + boost::shared_ptr ibuf; + boost::shared_ptr obuf; + + RequestContext(struct evhttp_request* req); +}; + + +TEvhttpServer::TEvhttpServer(boost::shared_ptr processor) + : processor_(processor) + , eb_(NULL) + , eh_(NULL) +{} + + +TEvhttpServer::TEvhttpServer(boost::shared_ptr processor, int port) + : processor_(processor) + , eb_(NULL) + , eh_(NULL) +{ + // Create event_base and evhttp. + eb_ = event_base_new(); + if (eb_ == NULL) { + abort(); // XXX + } + eh_ = evhttp_new(eb_); + if (eh_ == NULL) { + event_base_free(eb_); + abort(); // XXX + } + + // Bind to port. + int ret = evhttp_bind_socket(eh_, NULL, port); + if (ret < 0) { + evhttp_free(eh_); + event_base_free(eb_); + } + + // Register a handler. If you use the other constructor, + // you will want to do this yourself. + // Don't forget to unregister before destorying this TEvhttpServer. + evhttp_set_cb(eh_, "/", request, (void*)this); +} + + +TEvhttpServer::~TEvhttpServer() { + if (eh_ != NULL) { + evhttp_free(eh_); + } + if (eb_ != NULL) { + event_base_free(eb_); + } +} + + +int TEvhttpServer::serve() { + if (eb_ == NULL) { + abort(); // XXX + } + return event_base_dispatch(eb_); +} + + +TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) : req(req) + , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer))) + , obuf(new TMemoryBuffer()) +{} + + +void TEvhttpServer::request(struct evhttp_request* req, void* self) { + static_cast(self)->process(req); +} + + +void TEvhttpServer::process(struct evhttp_request* req) { + RequestContext* ctx = new RequestContext(req); + return processor_->process( + std::tr1::bind( + &TEvhttpServer::complete, + this, + ctx, + std::tr1::placeholders::_1), + ctx->ibuf, + ctx->obuf); +} + + +void TEvhttpServer::complete(RequestContext* ctx, bool success) { + std::auto_ptr ptr(ctx); + + int code = 200; + const char* reason = "OK"; + + int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift"); + if (rv != 0) { + // TODO: Log an error. + } + + struct evbuffer* buf = evbuffer_new(); + if (buf == NULL) { + // TODO: Log an error. + } else { + uint8_t* obuf; + uint32_t sz; + ctx->obuf->getBuffer(&obuf, &sz); + int ret = evbuffer_add(buf, obuf, sz); + if (ret != 0) { + // TODO: Log an error. + } + } + + evhttp_send_reply(ctx->req, code, reason, buf); + if (buf != NULL) { + evbuffer_free(buf); + } +} + + +struct event_base* TEvhttpServer::getEventBase() { + return eb_; +} + + +}}} // apache::thrift::async diff --git a/lib/cpp/src/async/TEvhttpServer.h b/lib/cpp/src/async/TEvhttpServer.h new file mode 100644 index 00000000..edc6ffb7 --- /dev/null +++ b/lib/cpp/src/async/TEvhttpServer.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TEVHTTP_SERVER_H_ +#define _THRIFT_TEVHTTP_SERVER_H_ 1 + +#include + +struct event_base; +struct evhttp; +struct evhttp_request; + +namespace apache { namespace thrift { namespace async { + +class TAsyncBufferProcessor; + +class TEvhttpServer { + public: + /** + * Create a TEvhttpServer for use with an external evhttp instance. + * Must be manually installed with evhttp_set_cb, using + * TEvhttpServer::request as the callback and the + * address of the server as the extra arg. + * Do not call "serve" on this server. + */ + TEvhttpServer(boost::shared_ptr processor); + + /** + * Create a TEvhttpServer with an embedded event_base and evhttp, + * listening on port and responding on the endpoint "/". + * Call "serve" on this server to serve forever. + */ + TEvhttpServer(boost::shared_ptr processor, int port); + + ~TEvhttpServer(); + + static void request(struct evhttp_request* req, void* self); + int serve(); + + struct event_base* getEventBase(); + + private: + struct RequestContext; + + void process(struct evhttp_request* req); + void complete(RequestContext* ctx, bool success); + + boost::shared_ptr processor_; + struct event_base* eb_; + struct evhttp* eh_; +}; + +}}} // apache::thrift::async + +#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_ -- 2.17.1