for (uint32_t id = 0; id < numIOThreads_; ++id) {
// the first IO thread also does the listening on server socket
- THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : -1);
+ THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
shared_ptr<TNonblockingIOThread> thread(
new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
THRIFT_GET_SOCKET_ERROR);
}
- listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
+ listenSocket_ = THRIFT_INVALID_SOCKET;
}
for (int i = 0; i < 2; ++i) {
GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
THRIFT_GET_SOCKET_ERROR);
}
- notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
+ notificationPipeFDs_[i] = THRIFT_INVALID_SOCKET;
}
}
}
/// # of IO threads to use by default
static const int DEFAULT_IO_THREADS = 1;
- /// File descriptor of an invalid socket
- static const THRIFT_SOCKET INVALID_SOCKET_VALUE = -1;
-
/// # of IO threads this server will use
size_t numIOThreads_;
void handleEvent(THRIFT_SOCKET fd, short which);
void init(int port) {
- serverSocket_ = -1;
+ serverSocket_ = THRIFT_INVALID_SOCKET;
numIOThreads_ = DEFAULT_IO_THREADS;
nextIOThread_ = 0;
useHighPriorityIOThreads_ = false;
# define THRIFT_EPIPE WSAECONNRESET
# define THRIFT_NO_SOCKET_CACHING SO_EXCLUSIVEADDRUSE
# define THRIFT_SOCKET SOCKET
+# define THRIFT_INVALID_SOCKET INVALID_SOCKET
# define THRIFT_SOCKETPAIR thrift_socketpair
# define THRIFT_FCNTL thrift_fcntl
# define THRIFT_O_NONBLOCK 1
# define THRIFT_EPIPE EPIPE
# define THRIFT_NO_SOCKET_CACHING SO_REUSEADDR
# define THRIFT_SOCKET int
+# define THRIFT_INVALID_SOCKET (-1)
# define THRIFT_SOCKETPAIR socketpair
# define THRIFT_FCNTL fcntl
# define THRIFT_O_NONBLOCK O_NONBLOCK
TServerSocket::TServerSocket(int port) :
port_(port),
- serverSocket_(-1),
+ serverSocket_(THRIFT_INVALID_SOCKET),
acceptBacklog_(DEFAULT_BACKLOG),
sendTimeout_(0),
recvTimeout_(0),
retryDelay_(0),
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
- intSock1_(-1),
- intSock2_(-1) {}
+ intSock1_(THRIFT_INVALID_SOCKET),
+ intSock2_(THRIFT_INVALID_SOCKET) {}
TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
port_(port),
- serverSocket_(-1),
+ serverSocket_(THRIFT_INVALID_SOCKET),
acceptBacklog_(DEFAULT_BACKLOG),
sendTimeout_(sendTimeout),
recvTimeout_(recvTimeout),
retryDelay_(0),
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
- intSock1_(-1),
- intSock2_(-1) {}
+ intSock1_(THRIFT_INVALID_SOCKET),
+ intSock2_(THRIFT_INVALID_SOCKET) {}
TServerSocket::TServerSocket(string path) :
port_(0),
path_(path),
- serverSocket_(-1),
+ serverSocket_(THRIFT_INVALID_SOCKET),
acceptBacklog_(DEFAULT_BACKLOG),
sendTimeout_(0),
recvTimeout_(0),
retryDelay_(0),
tcpSendBuffer_(0),
tcpRecvBuffer_(0),
- intSock1_(-1),
- intSock2_(-1) {}
+ intSock1_(THRIFT_INVALID_SOCKET),
+ intSock2_(THRIFT_INVALID_SOCKET) {}
TServerSocket::~TServerSocket() {
close();
THRIFT_SOCKET sv[2];
if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) {
GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR);
- intSock1_ = -1;
- intSock2_ = -1;
+ intSock1_ = THRIFT_INVALID_SOCKET;
+ intSock2_ = THRIFT_INVALID_SOCKET;
} else {
intSock1_ = sv[1];
intSock2_ = sv[0];
serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
}
- if (serverSocket_ == -1) {
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
close();
}
shared_ptr<TTransport> TServerSocket::acceptImpl() {
- if (serverSocket_ == -1) {
+ if (serverSocket_ == THRIFT_INVALID_SOCKET) {
throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
}
std::memset(fds, 0 , sizeof(fds));
fds[0].fd = serverSocket_;
fds[0].events = THRIFT_POLLIN;
- if (intSock2_ != -1) {
+ if (intSock2_ != THRIFT_INVALID_SOCKET) {
fds[1].fd = intSock2_;
fds[1].events = THRIFT_POLLIN;
}
throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
} else if (ret > 0) {
// Check for an interrupt signal
- if (intSock2_ != -1 && (fds[1].revents & THRIFT_POLLIN)) {
+ if (intSock2_ != THRIFT_INVALID_SOCKET
+ && (fds[1].revents & THRIFT_POLLIN)) {
int8_t buf;
if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR);
}
void TServerSocket::interrupt() {
- if (intSock1_ != -1) {
+ if (intSock1_ != THRIFT_INVALID_SOCKET) {
int8_t byte = 0;
if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) {
GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR);
}
void TServerSocket::close() {
- if (serverSocket_ != -1) {
+ if (serverSocket_ != THRIFT_INVALID_SOCKET) {
shutdown(serverSocket_, THRIFT_SHUT_RDWR);
::THRIFT_CLOSESOCKET(serverSocket_);
}
- if (intSock1_ != -1) {
+ if (intSock1_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(intSock1_);
}
- if (intSock2_ != -1) {
+ if (intSock2_ != THRIFT_INVALID_SOCKET) {
::THRIFT_CLOSESOCKET(intSock2_);
}
- serverSocket_ = -1;
- intSock1_ = -1;
- intSock2_ = -1;
+ serverSocket_ = THRIFT_INVALID_SOCKET;
+ intSock1_ = THRIFT_INVALID_SOCKET;
+ intSock2_ = THRIFT_INVALID_SOCKET;
}
}}} // apache::thrift::transport
host_(host),
port_(port),
path_(""),
- socket_(-1),
+ socket_(THRIFT_INVALID_SOCKET),
connTimeout_(0),
sendTimeout_(0),
recvTimeout_(0),
host_(""),
port_(0),
path_(path),
- socket_(-1),
+ socket_(THRIFT_INVALID_SOCKET),
connTimeout_(0),
sendTimeout_(0),
recvTimeout_(0),
host_(""),
port_(0),
path_(""),
- socket_(-1),
+ socket_(THRIFT_INVALID_SOCKET),
connTimeout_(0),
sendTimeout_(0),
recvTimeout_(0),
}
bool TSocket::isOpen() {
- return (socket_ != -1);
+ return (socket_ != THRIFT_INVALID_SOCKET);
}
bool TSocket::peek() {
socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
}
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
int errno_copy = THRIFT_GET_SOCKET_ERROR;
GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
}
void TSocket::close() {
- if (socket_ != -1) {
+ if (socket_ != THRIFT_INVALID_SOCKET) {
shutdown(socket_, THRIFT_SHUT_RDWR);
::THRIFT_CLOSESOCKET(socket_);
}
- socket_ = -1;
+ socket_ = THRIFT_INVALID_SOCKET;
}
void TSocket::setSocketFD(THRIFT_SOCKET socket) {
- if (socket_ != -1) {
+ if (socket_ != THRIFT_INVALID_SOCKET) {
close();
}
socket_ = socket;
}
uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
}
}
uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
}
void TSocket::setLinger(bool on, int linger) {
lingerOn_ = on;
lingerVal_ = linger;
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
return;
}
void TSocket::setNoDelay(bool noDelay) {
noDelay_ = noDelay;
- if (socket_ == -1 || !path_.empty()) {
+ if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) {
return;
}
}
recvTimeout_ = ms;
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
return;
}
}
sendTimeout_ = ms;
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
return;
}
struct sockaddr* addrPtr;
socklen_t addrLen;
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
return host_;
}
struct sockaddr* addrPtr;
socklen_t addrLen;
- if (socket_ == -1) {
+ if (socket_ == THRIFT_INVALID_SOCKET) {
return peerAddress_;
}
TSocketPoolServer::TSocketPoolServer()
: host_(""),
port_(0),
- socket_(-1),
+ socket_(THRIFT_INVALID_SOCKET),
lastFailTime_(0),
consecutiveFailures_(0) {}
TSocketPoolServer::TSocketPoolServer(const string &host, int port)
: host_(host),
port_(port),
- socket_(-1),
+ socket_(THRIFT_INVALID_SOCKET),
lastFailTime_(0),
consecutiveFailures_(0) {}
size_t numServers = servers_.size();
if (numServers == 0) {
- socket_ = -1;
+ socket_ = THRIFT_INVALID_SOCKET;
throw TTransportException(TTransportException::NOT_OPEN);
}
} catch (TException e) {
string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
GlobalOutput(errStr.c_str());
- socket_ = -1;
+ socket_ = THRIFT_INVALID_SOCKET;
continue;
}
void TSocketPool::close() {
TSocket::close();
if (currentServer_) {
- currentServer_->socket_ = -1;
+ currentServer_->socket_ = THRIFT_INVALID_SOCKET;
}
}