From 18cd0f03340c3eeead9b691e0d9b8a055035eec2 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:09:39 +0000 Subject: [PATCH] THRIFT-928. cpp: Make clients call writeEnd on their transports before flush Changing the order of these calls makes more sense from the perspective of logical operations. It also simplifies the upcoming stats collection code. No clients should be affected. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005128 13f79535-47bb-0310-9956-ffa450edef68 --- compiler/cpp/src/generate/t_cpp_generator.cc | 93 +++++++++++++------- lib/cpp/src/TProcessor.h | 15 ++++ lib/cpp/src/async/TAsyncProcessor.h | 22 +++++ 3 files changed, 99 insertions(+), 31 deletions(-) diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index 0ba4540f..c2f9e2e7 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -1970,8 +1970,8 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style) indent() << "args.write(oprot_);" << endl << endl << indent() << "oprot_->writeMessageEnd();" << endl << - indent() << "oprot_->getTransport()->flush();" << endl << - indent() << "oprot_->getTransport()->writeEnd();" << endl; + indent() << "oprot_->getTransport()->writeEnd();" << endl << + indent() << "oprot_->getTransport()->flush();" << endl; scope_down(f_service_); f_service_ << endl; @@ -2142,8 +2142,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty : ", const " + type_name((*f_iter)->get_returntype()) + "& _return"); // XXX Don't declare throw if it doesn't exist f_header_ << - "void return_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl << - "void throw_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl; + "void return_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ");" << endl << + "void throw_" << (*f_iter)->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw);" << endl; } } indent_down(); @@ -2209,8 +2209,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << " oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << indent() << " x.write(oprot);" << endl << indent() << " oprot->writeMessageEnd();" << endl << - indent() << " oprot->getTransport()->flush();" << endl << indent() << " oprot->getTransport()->writeEnd();" << endl << + indent() << " oprot->getTransport()->flush();" << endl << indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl << indent() << "}" << endl << endl << @@ -2241,8 +2241,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty indent() << " oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << indent() << " x.write(oprot);" << endl << indent() << " oprot->writeMessageEnd();" << endl << - indent() << " oprot->getTransport()->flush();" << endl << indent() << " oprot->getTransport()->writeEnd();" << endl << + indent() << " oprot->getTransport()->flush();" << endl << indent() << (style == "Cob" ? " return cob(true);" : " return true;") << endl; } else { f_service_ << @@ -2344,17 +2344,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "if (eventHandler_.get() != NULL) {" << endl << indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << indent() << "}" << endl << - indent() << "// A glorified finally block since ctx is a void*" << endl << - indent() << "class ContextFreer {" << endl << - indent() << " public:" << endl << - indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl << - indent() << " handler_(handler), context_(context) {}" << endl << - indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl << - indent() << " private:" << endl << - indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl << - indent() << " void* context_;" << endl << - indent() << "};" << endl << - indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl << + indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl << indent() << "}" << endl << endl << @@ -2442,8 +2432,8 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << indent() << "x.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl; + indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl; } f_service_ << indent() << "return;" << endl; indent_down(); @@ -2470,8 +2460,8 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << - indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << endl << + indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "oprot->getTransport()->flush();" << endl << endl << indent() << "if (eventHandler_.get() != NULL) {" << endl << indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << indent() << "}" << endl; @@ -2491,22 +2481,43 @@ void t_cpp_generator::generate_process_function(t_service* tservice, f_service_ << indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl << + indent() << "void* ctx = NULL;" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl << indent() << "try {" << endl; indent_up(); f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << indent() << "args.read(iprot);" << endl << indent() << "iprot->readMessageEnd();" << endl << - indent() << "iprot->getTransport()->readEnd();" << endl; + indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << + indent() << "}" << endl; scope_down(f_service_); // TODO(dreiss): Handle TExceptions? Expose to server? f_service_ << indent() << "catch (const std::exception& exn) {" << endl << + indent() << " if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << " }" << endl << indent() << " return cob(false);" << endl << indent() << "}" << endl; + if (tfunction->is_oneway()) { + f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->onewayComplete(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl; + } // TODO(dreiss): Figure out a strategy for exceptions in async handlers. f_service_ << + indent() << "freer.unregister();" << endl << indent() << "iface_->" << tfunction->get_name() << "("; indent_up(); indent_up(); if (tfunction->is_oneway()) { @@ -2524,13 +2535,13 @@ void t_cpp_generator::generate_process_function(t_service* tservice, f_service_ << indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::" << "return_" << tfunction->get_name() - << ", this, cob, seqid, oprot" << ret_placeholder << ")"; + << ", this, cob, seqid, oprot, ctx" << ret_placeholder << ")"; if (!xceptions.empty()) { f_service_ - << ',' << endl << + << ',' << endl << indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::" - << "throw_" << tfunction->get_name() - << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)"; + << "throw_" << tfunction->get_name() + << ", this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1)"; } } @@ -2552,7 +2563,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, : ", const " + type_name(tfunction->get_returntype()) + "& _return"); f_service_ << "void " << tservice->get_name() << "AsyncProcessor::" << - "return_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl; + "return_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ')' << endl; scope_up(f_service_); f_service_ << indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl; @@ -2566,11 +2577,21 @@ void t_cpp_generator::generate_process_function(t_service* tservice, // Serialize the result into a struct f_service_ << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << + indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << + indent() << "}" << endl << indent() << "return cob(true);" << endl; scope_down(f_service_); f_service_ << endl; @@ -2580,7 +2601,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice, if (!tfunction->is_oneway() && !xceptions.empty()) { f_service_ << "void " << tservice->get_name() << "AsyncProcessor::" << - "throw_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl; + "throw_" << tfunction->get_name() << "(std::tr1::function cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw)" << endl; scope_up(f_service_); f_service_ << indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl << @@ -2600,17 +2621,27 @@ void t_cpp_generator::generate_process_function(t_service* tservice, indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl; scope_down(f_service_); } + // TODO(dreiss): Handle the case where an undeclared exception is thrown? // Serialize the result into a struct f_service_ << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << + indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl << + indent() << "}" << endl << indent() << "return cob(true);" << endl; - scope_down(f_service_); f_service_ << endl; } // for each function diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index f71a50b6..896f5ae6 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -85,6 +85,21 @@ class TProcessorEventHandler { TProcessorEventHandler() {} }; +/** + * A helper class used by the generated code to free each context. + */ +class TProcessorContextFreer { + public: + TProcessorContextFreer(TProcessorEventHandler* handler, void* context, const char* method) : + handler_(handler), context_(context), method_(method) {} + ~TProcessorContextFreer() { if (handler_ != NULL) handler_->freeContext(context_, method_); } + void unregister() { handler_ = NULL; } + private: + apache::thrift::TProcessorEventHandler* handler_; + void* context_; + const char* method_; +}; + /** * A processor is a generic object that acts upon two streams of data, one * an input and the other an output. The definition of this object is loose, diff --git a/lib/cpp/src/async/TAsyncProcessor.h b/lib/cpp/src/async/TAsyncProcessor.h index abf58161..a0b54281 100644 --- a/lib/cpp/src/async/TAsyncProcessor.h +++ b/lib/cpp/src/async/TAsyncProcessor.h @@ -31,6 +31,9 @@ namespace apache { namespace thrift { namespace async { * Async version of a TProcessor. It is not expected to complete by the time * the call to process returns. Instead, it calls a cob to signal completion. */ + +class TEventServer; // forward declaration + class TAsyncProcessor { public: virtual ~TAsyncProcessor() {} @@ -44,8 +47,27 @@ class TAsyncProcessor { return process(_return, io, io); } + boost::shared_ptr getEventHandler() { + return eventHandler_; + } + + void setEventHandler(boost::shared_ptr eventHandler) { + eventHandler_ = eventHandler; + } + + const TEventServer* getAsyncServer() { + return asyncServer_; + } protected: TAsyncProcessor() {} + + boost::shared_ptr eventHandler_; + const TEventServer* asyncServer_; + private: + friend class TEventServer; + void setAsyncServer(const TEventServer* server) { + asyncServer_ = server; + } }; }}} // apache::thrift::async -- 2.17.1