-- Change concept of protocol and transport factory

Summary:
- Transport factories now wrap around one transport
- Protocol factories now wrap around one transport (as opposed to a pair of input/output
   transports)
- TServer now takes input/output transport and protocol factories

The motivation for this change is that you could concievably want to use a different protocol or
transport for input and output. An example is that incoming data is encoded using binary protocol
but outgoing data is encrypted XML (with encryption being done on the transport level).

This change should be mostly backwards compatible because the TServer classes have constructors
that take a transport factory and use that for both the input and transport factories. The only
change might be for anyone who is using the C++ client code directly i.e. instantiating
TBinaryProtocol() directly because the constructor now only accepts one transport.

Reviewed By: Slee

Test Plan: Everything compiles (for both thrift and search).

Notes:
I am going to make the same changes in all the supported languages after this...


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664940 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 5755514..25cd7b5 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -28,17 +28,13 @@
   // Set flags, which also registers the event
   setFlags(eventFlags);
 
-  // TODO: this needs to be replaced by the new version of TTransportFactory
-  factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
-  //  factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+  // get input/transports
+  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
 
   // Create protocol
-  std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
-  iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
-                                                outputTransport_);
-  inputProtocol_ = iop.first;
-  outputProtocol_ = iop.second;
-
+  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
 }
 
 void TConnection::workSocket() {
@@ -353,7 +349,7 @@
 
   // close any factory produced transports
   factoryInputTransport_->close();
-  //  factoryOutputTransport_->close();
+  factoryOutputTransport_->close();
 
   // Give this object back to the server that owns it
   server_->returnConnection(this);
@@ -366,7 +362,7 @@
 TConnection* TNonblockingServer::createConnection(int socket, short flags) {
   // Check the stack
   if (connectionStack_.empty()) {
-    return new TConnection(socket, flags, this, this->getTransportFactory());
+    return new TConnection(socket, flags, this);
   } else {
     TConnection* result = connectionStack_.top();
     connectionStack_.pop();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 4ea3fa3..08ecec6 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -60,19 +60,31 @@
   TNonblockingServer(shared_ptr<TProcessor> processor, 
                      shared_ptr<TProtocolFactory> protocolFactory,
                      int port) :
-    TServer(processor, protocolFactory),
+    TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {}
+    frameResponses_(true) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+  }
 
-  TNonblockingServer(shared_ptr<TProcessor> processor, 
-                     shared_ptr<TProtocolFactory>  protocolFactory,
-                     shared_ptr<TTransportFactory> transportFactory,
+  TNonblockingServer(shared_ptr<TProcessor> processor,
+                     shared_ptr<TTransportFactory> inputTransportFactory,
+                     shared_ptr<TTransportFactory> outputTransportFactory,
+                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                     shared_ptr<TProtocolFactory> outputProtocolFactory,
                      int port) :
-    TServer(processor, protocolFactory, transportFactory),
+    TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {}
+    frameResponses_(true) {
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+  }
         
   ~TNonblockingServer() {}
 
@@ -175,13 +187,13 @@
 
   // extra transport generated by transport factory (e.g. BufferedRouterTransport)
   shared_ptr<TTransport> factoryInputTransport_;
-  //  shared_ptr<TTransport> factoryOutputTransport_;
-
-  // Protocol encoder
-  shared_ptr<TProtocol> outputProtocol_;
+  shared_ptr<TTransport> factoryOutputTransport_;
 
   // Protocol decoder
   shared_ptr<TProtocol> inputProtocol_;
+
+  // Protocol encoder
+  shared_ptr<TProtocol> outputProtocol_;
   
   // Go into read mode
   void setRead() {
@@ -205,8 +217,7 @@
  public:
 
   // Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s, 
-              shared_ptr<TTransportFactory> transportFactory) {
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
     readBuffer_ = (uint8_t*)malloc(1024);
     if (readBuffer_ == NULL) {
       throw new facebook::thrift::TException("Out of memory.");
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index b9f4fca..ad9c291 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -38,73 +38,92 @@
     return serverTransport_;
   }
 
-  shared_ptr<TTransportFactory> getTransportFactory() {
-    return transportFactory_;
+  shared_ptr<TTransportFactory> getInputTransportFactory() {
+    return inputTransportFactory_;
+  }
+
+  shared_ptr<TTransportFactory> getOutputTransportFactory() {
+    return outputTransportFactory_;
   }
   
-  shared_ptr<TProtocolFactory> getProtocolFactory() {
-    return protocolFactory_;
+  shared_ptr<TProtocolFactory> getInputProtocolFactory() {
+    return inputProtocolFactory_;
   }
 
+  shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
+    return outputProtocolFactory_;
+  }
 
 protected:
+  TServer(shared_ptr<TProcessor> processor):
+    processor_(processor) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+    setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+  }
+
+  TServer(shared_ptr<TProcessor> processor,
+          shared_ptr<TServerTransport> serverTransport):
+    processor_(processor),
+    serverTransport_(serverTransport) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+    setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+  }
+
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
           shared_ptr<TTransportFactory> transportFactory,
-          shared_ptr<TProtocolFactory> protocolFactory) :
+          shared_ptr<TProtocolFactory> protocolFactory):
     processor_(processor),
     serverTransport_(serverTransport),
-    transportFactory_(transportFactory),
-    protocolFactory_(protocolFactory) {}
+    inputTransportFactory_(transportFactory),
+    outputTransportFactory_(transportFactory),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory) {}
 
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
-          shared_ptr<TTransportFactory> transportFactory) :
+          shared_ptr<TTransportFactory> inputTransportFactory,
+          shared_ptr<TTransportFactory> outputTransportFactory,
+          shared_ptr<TProtocolFactory> inputProtocolFactory,
+          shared_ptr<TProtocolFactory> outputProtocolFactory):
     processor_(processor),
     serverTransport_(serverTransport),
