From: Roger Meier Date: Fri, 8 Jul 2011 12:23:31 +0000 (+0000) Subject: THRIFT-1217 Use evutil_socketpair instead of pipe X-Git-Tag: 0.7.0~39 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=30aae0ca877c9f5863ff881b29edc6a38df9d85a;p=common%2Fthrift.git THRIFT-1217 Use evutil_socketpair instead of pipe Patch: alexandre parenteau git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1144286 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 69ae2351..4774b361 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -22,10 +22,20 @@ #include #include + +#ifdef HAVE_SYS_SOCKET_H #include +#endif + +#ifdef HAVE_NETINET_IN_H #include #include +#endif + +#ifdef HAVE_NETDB_H #include +#endif + #include #include #include @@ -708,7 +718,7 @@ void TNonblockingServer::listenSocket() { #ifdef IPV6_V6ONLY if (res->ai_family == AF_INET6) { int zero = 0; - if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) { + if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) { GlobalOutput("TServerSocket::listen() IPV6_V6ONLY"); } } @@ -718,9 +728,9 @@ void TNonblockingServer::listenSocket() { int one = 1; // Set reuseaddr to avoid 2MSL delay on server restart - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one)); - if (bind(s, res->ai_addr, res->ai_addrlen) == -1) { + if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) { close(s); freeaddrinfo(res0); throw TException("TNonblockingServer::serve() bind"); @@ -750,20 +760,20 @@ void TNonblockingServer::listenSocket(int s) { struct linger ling = {0, 0}; // Keepalive to ensure full result flushing - setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)); + setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one)); // Turn linger off to avoid hung sockets - setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling)); // Set TCP nodelay if available, MAC OS X Hack // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html #ifndef TCP_NOPUSH - setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one)); #endif #ifdef TCP_LOW_MIN_RTO if (TSocket::getUseLowMinRto()) { - setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one)); + setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one)); } #endif @@ -777,13 +787,12 @@ void TNonblockingServer::listenSocket(int s) { } void TNonblockingServer::createNotificationPipe() { - if (pipe(notificationPipeFDs_) != 0) { - GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno); - throw TException("can't create notification pipe"); + if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { + GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR()); + throw TException("can't create notification pipe"); } - int flags; - if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 || - fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) { + if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 || + evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) { close(notificationPipeFDs_[0]); close(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK"); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 0252f10d..7b1cf4dd 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -44,6 +44,36 @@ using apache::thrift::concurrency::ThreadManager; // Forward declaration of class class TConnection; +#ifdef LIBEVENT_VERSION_NUMBER +#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24) +#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF) +#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF) +#else +// assume latest version 1 series +#define LIBEVENT_VERSION_MAJOR 1 +#define LIBEVENT_VERSION_MINOR 14 +#define LIBEVENT_VERSION_REL 13 +#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8)) +#endif + +#if LIBEVENT_VERSION_NUMBER < 0x02000000 + typedef int evutil_socket_t; +#endif + +#ifndef SOCKOPT_CAST_T +#define SOCKOPT_CAST_T void +#endif + +template +inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) { + return reinterpret_cast(v); +} + +template +inline SOCKOPT_CAST_T* cast_sockopt(T* v) { + return reinterpret_cast(v); +} + /** * This is a non-blocking server in C++ for high performance that operates a * single IO thread. It assumes that all incoming requests are framed with a @@ -176,7 +206,7 @@ class TNonblockingServer : public TServer { uint64_t nTotalConnectionsDropped_; /// File descriptors for pipe used for task completion notification. - int notificationPipeFDs_[2]; + evutil_socket_t notificationPipeFDs_[2]; /** * This is a stack of all the objects that have been created but that @@ -634,7 +664,7 @@ class TNonblockingServer : public TServer { * @param which the flags associated with the event. * @param v void* callback arg where we placed TNonblockingServer's "this". */ - static void eventHandler(int fd, short which, void* v) { + static void eventHandler(evutil_socket_t fd, short which, void* v) { ((TNonblockingServer*)v)->handleEvent(fd, which); } @@ -874,7 +904,7 @@ class TConnection { * @param which the flags associated with the event. * @param v void* callback arg where we placed TConnection's "this". */ - static void eventHandler(int fd, short /* which */, void* v) { + static void eventHandler(evutil_socket_t fd, short /* which */, void* v) { assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD()); ((TConnection*)v)->workSocket(); } @@ -887,17 +917,17 @@ class TConnection { * * @param fd the descriptor the event occured on. */ - static void taskHandler(int fd, short /* which */, void* /* v */) { + static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) { TConnection* connection; ssize_t nBytes; - while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*))) + while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0)) == sizeof(TConnection*)) { connection->transition(); } if (nBytes > 0) { throw TException("TConnection::taskHandler unexpected partial read"); } - if (errno != EWOULDBLOCK && errno != EAGAIN) { + if (errno && errno != EWOULDBLOCK && errno != EAGAIN) { GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno); } } @@ -911,8 +941,8 @@ class TConnection { */ bool notifyServer() { TConnection* connection = this; - if (write(server_->getNotificationSendFD(), (const void*)&connection, - sizeof(TConnection*)) != sizeof(TConnection*)) { + if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection), + sizeof(TConnection*), 0) != sizeof(TConnection*)) { return false; }