THRIFT-928. cpp: Make clients call writeEnd on their transports before flush
authorDavid Reiss <dreiss@apache.org>
Wed, 6 Oct 2010 17:09:39 +0000 (17:09 +0000)
committerDavid Reiss <dreiss@apache.org>
Wed, 6 Oct 2010 17:09:39 +0000 (17:09 +0000)
Changing the order of these calls makes more sense from the perspective
of logical operations.  It also simplifies the upcoming stats collection
code.  No clients should be affected.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005128 13f79535-47bb-0310-9956-ffa450edef68

compiler/cpp/src/generate/t_cpp_generator.cc
lib/cpp/src/TProcessor.h
lib/cpp/src/async/TAsyncProcessor.h

index 0ba4540..c2f9e2e 100644 (file)
@@ -1970,8 +1970,8 @@ void t_cpp_generator::generate_service_client(t_service* tservice, string style)
         indent() << "args.write(oprot_);" << endl <<
         endl <<
         indent() << "oprot_->writeMessageEnd();" << endl <<
-        indent() << "oprot_->getTransport()->flush();" << endl <<
-        indent() << "oprot_->getTransport()->writeEnd();" << endl;
+        indent() << "oprot_->getTransport()->writeEnd();" << endl <<
+        indent() << "oprot_->getTransport()->flush();" << endl;
 
       scope_down(f_service_);
       f_service_ << endl;
@@ -2142,8 +2142,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty
                         : ", const " + type_name((*f_iter)->get_returntype()) + "& _return");
       // XXX Don't declare throw if it doesn't exist
       f_header_ <<
-        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ");" << endl <<
-        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw);" << endl;
+        "void return_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ");" << endl <<
+        "void throw_" << (*f_iter)->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw);" << endl;
     }
   }
   indent_down();
@@ -2209,8 +2209,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty
     indent() << "  oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
     indent() << "  x.write(oprot);" << endl <<
     indent() << "  oprot->writeMessageEnd();" << endl <<
-    indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << "  oprot->getTransport()->writeEnd();" << endl <<
+    indent() << "  oprot->getTransport()->flush();" << endl <<
     indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl <<
     indent() << "}" << endl <<
     endl <<
@@ -2241,8 +2241,8 @@ void t_cpp_generator::generate_service_processor(t_service* tservice, string sty
       indent() << "  oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
       indent() << "  x.write(oprot);" << endl <<
       indent() << "  oprot->writeMessageEnd();" << endl <<
-      indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << "  oprot->getTransport()->writeEnd();" << endl <<
+      indent() << "  oprot->getTransport()->flush();" << endl <<
       indent() << (style == "Cob" ? "  return cob(true);" : "  return true;") << endl;
   } else {
     f_service_ <<
@@ -2344,17 +2344,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl <<
-      indent() << "// A glorified finally block since ctx is a void*" << endl <<
-      indent() << "class ContextFreer {" << endl <<
-      indent() << "  public:" << endl <<
-      indent() << "    ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl <<
-      indent() << "      handler_(handler), context_(context) {}" << endl <<
-      indent() << "    ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl <<
-      indent() << "  private:" << endl <<
-      indent() << "    ::apache::thrift::TProcessorEventHandler* handler_;" << endl <<
-      indent() << "    void* context_;" << endl <<
-      indent() << "};" << endl <<
-      indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl <<
+      indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl << endl <<
@@ -2442,8 +2432,8 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl <<
         indent() << "x.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
-        indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl;
+        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "oprot->getTransport()->flush();" << endl;
   }
   f_service_ << indent() << "return;" << endl;
   indent_down();
@@ -2470,8 +2460,8 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
       indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
       indent() << "result.write(oprot);" << endl <<
       indent() << "oprot->writeMessageEnd();" << endl <<
-      indent() << "oprot->getTransport()->flush();" << endl <<
-      indent() << "oprot->getTransport()->writeEnd();" << endl << endl <<
+      indent() << "oprot->getTransport()->writeEnd();" << endl <<
+      indent() << "oprot->getTransport()->flush();" << endl << endl <<
       indent() << "if (eventHandler_.get() != NULL) {" << endl <<
       indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
       indent() << "}" << endl;
@@ -2491,22 +2481,43 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
 
     f_service_ <<
       indent() << tservice->get_name() + "_" + tfunction->get_name() + "_args" << " args;" << endl <<
+      indent() << "void* ctx = NULL;" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
+      indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
       indent() << "try {" << endl;
     indent_up();
     f_service_ <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "}" << endl <<
       indent() << "args.read(iprot);" << endl <<
       indent() << "iprot->readMessageEnd();" << endl <<
-      indent() << "iprot->getTransport()->readEnd();" << endl;
+      indent() << "uint32_t bytes = iprot->getTransport()->readEnd();" << endl <<
+      indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "  eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+      indent() << "}" << endl;
     scope_down(f_service_);
 
     // TODO(dreiss): Handle TExceptions?  Expose to server?
     f_service_ <<
       indent() << "catch (const std::exception& exn) {" << endl <<
+      indent() << "  if (eventHandler_.get() != NULL) {" << endl <<
+      indent() << "    eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+      indent() << "  }" << endl <<
       indent() << "  return cob(false);" << endl <<
       indent() << "}" << endl;
 