-    transportFactory_(transportFactory) {
-  protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
+    inputTransportFactory_(inputTransportFactory),
+    outputTransportFactory_(outputTransportFactory),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory) {}
 
-  TServer(shared_ptr<TProcessor> processor,
-          shared_ptr<TServerTransport> serverTransport) :
-    processor_(processor),
-    serverTransport_(serverTransport) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
-  }
-
-  TServer(shared_ptr<TProcessor> processor) :
-    processor_(processor) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
-  }
-
-  TServer(shared_ptr<TProcessor> processor, 
-          shared_ptr<TTransportFactory> transportFactory) :
-    processor_(processor),
-    transportFactory_(transportFactory) {
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
-  }
-
-  TServer(shared_ptr<TProcessor> processor, 
-          shared_ptr<TProtocolFactory> protocolFactory) :
-    processor_(processor) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = protocolFactory;
-  }
-
-  TServer(shared_ptr<TProcessor>        processor,           
-          shared_ptr<TProtocolFactory>  protocolFactory,
-          shared_ptr<TTransportFactory> transportFactory):
-    processor_(processor),
-    transportFactory_(transportFactory),
-    protocolFactory_(protocolFactory) {}
  
+  // Class variables
   shared_ptr<TProcessor> processor_;
   shared_ptr<TServerTransport> serverTransport_;
