From 1ea90526b0acf89e3e06174fdfef524b671ce0bf Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Fri, 19 Jan 2007 02:02:12 +0000 Subject: [PATCH] -- Nonblocking server changes to allow logging Summary: -- the constructor needs to accept a transport factory -- TConnection close() needs to close factor generated transports Reviewed By: Mark Slee Test Plan: Tested with search redologger git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664930 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 24 +++++++++++++--- lib/cpp/src/server/TNonblockingServer.h | 34 +++++++++++++++-------- lib/cpp/src/server/TServer.h | 30 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 15 deletions(-) diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 9fb5c232..5755514e 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -9,7 +9,7 @@ namespace facebook { namespace thrift { namespace server { -void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { + void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { socket_ = socket; server_ = s; appState_ = APP_INIT; @@ -27,6 +27,18 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { // Set flags, which also registers the event setFlags(eventFlags); + + // TODO: this needs to be replaced by the new version of TTransportFactory + factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first; + // factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first; + + // Create protocol + std::pair,shared_ptr > iop; + iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ , + outputTransport_); + inputProtocol_ = iop.first; + outputProtocol_ = iop.second; + } void TConnection::workSocket() { @@ -152,11 +164,11 @@ void TConnection::transition() { // Invoke the processor server_->getProcessor()->process(inputProtocol_, outputProtocol_); } catch (TTransportException &ttx) { - fprintf(stderr, "Server::process() %s\n", ttx.what()); + fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what()); close(); return; } catch (TException &x) { - fprintf(stderr, "Server::process() %s\n", x.what()); + fprintf(stderr, "TException: Server::process() %s\n", x.what()); close(); return; } catch (...) { @@ -339,6 +351,10 @@ void TConnection::close() { } socket_ = 0; + // close any factory produced transports + factoryInputTransport_->close(); + // factoryOutputTransport_->close(); + // Give this object back to the server that owns it server_->returnConnection(this); } @@ -350,7 +366,7 @@ void TConnection::close() { TConnection* TNonblockingServer::createConnection(int socket, short flags) { // Check the stack if (connectionStack_.empty()) { - return new TConnection(socket, flags, this); + return new TConnection(socket, flags, this, this->getTransportFactory()); } else { TConnection* result = connectionStack_.top(); connectionStack_.pop(); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 11b58b1a..c8bfcb5c 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -50,12 +50,23 @@ class TNonblockingServer : public TServer { void handleEvent(int fd, short which); public: - TNonblockingServer(shared_ptr processor, int port) : - TServer(processor), + TNonblockingServer(shared_ptr processor, + shared_ptr protocolFactory, + int port) : + TServer(processor, protocolFactory), serverSocket_(0), port_(port), frameResponses_(true) {} - + + TNonblockingServer(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr transportFactory, + int port) : + TServer(processor, protocolFactory, transportFactory), + serverSocket_(0), + port_(port), + frameResponses_(true) {} + ~TNonblockingServer() {} void setFrameResponses(bool frameResponses) { @@ -155,6 +166,10 @@ class TConnection { // Transport that processor writes to shared_ptr outputTransport_; + // extra transport generated by transport factory (e.g. BufferedRouterTransport) + shared_ptr factoryInputTransport_; + // shared_ptr factoryOutputTransport_; + // Protocol encoder shared_ptr outputProtocol_; @@ -183,7 +198,8 @@ class TConnection { public: // Constructor - TConnection(int socket, short eventFlags, TNonblockingServer *s) { + TConnection(int socket, short eventFlags, TNonblockingServer *s, + shared_ptr transportFactory) { readBuffer_ = (uint8_t*)malloc(1024); if (readBuffer_ == NULL) { throw new facebook::thrift::TException("Out of memory."); @@ -191,15 +207,11 @@ class TConnection { readBufferSize_ = 1024; // Allocate input and output tranpsorts + // these only need to be allocated once per TConnection (they don't need to be + // reallocated on init() call) inputTransport_ = shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); outputTransport_ = shared_ptr(new TMemoryBuffer()); - - // Create protocol - std::pair,shared_ptr > iop; - iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_); - inputProtocol_ = iop.first; - outputProtocol_ = iop.second; - + init(socket, eventFlags, s); } diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 4b20432d..b9f4fca8 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -33,11 +33,20 @@ public: shared_ptr getProcessor() { return processor_; } + + shared_ptr getServerTransport() { + return serverTransport_; + } + + shared_ptr getTransportFactory() { + return transportFactory_; + } shared_ptr getProtocolFactory() { return protocolFactory_; } + protected: TServer(shared_ptr processor, shared_ptr serverTransport, @@ -70,6 +79,27 @@ protected: transportFactory_ = boost::shared_ptr(new TTransportFactory()); protocolFactory_ = boost::shared_ptr(new TBinaryProtocolFactory()); } + + TServer(shared_ptr processor, + shared_ptr transportFactory) : + processor_(processor), + transportFactory_(transportFactory) { + protocolFactory_ = boost::shared_ptr(new TBinaryProtocolFactory()); + } + + TServer(shared_ptr processor, + shared_ptr protocolFactory) : + processor_(processor) { + transportFactory_ = boost::shared_ptr(new TTransportFactory()); + protocolFactory_ = protocolFactory; + } + + TServer(shared_ptr processor, + shared_ptr protocolFactory, + shared_ptr transportFactory): + processor_(processor), + transportFactory_(transportFactory), + protocolFactory_(protocolFactory) {} shared_ptr processor_; shared_ptr serverTransport_; -- 2.17.1