uint32_t TBinaryProtocol::writeBool(const bool value) {
uint8_t tmp = value ? 1 : 0;
- outputTransport_->write(&tmp, 1);
+ trans_->write(&tmp, 1);
return 1;
}
uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
- outputTransport_->write((uint8_t*)&byte, 1);
+ trans_->write((uint8_t*)&byte, 1);
return 1;
}
uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
int16_t net = (int16_t)htons(i16);
- outputTransport_->write((uint8_t*)&net, 2);
+ trans_->write((uint8_t*)&net, 2);
return 2;
}
uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
int32_t net = (int32_t)htonl(i32);
- outputTransport_->write((uint8_t*)&net, 4);
+ trans_->write((uint8_t*)&net, 4);
return 4;
}
uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
int64_t net = (int64_t)htonll(i64);
- outputTransport_->write((uint8_t*)&net, 8);
+ trans_->write((uint8_t*)&net, 8);
return 8;
}
b[5] = d[2];
b[6] = d[1];
b[7] = d[0];
- outputTransport_->write((uint8_t*)b, 8);
+ trans_->write((uint8_t*)b, 8);
return 8;
}
uint32_t TBinaryProtocol::writeString(const string& str) {
uint32_t result = writeI32(str.size());
- outputTransport_->write((uint8_t*)str.data(), str.size());
+ trans_->write((uint8_t*)str.data(), str.size());
return result + str.size();
}
uint32_t TBinaryProtocol::readBool(bool& value) {
uint8_t b[1];
- inputTransport_->readAll(b, 1);
+ trans_->readAll(b, 1);
value = *(int8_t*)b != 0;
return 1;
}
uint32_t TBinaryProtocol::readByte(int8_t& byte) {
uint8_t b[1];
- inputTransport_->readAll(b, 1);
+ trans_->readAll(b, 1);
byte = *(int8_t*)b;
return 1;
}
uint32_t TBinaryProtocol::readI16(int16_t& i16) {
uint8_t b[2];
- inputTransport_->readAll(b, 2);
+ trans_->readAll(b, 2);
i16 = *(int16_t*)b;
i16 = (int16_t)ntohs(i16);
return 2;
uint32_t TBinaryProtocol::readI32(int32_t& i32) {
uint8_t b[4];
- inputTransport_->readAll(b, 4);
+ trans_->readAll(b, 4);
i32 = *(int32_t*)b;
i32 = (int32_t)ntohl(i32);
return 4;
uint32_t TBinaryProtocol::readI64(int64_t& i64) {
uint8_t b[8];
- inputTransport_->readAll(b, 8);
+ trans_->readAll(b, 8);
i64 = *(int64_t*)b;
i64 = (int64_t)ntohll(i64);
return 8;
uint32_t TBinaryProtocol::readDouble(double& dub) {
uint8_t b[8];
uint8_t d[8];
- inputTransport_->readAll(b, 8);
+ trans_->readAll(b, 8);
d[0] = b[7];
d[1] = b[6];
d[2] = b[5];
// Use the heap here to prevent stack overflow for v. large strings
uint8_t *b = new uint8_t[size];
- inputTransport_->readAll(b, size);
+ trans_->readAll(b, size);
str = string((char*)b, size);
delete [] b;
*/
class TBinaryProtocol : public TProtocol {
public:
- TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
- TProtocol(in, out) {}
+ TBinaryProtocol(shared_ptr<TTransport> trans) :
+ TProtocol(trans) {}
~TBinaryProtocol() {}
virtual ~TBinaryProtocolFactory() {}
- std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
- boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
- return std::make_pair(prot, prot);
+ boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TProtocol>(new TBinaryProtocol(trans));
}
};
}
}
- shared_ptr<TTransport> getInputTransport() {
- return inputTransport_;
+ inline shared_ptr<TTransport> getTransport() {
+ return trans_;
}
- shared_ptr<TTransport> getOutputTransport() {
- return outputTransport_;
+ // TODO: remove these two calls, they are for backwards
+ // compatibility
+ inline shared_ptr<TTransport> getInputTransport() {
+ return trans_;
+ }
+ inline shared_ptr<TTransport> getOutputTransport() {
+ return trans_;
}
protected:
- TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
- inputTransport_(in),
- outputTransport_(out) {}
+ TProtocol(shared_ptr<TTransport> trans):
+ trans_(trans) {}
- shared_ptr<TTransport> inputTransport_;
-
- shared_ptr<TTransport> outputTransport_;
+ shared_ptr<TTransport> trans_;
private:
TProtocol() {}
virtual ~TProtocolFactory() {}
- virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
+ virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
};
}}} // facebook::thrift::protocol
// Set flags, which also registers the event
setFlags(eventFlags);
- // TODO: this needs to be replaced by the new version of TTransportFactory
- factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
- // factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+ // get input/transports
+ factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
+ factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
- std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
- iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
- outputTransport_);
- inputProtocol_ = iop.first;
- outputProtocol_ = iop.second;
-
+ inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
}
void TConnection::workSocket() {
// close any factory produced transports
factoryInputTransport_->close();
- // factoryOutputTransport_->close();
+ factoryOutputTransport_->close();
// Give this object back to the server that owns it
server_->returnConnection(this);
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
// Check the stack
if (connectionStack_.empty()) {
- return new TConnection(socket, flags, this, this->getTransportFactory());
+ return new TConnection(socket, flags, this);
} else {
TConnection* result = connectionStack_.top();
connectionStack_.pop();
TNonblockingServer(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
int port) :
- TServer(processor, protocolFactory),
+ TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {}
+ frameResponses_(true) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(protocolFactory);
+ setOutputProtocolFactory(protocolFactory);
+ }
- TNonblockingServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TTransportFactory> transportFactory,
+ TNonblockingServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
int port) :
- TServer(processor, protocolFactory, transportFactory),
+ TServer(processor),
serverSocket_(0),
port_(port),
- frameResponses_(true) {}
+ frameResponses_(true) {
+ setInputTransportFactory(inputTransportFactory);
+ setOutputTransportFactory(outputTransportFactory);
+ setInputProtocolFactory(inputProtocolFactory);
+ setOutputProtocolFactory(outputProtocolFactory);
+ }
~TNonblockingServer() {}
// extra transport generated by transport factory (e.g. BufferedRouterTransport)
shared_ptr<TTransport> factoryInputTransport_;
- // shared_ptr<TTransport> factoryOutputTransport_;
-
- // Protocol encoder
- shared_ptr<TProtocol> outputProtocol_;
+ shared_ptr<TTransport> factoryOutputTransport_;
// Protocol decoder
shared_ptr<TProtocol> inputProtocol_;
+
+ // Protocol encoder
+ shared_ptr<TProtocol> outputProtocol_;
// Go into read mode
void setRead() {
public:
// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
- shared_ptr<TTransportFactory> transportFactory) {
+ TConnection(int socket, short eventFlags, TNonblockingServer *s) {
readBuffer_ = (uint8_t*)malloc(1024);
if (readBuffer_ == NULL) {
throw new facebook::thrift::TException("Out of memory.");
return serverTransport_;
}
- shared_ptr<TTransportFactory> getTransportFactory() {
- return transportFactory_;
+ shared_ptr<TTransportFactory> getInputTransportFactory() {
+ return inputTransportFactory_;
+ }
+
+ shared_ptr<TTransportFactory> getOutputTransportFactory() {
+ return outputTransportFactory_;
}
- shared_ptr<TProtocolFactory> getProtocolFactory() {
- return protocolFactory_;
+ shared_ptr<TProtocolFactory> getInputProtocolFactory() {
+ return inputProtocolFactory_;
}
+ shared_ptr<TProtocolFactory> getOutputProtocolFactory() {
+ return outputProtocolFactory_;
+ }
protected:
+ TServer(shared_ptr<TProcessor> processor):
+ processor_(processor) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport):
+ processor_(processor),
+ serverTransport_(serverTransport) {
+ setInputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setOutputTransportFactory(shared_ptr<TTransportFactory>(new TTransportFactory()));
+ setInputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ setOutputProtocolFactory(shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()));
+ }
+
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TProtocolFactory> protocolFactory) :
+ shared_ptr<TProtocolFactory> protocolFactory):
processor_(processor),
serverTransport_(serverTransport),
- transportFactory_(transportFactory),
- protocolFactory_(protocolFactory) {}
+ inputTransportFactory_(transportFactory),
+ outputTransportFactory_(transportFactory),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory) {}
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
- shared_ptr<TTransportFactory> transportFactory) :
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory):
processor_(processor),
serverTransport_(serverTransport),
- transportFactory_(transportFactory) {
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
- }
+ inputTransportFactory_(inputTransportFactory),
+ outputTransportFactory_(outputTransportFactory),
+ inputProtocolFactory_(inputProtocolFactory),
+ outputProtocolFactory_(outputProtocolFactory) {}
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerTransport> serverTransport) :
- processor_(processor),
- serverTransport_(serverTransport) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+
+ // Class variables
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TServerTransport> serverTransport_;
+
+ shared_ptr<TTransportFactory> inputTransportFactory_;
+ shared_ptr<TTransportFactory> outputTransportFactory_;
+
+ shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ shared_ptr<TProtocolFactory> outputProtocolFactory_;
+
+ void setInputTransportFactory(shared_ptr<TTransportFactory> inputTransportFactory) {
+ inputTransportFactory_ = inputTransportFactory;
}
- TServer(shared_ptr<TProcessor> processor) :
- processor_(processor) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ void setOutputTransportFactory(shared_ptr<TTransportFactory> outputTransportFactory) {
+ outputTransportFactory_ = outputTransportFactory;
}
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TTransportFactory> transportFactory) :
- processor_(processor),
- transportFactory_(transportFactory) {
- protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ void setInputProtocolFactory(shared_ptr<TProtocolFactory> inputProtocolFactory) {
+ inputProtocolFactory_ = inputProtocolFactory;
}
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory) :
- processor_(processor) {
- transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
- protocolFactory_ = protocolFactory;
+ void setOutputProtocolFactory(shared_ptr<TProtocolFactory> outputProtocolFactory) {
+ outputProtocolFactory_ = outputProtocolFactory;
}
- TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TProtocolFactory> protocolFactory,
- shared_ptr<TTransportFactory> transportFactory):
- processor_(processor),
- transportFactory_(transportFactory),
- protocolFactory_(protocolFactory) {}
-
- shared_ptr<TProcessor> processor_;
- shared_ptr<TServerTransport> serverTransport_;
- shared_ptr<TTransportFactory> transportFactory_;
- shared_ptr<TProtocolFactory> protocolFactory_;
};
}}} // facebook::thrift::server
void TSimpleServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
try {
// Start the server listening
try {
while (true) {
client = serverTransport_->accept();
- iot = transportFactory_->getIOTransports(client);
- iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
try {
- while (processor_->process(iop.first, iop.second)) {
+ while (processor_->process(inputProtocol, outputProtocol)) {
// Peek ahead, is the remote side closed?
- if (!iot.first->peek()) {
+ if (!inputTransport->peek()) {
break;
}
}
} catch (TException& tx) {
cerr << "TSimpleServer exception: " << tx.what() << endl;
}
- iot.first->close();
- iot.second->close();
+ inputTransport->close();
+ outputTransport->close();
client->close();
}
} catch (TTransportException& ttx) {
shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TProtocolFactory> protocolFactory) :
TServer(processor, serverTransport, transportFactory, protocolFactory) {}
+
+ TSimpleServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory):
+ TServer(processor, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory) {}
~TSimpleServer() {}
void run() {
try {
while (processor_->process(input_, output_)) {
- if (!input_->getInputTransport()->peek()) {
+ if (!input_->getTransport()->peek()) {
break;
}
}
} catch (...) {
cerr << "TThreadPoolServer uncaught exception." << endl;
}
- input_->getInputTransport()->close();
- output_->getOutputTransport()->close();
+ input_->getTransport()->close();
+ output_->getTransport()->close();
}
private:
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
shared_ptr<TProtocolFactory> protocolFactory,
-
shared_ptr<ThreadManager> threadManager) :
TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadManager_(threadManager) {
-}
+ threadManager_(threadManager) {}
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory),
+ threadManager_(threadManager) {}
+
TThreadPoolServer::~TThreadPoolServer() {}
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ shared_ptr<TTransport> inputTransport;
+ shared_ptr<TTransport> outputTransport;
+ shared_ptr<TProtocol> inputProtocol;
+ shared_ptr<TProtocol> outputProtocol;
try {
// Start the server listening
// Fetch client from server
client = serverTransport_->accept();
// Make IO transports
- iot = transportFactory_->getIOTransports(client);
- iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+ inputTransport = inputTransportFactory_->getTransport(client);
+ outputTransport = outputTransportFactory_->getTransport(client);
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
} catch (TTransportException& ttx) {
break;
}
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<ThreadManager> threadManager);
+ TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> inputTransportFactory,
+ shared_ptr<TTransportFactory> outputTransportFactory,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<ThreadManager> threadManager);
+
virtual ~TThreadPoolServer();
virtual void serve();
/**
* Wraps the transport into a buffered one.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
- return std::make_pair(buffered, buffered);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TBufferedRouterTransport(trans, rTrans_));
}
private:
/**
* Default implementation does nothing, just returns the transport given.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- return std::make_pair(trans, trans);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return trans;
}
};
/**
* Wraps the transport into a buffered one.
*/
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
- return std::make_pair(buffered, buffered);
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new TBufferedTransport(trans));
}
};