outputTransport->close();
client->close();
} catch (TTransportException& ttx) {
- inputTransport->close();
- outputTransport->close();
- client->close();
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
cerr << "TServerTransport died on accept: " << ttx.what() << endl;
continue;
} catch (TException& tx) {
- inputTransport->close();
- outputTransport->close();
- client->close();
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
cerr << "Some kind of accept exception: " << tx.what() << endl;
continue;
} catch (string s) {
- inputTransport->close();
- outputTransport->close();
- client->close();
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
cerr << "TThreadPoolServer: Unknown exception: " << s << endl;
break;
}
try {
// Fetch client from server
client = serverTransport_->accept();
+
// Make IO transports
inputTransport = inputTransportFactory_->getTransport(client);
outputTransport = outputTransportFactory_->getTransport(client);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_,
- inputProtocol,
- outputProtocol)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, inputProtocol, outputProtocol)));
+
} catch (TTransportException& ttx) {
- inputTransport->close();
- outputTransport->close();
- client->close();
- cerr << "TThreadPoolServer: TServerTransport died on accept: " << ttx.what() << endl;
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
+ if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+ cerr << "TThreadPoolServer: TServerTransport died on accept: " << ttx.what() << endl;
+ }
continue;
} catch (TException& tx) {
- inputTransport->close();
- outputTransport->close();
- client->close();
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
cerr << "TThreadPoolServer: Caught TException: " << tx.what() << endl;
continue;
} catch (string s) {
- inputTransport->close();
- outputTransport->close();
- client->close();
+ if (inputTransport.get() != NULL) { inputTransport->close(); }
+ if (outputTransport.get() != NULL) { outputTransport->close(); }
+ if (client.get() != NULL) { client->close(); }
cerr << "TThreadPoolServer: Unknown exception: " << s << endl;
break;
}
} catch (TException &tx) {
cerr << "TThreadPoolServer: Exception shutting down: " << tx.what() << endl;
}
+ stop_ = false;
}
- stop_ = false;
}
virtual void serve();
- virtual void stop() { stop_ = true; }
+ virtual void stop() {
+ stop_ = true;
+ serverTransport_->interrupt();
+ }
protected:
// http://developers.facebook.com/thrift/
#include <sys/socket.h>
+#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+#include <fcntl.h>
#include <errno.h>
#include "TSocket.h"
serverSocket_(-1),
acceptBacklog_(1024),
sendTimeout_(0),
- recvTimeout_(0) {}
+ recvTimeout_(0),
+ interrupt_(false) {}
TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
port_(port),
serverSocket_(-1),
acceptBacklog_(1024),
sendTimeout_(sendTimeout),
- recvTimeout_(recvTimeout) {}
+ recvTimeout_(recvTimeout),
+ interrupt_(false) {}
TServerSocket::~TServerSocket() {
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY");
}
+ // Set NONBLOCK on the accept socket
+ int flags = fcntl(serverSocket_, F_GETFL, 0);
+ if (flags == -1) {
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
+ }
+ if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
+ }
+
// Bind to a port
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
}
+ // 200ms timeout on accept
+ struct timeval c = {0, 200000};
+ fd_set fds;
+
+ while (true) {
+ FD_ZERO(&fds);
+ FD_SET(serverSocket_, &fds);
+ int ret = select(serverSocket_+1, &fds, NULL, NULL, &c);
+
+ // Check for interrupt case
+ if (ret == 0 && interrupt_) {
+ interrupt_ = false;
+ throw TTransportException(TTransportException::INTERRUPTED);
+ }
+
+ // Reset interrupt flag no matter what
+ interrupt_ = false;
+
+ if (ret > 0) {
+ break;
+ } else if (ret == 0) {
+ if (errno != EINTR && errno != EAGAIN) {
+ perror("TServerSocket::select() errcode");
+ throw TTransportException(TTransportException::UNKNOWN);
+ }
+ } else {
+ perror("TServerSocket::select() negret");
+ throw TTransportException(TTransportException::UNKNOWN);
+ }
+ }
+
struct sockaddr_in clientAddress;
int size = sizeof(clientAddress);
int clientSocket = ::accept(serverSocket_,
perror("TServerSocket::accept()");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
+
+ // Make sure client socket is blocking
+ int flags = fcntl(clientSocket, F_GETFL, 0);
+ if (flags == -1) {
+ perror("TServerSocket::select() fcntl GETFL");
+ throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
+ }
+ if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
+ perror("TServerSocket::select() fcntl SETFL");
+ throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
+ }
shared_ptr<TSocket> client(new TSocket(clientSocket));
if (sendTimeout_ > 0) {
}
if (recvTimeout_ > 0) {
client->setRecvTimeout(recvTimeout_);
- }
+ }
+
return client;
}
void listen();
void close();
+ void interrupt() {
+ interrupt_ = true;
+ }
+
protected:
shared_ptr<TTransport> acceptImpl();
int acceptBacklog_;
int sendTimeout_;
int recvTimeout_;
+ volatile bool interrupt_;
};
}}} // facebook::thrift::transport
return result;
}
+ /**
+ * For "smart" TServerTransport implementations that work in a multi
+ * threaded context this can be used to break out of an accept() call.
+ * It is expected that the transport will throw a TTransportException
+ * with the interrupted error code.
+ */
+ virtual void interrupt() {}
+
/**
* Closes this transport such that future calls to accept will do nothing.
*/
// Set the socket to be non blocking for connect if a timeout exists
int flags = fcntl(socket_, F_GETFL, 0);
if (connTimeout_ > 0) {
- fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
+ if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) {
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
+ }
} else {
- fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
+ if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) {
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed");
+ }
}
// Conn timeout
ALREADY_OPEN = 2,
TIMED_OUT = 3,
END_OF_FILE = 4,
+ INTERRUPTED = 5
};
TTransportException() :