From: Roger Meier Date: Wed, 14 Dec 2011 23:35:28 +0000 (+0000) Subject: THRIFT-1461 Recent TNonblockingServer changes broke --enable-boostthreads=yes, Windows X-Git-Tag: 0.9.1~487 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=12d705390d24359900256ceda15e00de4df4a6cd;p=common%2Fthrift.git THRIFT-1461 Recent TNonblockingServer changes broke --enable-boostthreads=yes, Windows Patch: Alexandre Parenteau git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1214547 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/compiler/cpp/compiler.vcxproj b/compiler/cpp/compiler.vcxproj index 5f43577b..a01f3f2d 100644 --- a/compiler/cpp/compiler.vcxproj +++ b/compiler/cpp/compiler.vcxproj @@ -5,10 +5,18 @@ Debug Win32 + + Debug + x64 + Release Win32 + + Release + x64 + @@ -83,31 +91,62 @@ true MultiByte + + Application + true + MultiByte + Application false true MultiByte + + Application + false + true + MultiByte + + + + + + + true $(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath) thrift + $(ExecutablePath);C:\Program Files (x86)\Git\bin + + + true + $(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath) + thrift + $(ExecutablePath);C:\Program Files (x86)\Git\bin false $(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath) thrift + $(ExecutablePath);C:\Program Files (x86)\Git\bin + + + false + $(ProjectDir)\src\;$(ProjectDir)\src\windows\;$(IncludePath) + thrift + $(ExecutablePath);C:\Program Files (x86)\Git\bin @@ -125,6 +164,25 @@ flex -o "src/thriftl.cc" src/thriftl.ll +bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy + + + + + + + Level3 + Disabled + WIN32;MINGW;YY_NO_UNISTD_H;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) + config.h + CompileAsCpp + + + Console + true + + + flex -o "src/thriftl.cc" src/thriftl.ll bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy @@ -148,6 +206,29 @@ bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy flex -o "src/thriftl.cc" src/thriftl.ll +bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy + + + + + Level3 + + + MaxSpeed + true + true + WIN32;MINGW;YY_NO_UNISTD_H;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + config.h + CompileAsCpp + + + Console + true + true + true + + + flex -o "src/thriftl.cc" src/thriftl.ll bison -y -o "src/thrifty.cc" --defines="src/thrifty.h" src/thrifty.yy diff --git a/compiler/cpp/src/thrifty.yy b/compiler/cpp/src/thrifty.yy index cc024a1a..ef53cc38 100644 --- a/compiler/cpp/src/thrifty.yy +++ b/compiler/cpp/src/thrifty.yy @@ -28,7 +28,11 @@ #define __STDC_LIMIT_MACROS #define __STDC_FORMAT_MACROS #include +#ifndef _MSC_VER #include +#else +#include +#endif #include #include "main.h" #include "globals.h" diff --git a/configure.ac b/configure.ac index aa8a16ea..8943cdb8 100644 --- a/configure.ac +++ b/configure.ac @@ -325,6 +325,7 @@ AC_CHECK_HEADERS([malloc.h]) AC_CHECK_HEADERS([openssl/ssl.h]) AC_CHECK_HEADERS([openssl/rand.h]) AC_CHECK_HEADERS([openssl/x509v3.h]) +AC_CHECK_HEADERS([sched.h]) AC_CHECK_LIB(pthread, pthread_create) dnl NOTE(dreiss): I haven't been able to find any really solid docs diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h index 9f053a03..04fdc5bb 100644 --- a/lib/cpp/src/concurrency/PlatformThreadFactory.h +++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h @@ -30,7 +30,6 @@ namespace apache { namespace thrift { namespace concurrency { #ifndef USE_BOOST_THREAD typedef PosixThreadFactory PlatformThreadFactory; -#include #else typedef BoostThreadFactory PlatformThreadFactory; #endif diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index 6924aa64..20193533 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -17,6 +17,9 @@ * under the License. */ +#ifdef HAVE_CONFIG_H +#include +#endif #include "PosixThreadFactory.h" #include "Exception.h" diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h old mode 100644 new mode 100755 index a9e15af4..654778cb --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -24,10 +24,18 @@ #include #include +#ifdef HAVE_CONFIG_H +#include +#endif + #ifdef USE_BOOST_THREAD #include #endif +#ifdef HAVE_PTHREAD_H +#include +#endif + namespace apache { namespace thrift { namespace concurrency { class Thread; @@ -74,8 +82,13 @@ class Thread { #ifdef USE_BOOST_THREAD typedef boost::thread::id id_t; + + static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); } + static inline id_t get_current() { return boost::this_thread::get_id(); } #else typedef uint64_t id_t; + static inline bool is_current(pthread_t t) { return pthread_equal(pthread_self(), t); } + static inline id_t get_current() { return pthread_self(); } #endif virtual ~Thread() {}; diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp index e56a9b5f..b756bf15 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/concurrency/ThreadManager.cpp @@ -17,6 +17,10 @@ * under the License. */ +#ifdef HAVE_CONFIG_H +#include +#endif + #include "ThreadManager.h" #include "Exception.h" #include "Monitor.h" diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 0e44ab2b..7d42a2ee 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -26,7 +26,7 @@ #include "TNonblockingServer.h" #include #include -#include +#include #include @@ -53,12 +53,19 @@ #include #include + +#ifdef HAVE_SCHED_H #include +#endif #ifndef AF_LOCAL #define AF_LOCAL AF_UNIX #endif +#ifdef _MSC_VER +#define PRIu32 "I32u" +#endif + namespace apache { namespace thrift { namespace server { using namespace apache::thrift::protocol; @@ -1208,10 +1215,12 @@ void TNonblockingServer::serve() { // Launch all the secondary IO threads in separate threads if (ioThreads_.size() > 1) { - ioThreadFactory_.reset(new PosixThreadFactory( - PosixThreadFactory::OTHER, // scheduler - PosixThreadFactory::NORMAL, // priority + ioThreadFactory_.reset(new PlatformThreadFactory( +#ifndef USE_BOOST_THREAD + PlatformThreadFactory::OTHER, // scheduler + PlatformThreadFactory::NORMAL, // priority 1, // stack size (MB) +#endif false // detached )); @@ -1262,7 +1271,7 @@ TNonblockingIOThread::~TNonblockingIOThread() { GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", errno); } - listenSocket_ = TNonblockingServer::INVALID_SOCKET; + listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE; } for (int i = 0; i < 2; ++i) { @@ -1271,26 +1280,30 @@ TNonblockingIOThread::~TNonblockingIOThread() { GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ", errno); } - notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET; + notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE; } } } void TNonblockingIOThread::createNotificationPipe() { - if (pipe(notificationPipeFDs_) != 0) { - GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno); + if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { + GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR()); throw TException("can't create notification pipe"); } - int flags; - if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 || - fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) { + if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 || + evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) { close(notificationPipeFDs_[0]); close(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK"); } for (int i = 0; i < 2; ++i) { +#if LIBEVENT_VERSION_NUMBER < 0x02000000 + int flags; if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 || fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { +#else + if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) { +#endif close(notificationPipeFDs_[0]); close(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() " @@ -1349,7 +1362,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { } const int kSize = sizeof(conn); - if (write(fd, &conn, kSize) != kSize) { + if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) { return false; } @@ -1357,7 +1370,7 @@ bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { } /* static */ -void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) { +void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) { TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v; assert(ioThread); (void)which; @@ -1365,7 +1378,7 @@ void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) { while (true) { TNonblockingServer::TConnection* connection = 0; const int kSize = sizeof(connection); - int nBytes = read(fd, &connection, kSize); + int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0); if (nBytes == kSize) { if (connection == NULL) { // this is the command to stop our thread, exit the handler! @@ -1416,12 +1429,13 @@ void TNonblockingIOThread::breakLoop(bool error) { // mechanism to stop the thread, but happily if we're running in the // same thread, this means the thread can't be blocking in the event // loop either. - if (!pthread_equal(pthread_self(), threadId_)) { + if (!Thread::is_current(threadId_)) { notify(NULL); } } void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { +#ifdef HAVE_SCHED_H // Start out with a standard, low-priority setup for the sched params. struct sched_param sp; bzero((void*) &sp, sizeof(sp)); @@ -1446,10 +1460,11 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { } else { GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno); } +#endif } void TNonblockingIOThread::run() { - threadId_ = pthread_self(); + threadId_ = Thread::get_current(); assert(eventBase_ == 0); eventBase_ = event_base_new(); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index e5d33119..5f5ea118 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include #include @@ -48,7 +48,7 @@ using apache::thrift::transport::TSocket; using apache::thrift::protocol::TProtocol; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; -using apache::thrift::concurrency::PosixThreadFactory; +using apache::thrift::concurrency::PlatformThreadFactory; using apache::thrift::concurrency::ThreadFactory; using apache::thrift::concurrency::Thread; using apache::thrift::concurrency::Mutex; @@ -146,7 +146,7 @@ class TNonblockingServer : public TServer { static const int DEFAULT_IO_THREADS = 1; /// File descriptor of an invalid socket - static const int INVALID_SOCKET = -1; + static const int INVALID_SOCKET_VALUE = -1; /// # of IO threads this server will use size_t numIOThreads_; @@ -167,7 +167,7 @@ class TNonblockingServer : public TServer { bool threadPoolProcessing_; // Factory to create the IO threads - boost::shared_ptr ioThreadFactory_; + boost::shared_ptr ioThreadFactory_; // Vector of IOThread objects that will handle our IO std::vector > ioThreads_; @@ -804,13 +804,13 @@ class TNonblockingIOThread : public Runnable { // Returns the thread id associated with this object. This should // only be called after the thread has been started. - pthread_t getThreadId() const { return threadId_; } + Thread::id_t getThreadId() const { return threadId_; } // Returns the send-fd for task complete notifications. - int getNotificationSendFD() const { return notificationPipeFDs_[1]; } + evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; } // Returns the read-fd for task complete notifications. - int getNotificationRecvFD() const { return notificationPipeFDs_[0]; } + evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; } // Returns the actual thread object associated with this IO thread. boost::shared_ptr getThread() const { return thread_; } @@ -839,7 +839,7 @@ class TNonblockingIOThread : public Runnable { * * @param fd the descriptor the event occurred on. */ - static void notifyHandler(int fd, short which, void* v); + static void notifyHandler(evutil_socket_t fd, short which, void* v); /** * C-callable event handler for listener events. Provides a callback @@ -876,7 +876,7 @@ class TNonblockingIOThread : public Runnable { const int number_; /// The actual physical thread id. - pthread_t threadId_; + Thread::id_t threadId_; /// If listenSocket_ >= 0, adds an event on the event_base to accept conns int listenSocket_; @@ -894,7 +894,7 @@ class TNonblockingIOThread : public Runnable { struct event notificationEvent_; /// File descriptors for pipe used for task completion notification. - int notificationPipeFDs_[2]; + evutil_socket_t notificationPipeFDs_[2]; /// Actual IO Thread boost::shared_ptr thread_; diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp index c16e32f6..ec30d765 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/server/TThreadPoolServer.cpp @@ -17,6 +17,10 @@ * under the License. */ +#ifdef HAVE_CONFIG_H +#include +#endif + #include "server/TThreadPoolServer.h" #include "transport/TTransportException.h" #include "concurrency/Thread.h" diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h index 3af158ad..95a5676e 100644 --- a/lib/cpp/src/windows/config.h +++ b/lib/cpp/src/windows/config.h @@ -30,6 +30,8 @@ #pragma warning(disable: 4996) // Depreciated posix name. #pragma warning(disable: 4250) // Inherits via dominance. +#pragma warning(disable: 4244) // conversion from '...' to '...', possible loss of data. +#pragma warning(disable: 4267) // conversion from '...' to '...', possible loss of data. #define VERSION "0.9.0-dev" #define HAVE_GETTIMEOFDAY 1 @@ -105,9 +107,9 @@ inline int poll_win32(LPWSAPOLLFD fdArray, ULONG fds, INT timeout) } #endif // WINVER -inline void close(SOCKET socket) +inline int close(SOCKET socket) { - ::closesocket(socket); + return ::closesocket(socket); } #endif // _THRIFT_WINDOWS_CONFIG_H_ diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index 1626c6e9..bf419354 100644 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -65,8 +65,12 @@ TESTS = \ UnitTests_SOURCES = \ UnitTestMain.cpp \ TMemoryBufferTest.cpp \ - TBufferBaseTest.cpp \ + TBufferBaseTest.cpp + +if !WITH_BOOSTTHREADS +UnitTests_SOURCES += \ RWMutexStarveTest.cpp +endif UnitTests_LDADD = \ libtestgencpp.la \