From 6dd9cd0e3bb0bac0d4a70594956d035b75d4d7c8 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Thu, 1 Sep 2011 18:06:20 +0000 Subject: [PATCH] THRIFT-1314. cpp: add TProcessorFactory Patch: Adam Simpkins git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1164190 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/TProcessor.h | 39 +++++++++++++++++++++++ lib/cpp/src/server/TNonblockingServer.cpp | 12 +++++-- lib/cpp/src/server/TServer.h | 31 ++++++++++++++---- lib/cpp/src/server/TSimpleServer.cpp | 9 ++++-- lib/cpp/src/server/TThreadPoolServer.cpp | 7 +++- lib/cpp/src/server/TThreadedServer.cpp | 5 ++- 6 files changed, 89 insertions(+), 14 deletions(-) diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index a5937761..26c3ee46 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -189,6 +189,45 @@ class ReleaseHandler { boost::shared_ptr handlerFactory_; }; +struct TConnectionInfo { + // The input and output protocols + boost::shared_ptr input; + boost::shared_ptr output; + // The underlying transport used for the connection + // This is the transport that was returned by TServerTransport::accept(), + // and it may be different than the transport pointed to by the input and + // output protocols. + boost::shared_ptr transport; +}; + +class TProcessorFactory { + public: + virtual ~TProcessorFactory() {} + + /** + * Get the TProcessor to use for a particular connection. + * + * This method is always invoked in the same thread that the connection was + * accepted on. This generally means that this call does not need to be + * thread safe, as it will always be invoked from a single thread. + */ + virtual boost::shared_ptr getProcessor( + const TConnectionInfo& connInfo) = 0; +}; + +class TSingletonProcessorFactory : public TProcessorFactory { + public: + TSingletonProcessorFactory(boost::shared_ptr processor) : + processor_(processor) {} + + boost::shared_ptr getProcessor(const TConnectionInfo&) { + return processor_; + } + + private: + boost::shared_ptr processor_; +}; + }} // apache::thrift #endif // #ifndef _THRIFT_TPROCESSOR_H_ diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index b817260f..8d72a150 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -88,6 +88,9 @@ class TNonblockingServer::TConnection { /// Server handle TNonblockingServer* server_; + /// TProcessor + boost::shared_ptr processor_; + /// Object wrapping network socket boost::shared_ptr tSocket_; @@ -420,6 +423,9 @@ void TNonblockingServer::TConnection::init(int socket, short eventFlags, } else { connectionContext_ = NULL; } + + // Get the processor + processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_); } void TNonblockingServer::TConnection::workSocket() { @@ -572,7 +578,7 @@ void TNonblockingServer::TConnection::transition() { // Create task and dispatch to the thread manager boost::shared_ptr task = - boost::shared_ptr(new Task(server_->getProcessor(), + boost::shared_ptr(new Task(processor_, inputProtocol_, outputProtocol_, this)); @@ -595,8 +601,8 @@ void TNonblockingServer::TConnection::transition() { } else { try { // Invoke the processor - server_->getProcessor()->process(inputProtocol_, outputProtocol_, - connectionContext_); + processor_->process(inputProtocol_, outputProtocol_, + connectionContext_); } catch (const TTransportException &ttx) { GlobalOutput.printf("TNonblockingServer transport error in " "process(): %s", ttx.what()); diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 4dddfeae..6bd1398a 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -112,8 +112,8 @@ class TServer : public concurrency::Runnable { serve(); } - boost::shared_ptr getProcessor() { - return processor_; + boost::shared_ptr getProcessorFactory() { + return processorFactory_; } boost::shared_ptr getServerTransport() { @@ -142,7 +142,7 @@ class TServer : public concurrency::Runnable { protected: TServer(boost::shared_ptr processor): - processor_(processor) { + processorFactory_(new TSingletonProcessorFactory(processor)) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); setInputProtocolFactory(boost::shared_ptr(new TBinaryProtocolFactory())); @@ -151,7 +151,7 @@ protected: TServer(boost::shared_ptr processor, boost::shared_ptr serverTransport): - processor_(processor), + processorFactory_(new TSingletonProcessorFactory(processor)), serverTransport_(serverTransport) { setInputTransportFactory(boost::shared_ptr(new TTransportFactory())); setOutputTransportFactory(boost::shared_ptr(new TTransportFactory())); @@ -163,7 +163,7 @@ protected: boost::shared_ptr serverTransport, boost::shared_ptr transportFactory, boost::shared_ptr protocolFactory): - processor_(processor), + processorFactory_(new TSingletonProcessorFactory(processor)), serverTransport_(serverTransport), inputTransportFactory_(transportFactory), outputTransportFactory_(transportFactory), @@ -176,16 +176,33 @@ protected: boost::shared_ptr outputTransportFactory, boost::shared_ptr inputProtocolFactory, boost::shared_ptr outputProtocolFactory): - processor_(processor), + processorFactory_(new TSingletonProcessorFactory(processor)), serverTransport_(serverTransport), inputTransportFactory_(inputTransportFactory), outputTransportFactory_(outputTransportFactory), inputProtocolFactory_(inputProtocolFactory), outputProtocolFactory_(outputProtocolFactory) {} + /** + * Get a TProcessor to handle calls on a particular connection. + * + * This method should only be called once per connection (never once per + * call). This allows the TProcessorFactory to return a different processor + * for each connection if it desires. + */ + boost::shared_ptr getProcessor( + boost::shared_ptr inputProtocol, + boost::shared_ptr outputProtocol, + boost::shared_ptr transport) { + TConnectionInfo connInfo; + connInfo.input = inputProtocol; + connInfo.output = outputProtocol; + connInfo.transport = transport; + return processorFactory_->getProcessor(connInfo); + } // Class variables - boost::shared_ptr processor_; + boost::shared_ptr processorFactory_; boost::shared_ptr serverTransport_; boost::shared_ptr inputTransportFactory_; diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp index 229261f2..b13b9767 100644 --- a/lib/cpp/src/server/TSimpleServer.cpp +++ b/lib/cpp/src/server/TSimpleServer.cpp @@ -87,6 +87,10 @@ void TSimpleServer::serve() { break; } + // Get the processor + shared_ptr processor = getProcessor(inputProtocol, + outputProtocol, client); + void* connectionContext = NULL; if (eventHandler_ != NULL) { connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol); @@ -96,8 +100,9 @@ void TSimpleServer::serve() { if (eventHandler_ != NULL) { eventHandler_->processContext(connectionContext, client); } - if (!processor_->process(inputProtocol, outputProtocol, connectionContext) || - // Peek ahead, is the remote side closed? + if (!processor->process(inputProtocol, outputProtocol, + connectionContext) || + // Peek ahead, is the remote side closed? !inputProtocol->getTransport()->peek()) { break; } diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index 22040764..20183f19 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -170,8 +170,13 @@ void TThreadPoolServer::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + shared_ptr processor = getProcessor(inputProtocol, + outputProtocol, client); + // Add to threadmanager pool - threadManager_->add(shared_ptr(new TThreadPoolServer::Task(*this, processor_, inputProtocol, outputProtocol, client)), timeout_); + shared_ptr task(new TThreadPoolServer::Task( + *this, processor, inputProtocol, outputProtocol, client)); + threadManager_->add(task, timeout_); } catch (TTransportException& ttx) { if (inputTransport != NULL) { inputTransport->close(); } diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp index feeb0e94..43d1df21 100644 --- a/lib/cpp/src/server/TThreadedServer.cpp +++ b/lib/cpp/src/server/TThreadedServer.cpp @@ -180,8 +180,11 @@ void TThreadedServer::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + shared_ptr processor = getProcessor(inputProtocol, + outputProtocol, client); + TThreadedServer::Task* task = new TThreadedServer::Task(*this, - processor_, + processor, inputProtocol, outputProtocol, client); -- 2.17.1