Thrift now a TLP - INFRA-3116
git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
new file mode 100644
index 0000000..9b47aa5
--- /dev/null
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstring>
+#include <sys/socket.h>
+#include <sys/poll.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <errno.h>
+
+#include "TSocket.h"
+#include "TServerSocket.h"
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace transport {
+
+using namespace std;
+using boost::shared_ptr;
+
+TServerSocket::TServerSocket(int port) :
+ port_(port),
+ serverSocket_(-1),
+ acceptBacklog_(1024),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ retryLimit_(0),
+ retryDelay_(0),
+ tcpSendBuffer_(0),
+ tcpRecvBuffer_(0),
+ intSock1_(-1),
+ intSock2_(-1) {}
+
+TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
+ port_(port),
+ serverSocket_(-1),
+ acceptBacklog_(1024),
+ sendTimeout_(sendTimeout),
+ recvTimeout_(recvTimeout),
+ retryLimit_(0),
+ retryDelay_(0),
+ tcpSendBuffer_(0),
+ tcpRecvBuffer_(0),
+ intSock1_(-1),
+ intSock2_(-1) {}
+
+TServerSocket::~TServerSocket() {
+ close();
+}
+
+void TServerSocket::setSendTimeout(int sendTimeout) {
+ sendTimeout_ = sendTimeout;
+}
+
+void TServerSocket::setRecvTimeout(int recvTimeout) {
+ recvTimeout_ = recvTimeout;
+}
+
+void TServerSocket::setRetryLimit(int retryLimit) {
+ retryLimit_ = retryLimit;
+}
+
+void TServerSocket::setRetryDelay(int retryDelay) {
+ retryDelay_ = retryDelay;
+}
+
+void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
+ tcpSendBuffer_ = tcpSendBuffer;
+}
+
+void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
+ tcpRecvBuffer_ = tcpRecvBuffer;
+}
+
+void TServerSocket::listen() {
+ int sv[2];
+ if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
+ GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
+ intSock1_ = -1;
+ intSock2_ = -1;
+ } else {
+ intSock1_ = sv[1];
+ intSock2_ = sv[0];
+ }
+
+ struct addrinfo hints, *res, *res0;
+ int error;
+ char port[sizeof("65536") + 1];
+ std::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = PF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+ sprintf(port, "%d", port_);
+
+ // Wildcard address
+ error = getaddrinfo(NULL, port, &hints, &res0);
+ if (error) {
+ GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error));
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
+ }
+
+ // Pick the ipv6 address first since ipv4 addresses can be mapped
+ // into ipv6 space.
+ for (res = res0; res; res = res->ai_next) {
+ if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+ break;
+ }
+
+ serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (serverSocket_ == -1) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
+ }
+
+ // Set reusaddress to prevent 2MSL delay on accept
+ int one = 1;
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
+ &one, sizeof(one))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy);
+ }
+
+ // Set TCP buffer sizes
+ if (tcpSendBuffer_ > 0) {
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
+ &tcpSendBuffer_, sizeof(tcpSendBuffer_))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
+ }
+ }
+
+ if (tcpRecvBuffer_ > 0) {
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
+ &tcpRecvBuffer_, sizeof(tcpRecvBuffer_))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
+ }
+ }
+
+ // Defer accept
+ #ifdef TCP_DEFER_ACCEPT
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
+ &one, sizeof(one))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
+ }
+ #endif // #ifdef TCP_DEFER_ACCEPT
+
+ #ifdef IPV6_V6ONLY
+ int zero = 0;
+ if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
+ &zero, sizeof(zero))) {
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
+ }
+ #endif // #ifdef IPV6_V6ONLY
+
+ // Turn linger off, don't want to block on calls to close
+ struct linger ling = {0, 0};
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
+ &ling, sizeof(ling))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
+ }
+
+ // TCP Nodelay, speed over bandwidth
+ if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+ &one, sizeof(one))) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+ }
+
+ // Set NONBLOCK on the accept socket
+ int flags = fcntl(serverSocket_, F_GETFL, 0);
+ if (flags == -1) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ }
+
+ if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy);
+ throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
+ }
+
+ // prepare the port information
+ // we may want to try to bind more than once, since SO_REUSEADDR doesn't
+ // always seem to work. The client can configure the retry variables.
+ int retries = 0;
+ do {
+ if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
+ break;
+ }
+
+ // use short circuit evaluation here to only sleep if we need to
+ } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+
+ // free addrinfo
+ freeaddrinfo(res0);
+
+ // throw an error if we failed to bind properly
+ if (retries > retryLimit_) {
+ char errbuf[1024];
+ sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+ GlobalOutput(errbuf);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
+ }
+
+ // Call listen
+ if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
+ close();
+ throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
+ }
+
+ // The socket is now listening!
+}
+
+shared_ptr<TTransport> TServerSocket::acceptImpl() {
+ if (serverSocket_ < 0) {
+ throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
+ }
+
+ struct pollfd fds[2];
+
+ int maxEintrs = 5;
+ int numEintrs = 0;
+
+ while (true) {
+ std::memset(fds, 0 , sizeof(fds));
+ fds[0].fd = serverSocket_;
+ fds[0].events = POLLIN;
+ if (intSock2_ >= 0) {
+ fds[1].fd = intSock2_;
+ fds[1].events = POLLIN;
+ }
+ int ret = poll(fds, 2, -1);
+
+ if (ret < 0) {
+ // error cases
+ if (errno == EINTR && (numEintrs++ < maxEintrs)) {
+ // EINTR needs to be handled manually and we can tolerate
+ // a certain number
+ continue;
+ }
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
+ } else if (ret > 0) {
+ // Check for an interrupt signal
+ if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) {
+ int8_t buf;
+ if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
+ GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
+ }
+ throw TTransportException(TTransportException::INTERRUPTED);
+ }
+
+ // Check for the actual server socket being ready
+ if (fds[0].revents & POLLIN) {
+ break;
+ }
+ } else {
+ GlobalOutput("TServerSocket::acceptImpl() poll 0");
+ throw TTransportException(TTransportException::UNKNOWN);
+ }
+ }
+
+ struct sockaddr_storage clientAddress;
+ int size = sizeof(clientAddress);
+ int clientSocket = ::accept(serverSocket_,
+ (struct sockaddr *) &clientAddress,
+ (socklen_t *) &size);
+
+ if (clientSocket < 0) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
+ }
+
+ // Make sure client socket is blocking
+ int flags = fcntl(clientSocket, F_GETFL, 0);
+ if (flags == -1) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
+ }
+
+ if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy);
+ throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
+ }
+
+ shared_ptr<TSocket> client(new TSocket(clientSocket));
+ if (sendTimeout_ > 0) {
+ client->setSendTimeout(sendTimeout_);
+ }
+ if (recvTimeout_ > 0) {
+ client->setRecvTimeout(recvTimeout_);
+ }
+
+ return client;
+}
+
+void TServerSocket::interrupt() {
+ if (intSock1_ >= 0) {
+ int8_t byte = 0;
+ if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
+ GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
+ }
+ }
+}
+
+void TServerSocket::close() {
+ if (serverSocket_ >= 0) {
+ shutdown(serverSocket_, SHUT_RDWR);
+ ::close(serverSocket_);
+ }
+ if (intSock1_ >= 0) {
+ ::close(intSock1_);
+ }
+ if (intSock2_ >= 0) {
+ ::close(intSock2_);
+ }
+ serverSocket_ = -1;
+ intSock1_ = -1;
+ intSock2_ = -1;
+}
+
+}}} // apache::thrift::transport