-  shared_ptr<TTransportFactory> transportFactory_;
-  shared_ptr<TProtocolFactory> protocolFactory_;
+
+  shared_ptr<TTransportFactory> inputTransportFactory_;
+  shared_ptr<TTransportFactory> outputTransportFactory_;
+
+  shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+  void setInputTransportFactory(shared_ptr<TTransportFactory> inputTransportFactory) {
+    inputTransportFactory_ = inputTransportFactory;
+  }
+
+  void setOutputTransportFactory(shared_ptr<TTransportFactory> outputTransportFactory) {
+    outputTransportFactory_ = outputTransportFactory;
+  }
+
+  void setInputProtocolFactory(shared_ptr<TProtocolFactory> inputProtocolFactory) {
+    inputProtocolFactory_ = inputProtocolFactory;
+  }
+
+  void setOutputProtocolFactory(shared_ptr<TProtocolFactory> outputProtocolFactory) {
+    outputProtocolFactory_ = outputProtocolFactory;
+  }
+
 };
   
 }}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 8d62bf9..1e7e7fb 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -14,8 +14,10 @@
 void TSimpleServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
 
   try {
     // Start the server listening
@@ -29,12 +31,14 @@
   try {
     while (true) {
       client = serverTransport_->accept();
-      iot = transportFactory_->getIOTransports(client);
-      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
       try {
-        while (processor_->process(iop.first, iop.second)) {
+        while (processor_->process(inputProtocol, outputProtocol)) {
           // Peek ahead, is the remote side closed?
-          if (!iot.first->peek()) {
+          if (!inputTransport->peek()) {
             break;
           }
         }
@@ -43,8 +47,8 @@
       } catch (TException& tx) {
         cerr << "TSimpleServer exception: " << tx.what() << endl;
       }
-      iot.first->close();
-      iot.second->close();
+      inputTransport->close();
+      outputTransport->close();
       client->close();
     }
   } catch (TTransportException& ttx) {
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index 6470519..cf3ed10 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -21,6 +21,16 @@
                 shared_ptr<TTransportFactory> transportFactory,
                 shared_ptr<TProtocolFactory> protocolFactory) :
     TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+
+  TSimpleServer(shared_ptr<TProcessor> processor,
+                shared_ptr<TServerTransport> serverTransport,
+                shared_ptr<TTransportFactory> inputTransportFactory,
+                shared_ptr<TTransportFactory> outputTransportFactory,
+                shared_ptr<TProtocolFactory> inputProtocolFactory,
+                shared_ptr<TProtocolFactory> outputProtocolFactory):
+    TServer(processor, serverTransport, 
+            inputTransportFactory, outputTransportFactory,
+            inputProtocolFactory, outputProtocolFactory) {}
     
   ~TSimpleServer() {}
 
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 357152b..2f85c8b 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -29,7 +29,7 @@
   void run() {     
     try {
       while (processor_->process(input_, output_)) {
-        if (!input_->getInputTransport()->peek()) {
+        if (!input_->getTransport()->peek()) {
           break;
         }
       }
@@ -40,8 +40,8 @@
     } catch (...) {
       cerr << "TThreadPoolServer uncaught exception." << endl;
     }
-    input_->getInputTransport()->close();
-    output_->getOutputTransport()->close();
+    input_->getTransport()->close();
+    output_->getTransport()->close();
   }
 
  private:
@@ -55,19 +55,31 @@
                                      shared_ptr<TServerTransport> serverTransport,
                                      shared_ptr<TTransportFactory> transportFactory,
                                      shared_ptr<TProtocolFactory> protocolFactory,
-
                                      shared_ptr<ThreadManager> threadManager) :
   TServer(processor, serverTransport, transportFactory, protocolFactory), 
-  threadManager_(threadManager) {
-}
+  threadManager_(threadManager) {}
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> inputTransportFactory,
+                                     shared_ptr<TTransportFactory> outputTransportFactory,
+                                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                                     shared_ptr<TProtocolFactory> outputProtocolFactory, 
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory),
+  threadManager_(threadManager) {}
+
 
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
 
   try {
     // Start the server listening
@@ -82,11 +94,13 @@
       // Fetch client from server
       client = serverTransport_->accept();
       // Make IO transports
-      iot = transportFactory_->getIOTransports(client);
-      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
     } catch (TTransportException& ttx) {
       break;
     }
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index 5c5899e..aabd686 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -24,6 +24,14 @@
                     shared_ptr<TProtocolFactory> protocolFactory,
 		    shared_ptr<ThreadManager> threadManager);
 
+  TThreadPoolServer(shared_ptr<TProcessor> processor,
+		    shared_ptr<TServerTransport> serverTransport,
+                    shared_ptr<TTransportFactory> inputTransportFactory,
+                    shared_ptr<TTransportFactory> outputTransportFactory,
+                    shared_ptr<TProtocolFactory> inputProtocolFactory,
+                    shared_ptr<TProtocolFactory> outputProtocolFactory, 
+		    shared_ptr<ThreadManager> threadManager);
+
   virtual ~TThreadPoolServer();
 
   virtual void serve();