namespace facebook { namespace thrift { namespace server {
-void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
socket_ = socket;
server_ = s;
appState_ = APP_INIT;
// Set flags, which also registers the event
setFlags(eventFlags);
+
+ // TODO: this needs to be replaced by the new version of TTransportFactory
+ factoryInputTransport_ = (s->getTransportFactory()->getIOTransports(inputTransport_)).first;
+ // factoryOutputTransport_ = (transportFactory->getIOTransports(outputTransport_)).first;
+
+ // Create protocol
+ std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = s->getProtocolFactory()->getIOProtocols(factoryInputTransport_ ,
+ outputTransport_);
+ inputProtocol_ = iop.first;
+ outputProtocol_ = iop.second;
+
}
void TConnection::workSocket() {
// Invoke the processor
server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &ttx) {
- fprintf(stderr, "Server::process() %s\n", ttx.what());
+ fprintf(stderr, "TTransportException: Server::process() %s\n", ttx.what());
close();
return;
} catch (TException &x) {
- fprintf(stderr, "Server::process() %s\n", x.what());
+ fprintf(stderr, "TException: Server::process() %s\n", x.what());
close();
return;
} catch (...) {
}
socket_ = 0;
+ // close any factory produced transports
+ factoryInputTransport_->close();
+ // factoryOutputTransport_->close();
+
// Give this object back to the server that owns it
server_->returnConnection(this);
}
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
// Check the stack
if (connectionStack_.empty()) {
- return new TConnection(socket, flags, this);
+ return new TConnection(socket, flags, this, this->getTransportFactory());
} else {
TConnection* result = connectionStack_.top();
connectionStack_.pop();
void handleEvent(int fd, short which);
public:
- TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
- TServer(processor),
+ TNonblockingServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ int port) :
+ TServer(processor, protocolFactory),
serverSocket_(0),
port_(port),
frameResponses_(true) {}
-
+
+ TNonblockingServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TTransportFactory> transportFactory,
+ int port) :
+ TServer(processor, protocolFactory, transportFactory),
+ serverSocket_(0),
+ port_(port),
+ frameResponses_(true) {}
+
~TNonblockingServer() {}
void setFrameResponses(bool frameResponses) {
// Transport that processor writes to
shared_ptr<TMemoryBuffer> outputTransport_;
+ // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+ shared_ptr<TTransport> factoryInputTransport_;
+ // shared_ptr<TTransport> factoryOutputTransport_;
+
// Protocol encoder
shared_ptr<TProtocol> outputProtocol_;
public:
// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+ TConnection(int socket, short eventFlags, TNonblockingServer *s,
+ shared_ptr<TTransportFactory> transportFactory) {
readBuffer_ = (uint8_t*)malloc(1024);
if (readBuffer_ == NULL) {
throw new facebook::thrift::TException("Out of memory.");
readBufferSize_ = 1024;
// Allocate input and output tranpsorts
+ // these only need to be allocated once per TConnection (they don't need to be
+ // reallocated on init() call)
inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
-
- // Create protocol
- std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
- iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
- inputProtocol_ = iop.first;
- outputProtocol_ = iop.second;
-
+
init(socket, eventFlags, s);
}
shared_ptr<TProcessor> getProcessor() {
return processor_;
}
+
+ shared_ptr<TServerTransport> getServerTransport() {
+ return serverTransport_;
+ }
+
+ shared_ptr<TTransportFactory> getTransportFactory() {
+ return transportFactory_;
+ }
shared_ptr<TProtocolFactory> getProtocolFactory() {
return protocolFactory_;
}
+
protected:
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
}
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TTransportFactory> transportFactory) :
+ processor_(processor),
+ transportFactory_(transportFactory) {
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory) :
+ processor_(processor) {
+ transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+ protocolFactory_ = protocolFactory;
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<TTransportFactory> transportFactory):
+ processor_(processor),
+ transportFactory_(transportFactory),
+ protocolFactory_(protocolFactory) {}
shared_ptr<TProcessor> processor_;
shared_ptr<TServerTransport> serverTransport_;