-- Change concept of protocol and transport factory
authorAditya Agarwal <aditya@apache.org>
Wed, 24 Jan 2007 22:53:54 +0000 (22:53 +0000)
committerAditya Agarwal <aditya@apache.org>
Wed, 24 Jan 2007 22:53:54 +0000 (22:53 +0000)
Summary:
- Transport factories now wrap around one transport
- Protocol factories now wrap around one transport (as opposed to a pair of input/output
   transports)
- TServer now takes input/output transport and protocol factories

The motivation for this change is that you could concievably want to use a different protocol or
transport for input and output. An example is that incoming data is encoded using binary protocol
but outgoing data is encrypted XML (with encryption being done on the transport level).

This change should be mostly backwards compatible because the TServer classes have constructors
that take a transport factory and use that for both the input and transport factories. The only
change might be for anyone who is using the C++ client code directly i.e. instantiating
TBinaryProtocol() directly because the constructor now only accepts one transport.

Reviewed By: Slee

Test Plan: Everything compiles (for both thrift and search).

Notes:
I am going to make the same changes in all the supported languages after this...

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664940 13f79535-47bb-0310-9956-ffa450edef68

13 files changed:
lib/cpp/src/protocol/TBinaryProtocol.cpp
lib/cpp/src/protocol/TBinaryProtocol.h
lib/cpp/src/protocol/TProtocol.h
lib/cpp/src/server/TNonblockingServer.cpp
lib/cpp/src/server/TNonblockingServer.h
lib/cpp/src/server/TServer.h
lib/cpp/src/server/TSimpleServer.cpp
lib/cpp/src/server/TSimpleServer.h
lib/cpp/src/server/TThreadPoolServer.cpp
lib/cpp/src/server/TThreadPoolServer.h
lib/cpp/src/transport/TBufferedRouterTransport.h
lib/cpp/src/transport/TTransport.h
lib/cpp/src/transport/TTransportUtils.h

index 481012c..05de125 100644 (file)
@@ -79,30 +79,30 @@ uint32_t TBinaryProtocol::writeSetEnd() {
 
 uint32_t TBinaryProtocol::writeBool(const bool value) {
   uint8_t tmp =  value ? 1 : 0;
-  outputTransport_->write(&tmp, 1);
+  trans_->write(&tmp, 1);
   return 1;
 }
 
 uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
-  outputTransport_->write((uint8_t*)&byte, 1);
+  trans_->write((uint8_t*)&byte, 1);
   return 1;
 }
 
 uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
   int16_t net = (int16_t)htons(i16);
-  outputTransport_->write((uint8_t*)&net, 2);
+  trans_->write((uint8_t*)&net, 2);
   return 2;
 }
 
 uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
   int32_t net = (int32_t)htonl(i32);
-  outputTransport_->write((uint8_t*)&net, 4);
+  trans_->write((uint8_t*)&net, 4);
   return 4;
 }
 
 uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
   int64_t net = (int64_t)htonll(i64);
-  outputTransport_->write((uint8_t*)&net, 8);
+  trans_->write((uint8_t*)&net, 8);
   return 8;
 }
   
@@ -117,14 +117,14 @@ uint32_t TBinaryProtocol::writeDouble(const double dub) {
   b[5] = d[2];
   b[6] = d[1];
   b[7] = d[0];
-  outputTransport_->write((uint8_t*)b, 8);
+  trans_->write((uint8_t*)b, 8);
   return 8;
 }
 
   
 uint32_t TBinaryProtocol::writeString(const string& str) {
   uint32_t result = writeI32(str.size());
-  outputTransport_->write((uint8_t*)str.data(), str.size());
+  trans_->write((uint8_t*)str.data(), str.size());
   return result + str.size();
 }
 
