THRIFT-1461 Recent TNonblockingServer changes broke builds with --enable-boostthreads=yes and on Windows
Patch: Alexandre Parenteau

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1214547 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 0e44ab2..7d42a2e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -26,7 +26,7 @@
 #include "TNonblockingServer.h"
 #include <concurrency/Exception.h>
 #include <transport/TSocket.h>
-#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/PlatformThreadFactory.h>
 
 #include <iostream>
 
@@ -53,12 +53,19 @@
 
 #include <errno.h>
 #include <assert.h>
+
+#ifdef HAVE_SCHED_H
 #include <sched.h>
+#endif
 
 #ifndef AF_LOCAL
 #define AF_LOCAL AF_UNIX
 #endif
 
+#ifdef _MSC_VER
+#define PRIu32 "I32u"
+#endif
+
 namespace apache { namespace thrift { namespace server {
 
 using namespace apache::thrift::protocol;
@@ -1208,10 +1215,12 @@
 
   // Launch all the secondary IO threads in separate threads
   if (ioThreads_.size() > 1) {
-    ioThreadFactory_.reset(new PosixThreadFactory(
-      PosixThreadFactory::OTHER,  // scheduler
-      PosixThreadFactory::NORMAL, // priority
+    ioThreadFactory_.reset(new PlatformThreadFactory(
+#ifndef USE_BOOST_THREAD
+      PlatformThreadFactory::OTHER,  // scheduler
+      PlatformThreadFactory::NORMAL, // priority
       1,                          // stack size (MB)
+#endif
       false                       // detached
     ));
 
@@ -1262,7 +1271,7 @@
       GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
                           errno);
     }
-    listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
   }
 
   for (int i = 0; i < 2; ++i) {
@@ -1271,26 +1280,30 @@
         GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                             errno);
       }
-      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
     }
   }
 }
 
 void TNonblockingIOThread::createNotificationPipe() {
-  if (pipe(notificationPipeFDs_) != 0) {
-    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
     throw TException("can't create notification pipe");
   }
-  int flags;
-  if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
-      fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
+     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
     close(notificationPipeFDs_[0]);
     close(notificationPipeFDs_[1]);
     throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
   }
   for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+    int flags;
     if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
         fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
       close(notificationPipeFDs_[0]);
       close(notificationPipeFDs_[1]);
       throw TException("TNonblockingServer::createNotificationPipe() "
@@ -1349,7 +1362,7 @@
   }
 
   const int kSize = sizeof(conn);
-  if (write(fd, &conn, kSize) != kSize) {
+  if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
     return false;
   }
 
@@ -1357,7 +1370,7 @@
 }
 
 /* static */
-void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
   TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
   assert(ioThread);
   (void)which;
@@ -1365,7 +1378,7 @@
   while (true) {
     TNonblockingServer::TConnection* connection = 0;
     const int kSize = sizeof(connection);
-    int nBytes = read(fd, &connection, kSize);
+    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
     if (nBytes == kSize) {
       if (connection == NULL) {
         // this is the command to stop our thread, exit the handler!
@@ -1416,12 +1429,13 @@
   // mechanism to stop the thread, but happily if we're running in the
   // same thread, this means the thread can't be blocking in the event
   // loop either.
-  if (!pthread_equal(pthread_self(), threadId_)) {
+  if (!Thread::is_current(threadId_)) {
     notify(NULL);
   }
 }
 
 void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
   // Start out with a standard, low-priority setup for the sched params.
   struct sched_param sp;
   bzero((void*) &sp, sizeof(sp));
@@ -1446,10 +1460,11 @@
   } else {
     GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
   }
+#endif
 }
 
 void TNonblockingIOThread::run() {
-  threadId_ = pthread_self();
+  threadId_ = Thread::get_current();
 
   assert(eventBase_ == 0);
   eventBase_ = event_base_new();