+    if (tfunction->is_oneway()) {
+      f_service_ <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->onewayComplete(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl;
+    }
     // TODO(dreiss): Figure out a strategy for exceptions in async handlers.
     f_service_ <<
+      indent() << "freer.unregister();" << endl <<
       indent() << "iface_->" << tfunction->get_name() << "(";
     indent_up(); indent_up();
     if (tfunction->is_oneway()) {
@@ -2524,13 +2535,13 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
       f_service_ <<
         indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
                  << "return_" << tfunction->get_name()
-                 << ", this, cob, seqid, oprot" << ret_placeholder << ")";
+                 << ", this, cob, seqid, oprot, ctx" << ret_placeholder << ")";
       if (!xceptions.empty()) {
         f_service_
-                   << ',' << endl <<
+              << ',' << endl <<
           indent() << "std::tr1::bind(&" << tservice->get_name() << "AsyncProcessor::"
-                   << "throw_" << tfunction->get_name()
-                   << ", this, cob, seqid, oprot, std::tr1::placeholders::_1)";
+              << "throw_" << tfunction->get_name()
+              << ", this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1)";
       }
     }
 
@@ -2552,7 +2563,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
                         : ", const " + type_name(tfunction->get_returntype()) + "& _return");
       f_service_ <<
         "void " << tservice->get_name() << "AsyncProcessor::" <<
-        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot" << ret_arg << ')' << endl;
+        "return_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx" << ret_arg << ')' << endl;
       scope_up(f_service_);
       f_service_ <<
         indent() << tservice->get_name() << "_" << tfunction->get_name() << "_presult result;" << endl;
@@ -2566,11 +2577,21 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
       // Serialize the result into a struct
       f_service_ <<
         endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl <<
+        indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
         indent() << "result.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
         indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+        indent() << "}" << endl <<
         indent() << "return cob(true);" << endl;
       scope_down(f_service_);
       f_service_ << endl;
@@ -2580,7 +2601,7 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
     if (!tfunction->is_oneway() && !xceptions.empty()) {
       f_service_ <<
         "void " << tservice->get_name() << "AsyncProcessor::" <<
-        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, ::apache::thrift::TDelayedException* _throw)" << endl;
+        "throw_" << tfunction->get_name() << "(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, ::apache::thrift::TDelayedException* _throw)" << endl;
       scope_up(f_service_);
       f_service_ <<
         indent() << tservice->get_name() << "_" << tfunction->get_name() << "_result result;" << endl << endl <<
@@ -2600,17 +2621,27 @@ void t_cpp_generator::generate_process_function(t_service* tservice,
           indent() << "result.__isset." << (*x_iter)->get_name() << " = true;" << endl;
         scope_down(f_service_);
       }
+      // TODO(dreiss): Handle the case where an undeclared exception is thrown?
 
       // Serialize the result into a struct
       f_service_ <<
         endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl <<
+        indent() << "::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, \"" << tfunction->get_name() << "\");" << endl << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl <<
+        indent() << "}" << endl << endl <<
         indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl <<
         indent() << "result.write(oprot);" << endl <<
         indent() << "oprot->writeMessageEnd();" << endl <<
+        indent() << "uint32_t bytes = oprot->getTransport()->writeEnd();" << endl <<
         indent() << "oprot->getTransport()->flush();" << endl <<
-        indent() << "oprot->getTransport()->writeEnd();" << endl <<
+        indent() << "if (eventHandler_.get() != NULL) {" << endl <<
+        indent() << "  eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\", bytes);" << endl <<
+        indent() << "}" << endl <<
         indent() << "return cob(true);" << endl;
-
       scope_down(f_service_);
       f_service_ << endl;
     } // for each function
index f71a50b..896f5ae 100644 (file)
@@ -85,6 +85,21 @@ class TProcessorEventHandler {
   TProcessorEventHandler() {}
 };
 
+/**
+ * A helper class used by the generated code to free each context.
+ */
+class TProcessorContextFreer {
+ public:
+  TProcessorContextFreer(TProcessorEventHandler* handler, void* context, const char* method) :
+    handler_(handler), context_(context), method_(method) {}
+  ~TProcessorContextFreer() { if (handler_ != NULL) handler_->freeContext(context_, method_); }
+  void unregister() { handler_ = NULL; }
+ private:
+  apache::thrift::TProcessorEventHandler* handler_;
+  void* context_;
+  const char* method_;
+};
+
 /**
  * A processor is a generic object that acts upon two streams of data, one
  * an input and the other an output. The definition of this object is loose,
index abf5816..a0b5428 100644 (file)
@@ -31,6 +31,9 @@ namespace apache { namespace thrift { namespace async {
  * Async version of a TProcessor.  It is not expected to complete by the time
  * the call to process returns.  Instead, it calls a cob to signal completion.
  */
+
+class TEventServer; // forward declaration
+
 class TAsyncProcessor {
  public:
   virtual ~TAsyncProcessor() {}
@@ -44,8 +47,27 @@ class TAsyncProcessor {
     return process(_return, io, io);
   }
 
+  boost::shared_ptr<TProcessorEventHandler> getEventHandler() {
+    return eventHandler_;
+  }
+
+  void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) {
+    eventHandler_ = eventHandler;
+  }
+
+  const TEventServer* getAsyncServer() {
+    return asyncServer_;
+  }
  protected:
   TAsyncProcessor() {}
+
+  boost::shared_ptr<TProcessorEventHandler> eventHandler_;
+  const TEventServer* asyncServer_;
+ private:
+  friend class TEventServer;
+  void setAsyncServer(const TEventServer* server) {
+    asyncServer_ = server;
+  }
 };
 
 }}} // apache::thrift::async