@@ -233,21 +233,21 @@ uint32_t TBinaryProtocol::readSetEnd() {
 
 uint32_t TBinaryProtocol::readBool(bool& value) {
   uint8_t b[1];
-  inputTransport_->readAll(b, 1);
+  trans_->readAll(b, 1);
   value = *(int8_t*)b != 0;
   return 1;
 }
 
 uint32_t TBinaryProtocol::readByte(int8_t& byte) {
   uint8_t b[1];
-  inputTransport_->readAll(b, 1);
+  trans_->readAll(b, 1);
   byte = *(int8_t*)b;
   return 1;
 }
 
 uint32_t TBinaryProtocol::readI16(int16_t& i16) {
   uint8_t b[2];
-  inputTransport_->readAll(b, 2);
+  trans_->readAll(b, 2);
   i16 = *(int16_t*)b;
   i16 = (int16_t)ntohs(i16);
   return 2;
@@ -255,7 +255,7 @@ uint32_t TBinaryProtocol::readI16(int16_t& i16) {
 
 uint32_t TBinaryProtocol::readI32(int32_t& i32) {
   uint8_t b[4];
-  inputTransport_->readAll(b, 4);
+  trans_->readAll(b, 4);
   i32 = *(int32_t*)b;
   i32 = (int32_t)ntohl(i32);
   return 4;
@@ -263,7 +263,7 @@ uint32_t TBinaryProtocol::readI32(int32_t& i32) {
 
 uint32_t TBinaryProtocol::readI64(int64_t& i64) {
   uint8_t b[8];
-  inputTransport_->readAll(b, 8);
+  trans_->readAll(b, 8);
   i64 = *(int64_t*)b;
   i64 = (int64_t)ntohll(i64);
   return 8;
@@ -272,7 +272,7 @@ uint32_t TBinaryProtocol::readI64(int64_t& i64) {
 uint32_t TBinaryProtocol::readDouble(double& dub) {
   uint8_t b[8];
   uint8_t d[8];
-  inputTransport_->readAll(b, 8);
+  trans_->readAll(b, 8);
   d[0] = b[7];
   d[1] = b[6];
   d[2] = b[5];
@@ -294,7 +294,7 @@ uint32_t TBinaryProtocol::readString(string& str) {
 
   // Use the heap here to prevent stack overflow for v. large strings
   uint8_t *b = new uint8_t[size];
-  inputTransport_->readAll(b, size);
+  trans_->readAll(b, size);
   str = string((char*)b, size);
   delete [] b;
 
index de9a836..c0a1837 100644 (file)
@@ -17,8 +17,8 @@ using namespace boost;
  */
     class TBinaryProtocol : public TProtocol {
  public:
-  TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
-    TProtocol(in, out) {}
+  TBinaryProtocol(shared_ptr<TTransport> trans) :
+    TProtocol(trans) {}
 
   ~TBinaryProtocol() {}
 
@@ -137,9 +137,8 @@ class TBinaryProtocolFactory : public TProtocolFactory {
 
   virtual ~TBinaryProtocolFactory() {}
 
-  std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
-    boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
-    return std::make_pair(prot, prot);
+  boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TProtocol>(new TBinaryProtocol(trans));
   }
 };
 
index 8077b27..39ee39e 100644 (file)
@@ -282,22 +282,24 @@ class TProtocol {
     }
   }
 
-  shared_ptr<TTransport> getInputTransport() {
-    return inputTransport_;
+  inline shared_ptr<TTransport> getTransport() {
+    return trans_;
   }
 
-  shared_ptr<TTransport> getOutputTransport() {
-    return outputTransport_;
+  // TODO: remove these two calls, they are for backwards
+  // compatibility
+  inline shared_ptr<TTransport> getInputTransport() {
+    return trans_;
+  }
+  inline shared_ptr<TTransport> getOutputTransport() {
+    return trans_;
   }
 
  protected:
-  TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
-    inputTransport_(in),
-    outputTransport_(out) {}
+  TProtocol(shared_ptr<TTransport> trans):
+    trans_(trans) {}
     
-  shared_ptr<TTransport> inputTransport_;
-
-  shared_ptr<TTransport> outputTransport_;
+  shared_ptr<TTransport> trans_;
 
  private:
   TProtocol() {}
@@ -312,7 +314,7 @@ class TProtocolFactory {
 
   virtual ~TProtocolFactory() {}
 
-  virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
+  virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
 };
 
 }}} // facebook::thrift::protocol
index 5755514..25cd7b5 100644 (file)
@@ -28,17 +28,13 @@ namespace facebook { namespace thrift { namespace server {
   // Set flags, which also registers the event
   setFlags(eventFlags);
 
-  // TODO: this needs to be replaced by the new version of TTransportFactory
-  factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
-  //  factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+  // get input/transports
+  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
 
   // Create protocol
-  std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
-  iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
-                                                outputTransport_);
-  inputProtocol_ = iop.first;
-  outputProtocol_ = iop.second;
-
+  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
 }
 
 void TConnection::workSocket() {
@@ -353,7 +349,7 @@ void TConnection::close() {
 
   // close any factory produced transports
   factoryInputTransport_->close();
-  //  factoryOutputTransport_->close();
+  factoryOutputTransport_->close();
 
   // Give this object back to the server that owns it
   server_->returnConnection(this);
@@ -366,7 +362,7 @@ void TConnection::close() {
 TConnection* TNonblockingServer::createConnection(int socket, short flags) {
   // Check the stack
   if (connectionStack_.empty()) {
-    return new TConnection(socket, flags, this, this->getTransportFactory());
+    return new TConnection(socket, flags, this);
   } else {
     TConnection* result = connectionStack_.top();
     connectionStack_.pop();
index 4ea3fa3..08ecec6 100644 (file)
@@ -60,19 +60,31 @@ class TNonblockingServer : public TServer {
   TNonblockingServer(shared_ptr<TProcessor> processor, 
                      shared_ptr<TProtocolFactory> protocolFactory,
                      int port) :
-    TServer(processor, protocolFactory),
+    TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {}
+    frameResponses_(true) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(protocolFactory);
+    setOutputProtocolFactory(protocolFactory);
+  }
 
-  TNonblockingServer(shared_ptr<TProcessor> processor, 
-                     shared_ptr<TProtocolFactory>  protocolFactory,
-                     shared_ptr<TTransportFactory> transportFactory,
+  TNonblockingServer(shared_ptr<TProcessor> processor,
+                     shared_ptr<TTransportFactory> inputTransportFactory,
+                     shared_ptr<TTransportFactory> outputTransportFactory,
+                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                     shared_ptr<TProtocolFactory> outputProtocolFactory,
                      int port) :
-    TServer(processor, protocolFactory, transportFactory),
+    TServer(processor),
     serverSocket_(0),
     port_(port),
-    frameResponses_(true) {}
+    frameResponses_(true) {
+    setInputTransportFactory(inputTransportFactory);
+    setOutputTransportFactory(outputTransportFactory);
+    setInputProtocolFactory(inputProtocolFactory);
+    setOutputProtocolFactory(outputProtocolFactory);
+  }
         
   ~TNonblockingServer() {}
 
@@ -175,13 +187,13 @@ class TConnection {
 
   // extra transport generated by transport factory (e.g. BufferedRouterTransport)
   shared_ptr<TTransport> factoryInputTransport_;
-  //  shared_ptr<TTransport> factoryOutputTransport_;
-
-  // Protocol encoder
-  shared_ptr<TProtocol> outputProtocol_;
+  shared_ptr<TTransport> factoryOutputTransport_;
 
   // Protocol decoder
   shared_ptr<TProtocol> inputProtocol_;
+
+  // Protocol encoder
+  shared_ptr<TProtocol> outputProtocol_;
   
   // Go into read mode
   void setRead() {
@@ -205,8 +217,7 @@ class TConnection {
  public:
 
   // Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s, 
-              shared_ptr<TTransportFactory> transportFactory) {
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
     readBuffer_ = (uint8_t*)malloc(1024);
     if (readBuffer_ == NULL) {
       throw new facebook::thrift::TException("Out of memory.");
index b9f4fca..ad9c291 100644 (file)
@@ -38,73 +38,92 @@ public:
     return serverTransport_;
   }
 
-  shared_ptr<TTransportFactory> getTransportFactory() {
-    return transportFactory_;
+  shared_ptr<TTransportFactory> getInputTransportFactory() {
+    return inputTransportFactory_;
+  }
+
+  shared_ptr<TTransportFactory> getOutputTransportFactory() {
+    return outputTransportFactory_;
   }
   
-  shared_ptr<TProtocolFactory> getProtocolFactory() {
-    return protocolFactory_;
+  shared_ptr<TProtocolFactory> getInputProtocolFactory() {
+    return inputProtocolFactory_;
   }
 
+  shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
+    return outputProtocolFactory_;
+  }
 
 protected:
+  TServer(shared_ptr<TProcessor> processor):
+    processor_(processor) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+    setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+  }
+
+  TServer(shared_ptr<TProcessor> processor,
+          shared_ptr<TServerTransport> serverTransport):
+    processor_(processor),
+    serverTransport_(serverTransport) {
+    setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+    setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+    setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+  }
+
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
           shared_ptr<TTransportFactory> transportFactory,
-          shared_ptr<TProtocolFactory> protocolFactory) :
+          shared_ptr<TProtocolFactory> protocolFactory):
     processor_(processor),
     serverTransport_(serverTransport),
-    transportFactory_(transportFactory),
-    protocolFactory_(protocolFactory) {}
+    inputTransportFactory_(transportFactory),
+    outputTransportFactory_(transportFactory),
+    inputProtocolFactory_(protocolFactory),
+    outputProtocolFactory_(protocolFactory) {}
 
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
-          shared_ptr<TTransportFactory> transportFactory) :
+          shared_ptr<TTransportFactory> inputTransportFactory,
+          shared_ptr<TTransportFactory> outputTransportFactory,
+          shared_ptr<TProtocolFactory> inputProtocolFactory,
+          shared_ptr<TProtocolFactory> outputProtocolFactory):
     processor_(processor),
     serverTransport_(serverTransport),
-    transportFactory_(transportFactory) {
-  protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
+    inputTransportFactory_(inputTransportFactory),
+    outputTransportFactory_(outputTransportFactory),
+    inputProtocolFactory_(inputProtocolFactory),
+    outputProtocolFactory_(outputProtocolFactory) {}
 
-  TServer(shared_ptr<TProcessor> processor,
-          shared_ptr<TServerTransport> serverTransport) :
-    processor_(processor),
-    serverTransport_(serverTransport) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  // Class variables
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TServerTransport> serverTransport_;
+
+  shared_ptr<TTransportFactory> inputTransportFactory_;
+  shared_ptr<TTransportFactory> outputTransportFactory_;
+
+  shared_ptr<TProtocolFactory> inputProtocolFactory_;
+  shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+  void setInputTransportFactory(shared_ptr<TTransportFactory> inputTransportFactory) {
+    inputTransportFactory_ = inputTransportFactory;
   }
 
-  TServer(shared_ptr<TProcessor> processor) :
-    processor_(processor) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  void setOutputTransportFactory(shared_ptr<TTransportFactory> outputTransportFactory) {
+    outputTransportFactory_ = outputTransportFactory;
   }
 
-  TServer(shared_ptr<TProcessor> processor, 
-          shared_ptr<TTransportFactory> transportFactory) :
-    processor_(processor),
-    transportFactory_(transportFactory) {
-    protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+  void setInputProtocolFactory(shared_ptr<TProtocolFactory> inputProtocolFactory) {
+    inputProtocolFactory_ = inputProtocolFactory;
   }
 
-  TServer(shared_ptr<TProcessor> processor, 
-          shared_ptr<TProtocolFactory> protocolFactory) :
-    processor_(processor) {
-    transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
-    protocolFactory_ = protocolFactory;
+  void setOutputProtocolFactory(shared_ptr<TProtocolFactory> outputProtocolFactory) {
+    outputProtocolFactory_ = outputProtocolFactory;
   }
 
-  TServer(shared_ptr<TProcessor>        processor,           
-          shared_ptr<TProtocolFactory>  protocolFactory,
-          shared_ptr<TTransportFactory> transportFactory):
-    processor_(processor),
-    transportFactory_(transportFactory),
-    protocolFactory_(protocolFactory) {}
-  shared_ptr<TProcessor> processor_;
-  shared_ptr<TServerTransport> serverTransport_;
-  shared_ptr<TTransportFactory> transportFactory_;
-  shared_ptr<TProtocolFactory> protocolFactory_;
 };
   
 }}} // facebook::thrift::server
index 8d62bf9..1e7e7fb 100644 (file)
@@ -14,8 +14,10 @@ namespace facebook { namespace thrift { namespace server {
 void TSimpleServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
 
   try {
     // Start the server listening
@@ -29,12 +31,14 @@ void TSimpleServer::serve() {
   try {
     while (true) {
       client = serverTransport_->accept();
-      iot = transportFactory_->getIOTransports(client);
-      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
       try {
-        while (processor_->process(iop.first, iop.second)) {
+        while (processor_->process(inputProtocol, outputProtocol)) {
           // Peek ahead, is the remote side closed?
-          if (!iot.first->peek()) {
+          if (!inputTransport->peek()) {
             break;
           }
         }
@@ -43,8 +47,8 @@ void TSimpleServer::serve() {
       } catch (TException& tx) {
         cerr << "TSimpleServer exception: " << tx.what() << endl;
       }
-      iot.first->close();
-      iot.second->close();
+      inputTransport->close();
+      outputTransport->close();
       client->close();
     }
   } catch (TTransportException& ttx) {
index 6470519..cf3ed10 100644 (file)
@@ -21,6 +21,16 @@ class TSimpleServer : public TServer {
                 shared_ptr<TTransportFactory> transportFactory,
                 shared_ptr<TProtocolFactory> protocolFactory) :
     TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+
+  TSimpleServer(shared_ptr<TProcessor> processor,
+                shared_ptr<TServerTransport> serverTransport,
+                shared_ptr<TTransportFactory> inputTransportFactory,
+                shared_ptr<TTransportFactory> outputTransportFactory,
+                shared_ptr<TProtocolFactory> inputProtocolFactory,
+                shared_ptr<TProtocolFactory> outputProtocolFactory):
+    TServer(processor, serverTransport, 
+            inputTransportFactory, outputTransportFactory,
+            inputProtocolFactory, outputProtocolFactory) {}
     
   ~TSimpleServer() {}
 
index 357152b..2f85c8b 100644 (file)
@@ -29,7 +29,7 @@ public:
   void run() {     
     try {
       while (processor_->process(input_, output_)) {
-        if (!input_->getInputTransport()->peek()) {
+        if (!input_->getTransport()->peek()) {
           break;
         }
       }
@@ -40,8 +40,8 @@ public:
     } catch (...) {
       cerr << "TThreadPoolServer uncaught exception." << endl;
     }
-    input_->getInputTransport()->close();
-    output_->getOutputTransport()->close();
+    input_->getTransport()->close();
+    output_->getTransport()->close();
   }
 
  private:
@@ -55,19 +55,31 @@ TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
                                      shared_ptr<TServerTransport> serverTransport,
                                      shared_ptr<TTransportFactory> transportFactory,
                                      shared_ptr<TProtocolFactory> protocolFactory,
-
                                      shared_ptr<ThreadManager> threadManager) :
   TServer(processor, serverTransport, transportFactory, protocolFactory), 
-  threadManager_(threadManager) {
-}
+  threadManager_(threadManager) {}
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+                                     shared_ptr<TServerTransport> serverTransport,
+                                     shared_ptr<TTransportFactory> inputTransportFactory,
+                                     shared_ptr<TTransportFactory> outputTransportFactory,
+                                     shared_ptr<TProtocolFactory> inputProtocolFactory,
+                                     shared_ptr<TProtocolFactory> outputProtocolFactory, 
+                                     shared_ptr<ThreadManager> threadManager) :
+  TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory),
+  threadManager_(threadManager) {}
+
 
 TThreadPoolServer::~TThreadPoolServer() {}
 
 void TThreadPoolServer::serve() {
 
   shared_ptr<TTransport> client;
-  pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
-  pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+  shared_ptr<TProtocol> inputProtocol;
+  shared_ptr<TProtocol> outputProtocol;
 
   try {
     // Start the server listening
@@ -82,11 +94,13 @@ void TThreadPoolServer::serve() {
       // Fetch client from server
       client = serverTransport_->accept();
       // Make IO transports
-      iot = transportFactory_->getIOTransports(client);
-      iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+      inputTransport = inputTransportFactory_->getTransport(client);
+      outputTransport = outputTransportFactory_->getTransport(client);
+      inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+      outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
 
       // Add to threadmanager pool
-      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
+      threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
     } catch (TTransportException& ttx) {
       break;
     }
index 5c5899e..aabd686 100644 (file)
@@ -24,6 +24,14 @@ public:
                     shared_ptr<TProtocolFactory> protocolFactory,
                    shared_ptr<ThreadManager> threadManager);
 
+  TThreadPoolServer(shared_ptr<TProcessor> processor,
+                   shared_ptr<TServerTransport> serverTransport,
+                    shared_ptr<TTransportFactory> inputTransportFactory,
+                    shared_ptr<TTransportFactory> outputTransportFactory,
+                    shared_ptr<TProtocolFactory> inputProtocolFactory,
+                    shared_ptr<TProtocolFactory> outputProtocolFactory, 
+                   shared_ptr<ThreadManager> threadManager);
+
   virtual ~TThreadPoolServer();
 
   virtual void serve();
index b2e4d4f..add3107 100644 (file)
@@ -117,9 +117,8 @@ class TBufferedRouterTransportFactory : public TTransportFactory {
   /**
    * Wraps the transport into a buffered one.
    */
-  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
-    boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
-    return std::make_pair(buffered, buffered);
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
   }
 
  private:
index 5e4ae6b..02dd89c 100644 (file)
@@ -155,8 +155,8 @@ class TTransportFactory {
   /**
    * Default implementation does nothing, just returns the transport given.
    */
-  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
-    return std::make_pair(trans, trans);
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return trans;
   }
 
 };
index 427cc0e..79a137c 100644 (file)
@@ -122,9 +122,8 @@ class TBufferedTransportFactory : public TTransportFactory {
   /**
    * Wraps the transport into a buffered one.
    */
-  virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
-    boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
-    return std::make_pair(buffered, buffered);
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
   }
 
 };