From: David Reiss Date: Wed, 6 Oct 2010 17:09:33 +0000 (+0000) Subject: THRIFT-928. cpp: Processor-level event callbacks X-Git-Tag: 0.6.0~134 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=d7192063e141c8374c2256e9fea9b7571e5fc71e;p=common%2Fthrift.git THRIFT-928. cpp: Processor-level event callbacks - Add a TProcessorEventHandler callback interface. - Add methods to TProcessor to hold an instance of the interface. - Add code to the compiler to make the processor call callbacks at key points. - Add an optional processor event handler to the test server. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005126 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/compiler/cpp/src/generate/t_cpp_generator.cc b/compiler/cpp/src/generate/t_cpp_generator.cc index 59749b7a..ef0dd366 100644 --- a/compiler/cpp/src/generate/t_cpp_generator.cc +++ b/compiler/cpp/src/generate/t_cpp_generator.cc @@ -2059,10 +2059,31 @@ void t_cpp_generator::generate_process_function(t_service* tservice, string resultname = tservice->get_name() + "_" + tfunction->get_name() + "_result"; f_service_ << + indent() << "void* ctx = NULL;" << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " ctx = eventHandler_->getContext(\"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << + indent() << "// A glorified finally block since ctx is a void*" << endl << + indent() << "class ContextFreer {" << endl << + indent() << " public:" << endl << + indent() << " ContextFreer(::apache::thrift::TProcessorEventHandler* handler, void* context) :" << endl << + indent() << " handler_(handler), context_(context) {}" << endl << + indent() << " ~ContextFreer() { if (handler_ != NULL) handler_->freeContext(" << "context_, \"" << tfunction->get_name() << "\"); }" << endl << + indent() << " private:" << endl << + indent() << " ::apache::thrift::TProcessorEventHandler* handler_;" << endl << + indent() << " void* context_;" << endl << + indent() << "};" << endl << + indent() << "ContextFreer freer(eventHandler_.get(), ctx);" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << indent() << argsname << " args;" << endl << indent() << "args.read(iprot);" << endl << indent() << "iprot->readMessageEnd();" << endl << - indent() << "iprot->getTransport()->readEnd();" << endl << + indent() << "iprot->getTransport()->readEnd();" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postRead(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl; t_struct* xs = tfunction->get_xceptions(); @@ -2135,38 +2156,52 @@ void t_cpp_generator::generate_process_function(t_service* tservice, f_service_ << " catch (const std::exception& e) {" << endl; + indent_up(); + f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->handlerError(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl; + if (!tfunction->is_oneway()) { - indent_up(); f_service_ << + endl << indent() << "::apache::thrift::TApplicationException x(e.what());" << endl << indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_EXCEPTION, seqid);" << endl << indent() << "x.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl << - indent() << "return;" << endl; - indent_down(); + indent() << "oprot->getTransport()->writeEnd();" << endl; } - f_service_ << indent() << "}" << endl; + f_service_ << indent() << "return;" << endl; + indent_down(); + f_service_ << indent() << "}" << endl << endl; // Shortcut out here for oneway functions if (tfunction->is_oneway()) { f_service_ << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->asyncComplete(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << indent() << "return;" << endl; - indent_down(); - f_service_ << "}" << endl << - endl; + // Close function + scope_down(f_service_); + f_service_ << endl; return; } // Serialize the result into a struct f_service_ << - endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->preWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl << endl << indent() << "oprot->writeMessageBegin(\"" << tfunction->get_name() << "\", ::apache::thrift::protocol::T_REPLY, seqid);" << endl << indent() << "result.write(oprot);" << endl << indent() << "oprot->writeMessageEnd();" << endl << indent() << "oprot->getTransport()->flush();" << endl << - indent() << "oprot->getTransport()->writeEnd();" << endl; + indent() << "oprot->getTransport()->writeEnd();" << endl << endl << + indent() << "if (eventHandler_.get() != NULL) {" << endl << + indent() << " eventHandler_->postWrite(ctx, \"" << tfunction->get_name() << "\");" << endl << + indent() << "}" << endl; // Close function scope_down(f_service_); diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index f2d5279a..22c10f18 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -26,6 +26,65 @@ namespace apache { namespace thrift { +/** + * Virtual interface class that can handle events from the processor. To + * use this you should subclass it and implement the methods that you care + * about. Your subclass can also store local data that you may care about, + * such as additional "arguments" to these methods (stored in the object + * instance's state). + */ +class TProcessorEventHandler { + public: + + virtual ~TProcessorEventHandler() {} + + /** + * Called before calling other callback methods. + * Expected to return some sort of context object. + * The return value is passed to all other callbacks + * for that function invocation. + */ + virtual void* getContext(const char* fn_name) { return NULL; } + + /** + * Expected to free resources associated with a context. + */ + virtual void freeContext(void* ctx, const char* fn_name) { } + + /** + * Called before reading arguments. + */ + virtual void preRead(void* ctx, const char* fn_name) {} + + /** + * Called between reading arguments and calling the handler. + */ + virtual void postRead(void* ctx, const char* fn_name) {} + + /** + * Called between calling the handler and writing the response. + */ + virtual void preWrite(void* ctx, const char* fn_name) {} + + /** + * Called after writing the response. + */ + virtual void postWrite(void* ctx, const char* fn_name) {} + + /** + * Called when an async function call completes successfully. + */ + virtual void asyncComplete(void* ctx, const char* fn_name) {} + + /** + * Called if the handler throws an undeclared exception. + */ + virtual void handlerError(void* ctx, const char* fn_name) {} + + protected: + TProcessorEventHandler() {} +}; + /** * A processor is a generic object that acts upon two streams of data, one * an input and the other an output. The definition of this object is loose, @@ -44,8 +103,18 @@ class TProcessor { return process(io, io); } + boost::shared_ptr getEventHandler() { + return eventHandler_; + } + + void setEventHandler(boost::shared_ptr eventHandler) { + eventHandler_ = eventHandler; + } + protected: TProcessor() {} + + boost::shared_ptr eventHandler_; }; }} // apache::thrift diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index d6063ac3..18bdc548 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -287,6 +287,39 @@ class TestHandler : public ThriftTestIf { } }; + +class TestProcessorEventHandler : public TProcessorEventHandler { + virtual void* getContext(const char* fn_name) { + return new std::string(fn_name); + } + virtual void freeContext(void* ctx, const char* fn_name) { + delete static_cast(ctx); + } + virtual void preRead(void* ctx, const char* fn_name) { + communicate("preRead", ctx, fn_name); + } + virtual void postRead(void* ctx, const char* fn_name) { + communicate("postRead", ctx, fn_name); + } + virtual void preWrite(void* ctx, const char* fn_name) { + communicate("preWrite", ctx, fn_name); + } + virtual void postWrite(void* ctx, const char* fn_name) { + communicate("postWrite", ctx, fn_name); + } + virtual void asyncComplete(void* ctx, const char* fn_name) { + communicate("asyncComplete", ctx, fn_name); + } + virtual void handlerError(void* ctx, const char* fn_name) { + communicate("handlerError", ctx, fn_name); + } + + void communicate(const char* event, void* ctx, const char* fn_name) { + std::cout << event << ": " << *static_cast(ctx) << " = " << fn_name << std::endl; + } +}; + + int main(int argc, char **argv) { int port = 9090; @@ -297,7 +330,7 @@ int main(int argc, char **argv) { ostringstream usage; usage << - argv[0] << " [--port=] [--server-type=] [--protocol-type=] [--workers=]" << endl << + argv[0] << " [--port=] [--server-type=] [--protocol-type=] [--workers=] [--processor-events]" << endl << "\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl << @@ -365,6 +398,12 @@ int main(int argc, char **argv) { shared_ptr testProcessor(new ThriftTestProcessor(testHandler)); + + if (!args["processor-events"].empty()) { + testProcessor->setEventHandler(shared_ptr( + new TestProcessorEventHandler())); + } + // Transport shared_ptr serverSocket(new TServerSocket(port));