From 7cb7fc8a7342e714afbf517086d8a62622758148 Mon Sep 17 00:00:00 2001 From: Carl Yeksigian Date: Fri, 7 Jun 2013 07:33:01 -0400 Subject: [PATCH] THRIFT-1753: Multiple C++ Windows, OSX, and iOS portability issues Client: cpp Patch: Ben Craig --- lib/cpp/libthrift.vcxproj | 236 ++---------------- lib/cpp/libthriftnb.vcxproj | 70 +----- lib/cpp/src/thrift/TLogging.h | 12 +- lib/cpp/src/thrift/Thrift.cpp | 13 + lib/cpp/src/thrift/Thrift.h | 16 +- .../src/thrift/async/TEvhttpClientChannel.cpp | 2 +- lib/cpp/src/thrift/async/TEvhttpServer.cpp | 2 +- .../src/thrift/concurrency/BoostMonitor.cpp | 48 ++-- .../src/thrift/concurrency/FunctionRunner.h | 20 +- lib/cpp/src/thrift/concurrency/Monitor.cpp | 29 ++- lib/cpp/src/thrift/concurrency/Monitor.h | 14 +- lib/cpp/src/thrift/concurrency/Mutex.cpp | 12 +- .../concurrency/PlatformThreadFactory.h | 17 +- lib/cpp/src/thrift/concurrency/StdMonitor.cpp | 218 ++++++++++++++++ lib/cpp/src/thrift/concurrency/StdMutex.cpp | 56 +++++ .../thrift/concurrency/StdThreadFactory.cpp | 172 +++++++++++++ .../src/thrift/concurrency/StdThreadFactory.h | 72 ++++++ lib/cpp/src/thrift/concurrency/Thread.h | 22 +- .../src/thrift/concurrency/TimerManager.cpp | 15 +- lib/cpp/src/thrift/concurrency/TimerManager.h | 10 +- lib/cpp/src/thrift/concurrency/Util.cpp | 23 +- lib/cpp/src/thrift/concurrency/Util.h | 18 +- .../src/thrift/protocol/TDebugProtocol.cpp | 2 +- .../src/thrift/server/TNonblockingServer.cpp | 108 ++++---- .../src/thrift/server/TNonblockingServer.h | 18 +- lib/cpp/src/thrift/transport/PlatformSocket.h | 94 +++++++ lib/cpp/src/thrift/transport/TFDTransport.cpp | 15 +- .../src/thrift/transport/TFileTransport.cpp | 144 ++++------- lib/cpp/src/thrift/transport/TFileTransport.h | 27 +- lib/cpp/src/thrift/transport/TPipe.cpp | 2 +- lib/cpp/src/thrift/transport/TSSLSocket.cpp | 30 +-- .../src/thrift/transport/TServerSocket.cpp | 134 +++++----- lib/cpp/src/thrift/transport/TServerSocket.h | 12 +- lib/cpp/src/thrift/transport/TSocket.cpp | 131 +++++----- lib/cpp/src/thrift/transport/TSocket.h | 17 +- lib/cpp/src/thrift/transport/TSocketPool.h | 2 +- lib/cpp/src/thrift/windows/GetTimeOfDay.cpp | 24 +- lib/cpp/src/thrift/windows/GetTimeOfDay.h | 12 +- lib/cpp/src/thrift/windows/SocketPair.cpp | 12 +- lib/cpp/src/thrift/windows/SocketPair.h | 3 +- lib/cpp/src/thrift/windows/StdAfx.cpp | 20 -- lib/cpp/src/thrift/windows/StdAfx.h | 41 --- .../src/thrift/windows/TWinsockSingleton.cpp | 12 +- .../src/thrift/windows/TWinsockSingleton.h | 17 ++ lib/cpp/src/thrift/windows/TargetVersion.h | 31 --- lib/cpp/src/thrift/windows/WinFcntl.cpp | 62 ++++- lib/cpp/src/thrift/windows/WinFcntl.h | 16 +- lib/cpp/src/thrift/windows/config.h | 144 ++++------- lib/cpp/src/thrift/windows/force_inc.h | 77 ------ 49 files changed, 1281 insertions(+), 1023 deletions(-) create mode 100644 lib/cpp/src/thrift/concurrency/StdMonitor.cpp create mode 100644 lib/cpp/src/thrift/concurrency/StdMutex.cpp create mode 100644 lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp create mode 100644 lib/cpp/src/thrift/concurrency/StdThreadFactory.h create mode 100644 lib/cpp/src/thrift/transport/PlatformSocket.h diff --git a/lib/cpp/libthrift.vcxproj b/lib/cpp/libthrift.vcxproj index d83a9ce2..e9990a5a 100644 --- a/lib/cpp/libthrift.vcxproj +++ b/lib/cpp/libthrift.vcxproj @@ -35,140 +35,23 @@ - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - %(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + + + + - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + + + + + + + + true @@ -178,39 +61,12 @@ - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + true true @@ -221,38 +77,10 @@ true true - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - + + - - NotUsing - NotUsing - NotUsing - NotUsing - NotUsing - NotUsing - NotUsing - NotUsing - @@ -291,18 +119,14 @@ - - - - {DD26F57E-60F2-4F37-A616-D219A9BF338F} @@ -412,8 +236,7 @@ NotUsing Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthrift.pdb MultiThreadedDebugDLL @@ -427,8 +250,7 @@ NotUsing Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthrift.pdb MultiThreadedDebug @@ -442,8 +264,7 @@ NotUsing Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) Windows @@ -455,8 +276,7 @@ NotUsing Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) MultiThreadedDebug @@ -471,8 +291,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthrift.pdb @@ -489,8 +308,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthrift.pdb MultiThreaded @@ -508,8 +326,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) Windows @@ -525,8 +342,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) MultiThreaded diff --git a/lib/cpp/libthriftnb.vcxproj b/lib/cpp/libthriftnb.vcxproj index 4559181d..b186e83f 100755 --- a/lib/cpp/libthriftnb.vcxproj +++ b/lib/cpp/libthriftnb.vcxproj @@ -35,47 +35,10 @@ - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - _CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions) - - + + + + @@ -84,7 +47,6 @@ - @@ -195,8 +157,7 @@ Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthriftnb.pdb @@ -210,8 +171,7 @@ Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthriftnb.pdb MultiThreadedDebug @@ -226,8 +186,7 @@ Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) Windows @@ -240,8 +199,7 @@ Level3 Disabled - WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) MultiThreadedDebug @@ -257,8 +215,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthriftnb.pdb @@ -276,8 +233,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) $(IntDir)libthriftnb.pdb MultiThreaded @@ -296,8 +252,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) Windows @@ -314,8 +269,7 @@ MaxSpeed true true - WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) - thrift\windows\force_inc.h;%(ForcedIncludeFiles) + HAVE_CONFIG_H=1;WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) MultiThreaded diff --git a/lib/cpp/src/thrift/TLogging.h b/lib/cpp/src/thrift/TLogging.h index 934e8fc1..ff571803 100644 --- a/lib/cpp/src/thrift/TLogging.h +++ b/lib/cpp/src/thrift/TLogging.h @@ -29,11 +29,7 @@ * */ -#ifndef HAVE_CLOCK_GETTIME #include -#else -#include -#endif #ifdef HAVE_STDINT_H #include @@ -82,7 +78,7 @@ time_t now; \ char dbgtime[26] ; \ time(&now); \ - ctime_r(&now, dbgtime); \ + THRIFT_CTIME_R(&now, dbgtime); \ dbgtime[24] = '\0'; \ fprintf(stderr,"[%s,%d] [%s] " format_string " \n", __FILE__, __LINE__,dbgtime,##__VA_ARGS__); \ } \ @@ -115,7 +111,7 @@ time_t now; \ char dbgtime[26] ; \ time(&now); \ - ctime_r(&now, dbgtime); \ + THRIFT_CTIME_R(&now, dbgtime); \ dbgtime[24] = '\0'; \ fprintf(stderr,"[%s,%d] [%s] ERROR: " format_string " \n", __FILE__, __LINE__,dbgtime,##__VA_ARGS__); \ } @@ -132,7 +128,7 @@ time_t now; \ char dbgtime[26] ; \ time(&now); \ - ctime_r(&now, dbgtime); \ + THRIFT_CTIME_R(&now, dbgtime); \ dbgtime[24] = '\0'; \ fprintf(stderr,"[%s,%d] [%s] ERROR: Going to abort " format_string " \n", __FILE__, __LINE__,dbgtime,##__VA_ARGS__); \ exit(1); \ @@ -151,7 +147,7 @@ time_t now; \ char dbgtime[26] ; \ time(&now); \ - ctime_r(&now, dbgtime); \ + THRIFT_CTIME_R(&now, dbgtime); \ dbgtime[24] = '\0'; \ fprintf(stderr,"[%s] " format_string " \n", dbgtime,##__VA_ARGS__); \ } \ diff --git a/lib/cpp/src/thrift/Thrift.cpp b/lib/cpp/src/thrift/Thrift.cpp index 6c7f8aee..b1e13862 100644 --- a/lib/cpp/src/thrift/Thrift.cpp +++ b/lib/cpp/src/thrift/Thrift.cpp @@ -29,6 +29,7 @@ namespace apache { namespace thrift { TOutput GlobalOutput; void TOutput::printf(const char *message, ...) { +#ifndef THRIFT_SQUELCH_CONSOLE_OUTPUT // Try to reduce heap usage, even if printf is called rarely. static const int STACK_BUF_SIZE = 256; char stack_buf[STACK_BUF_SIZE]; @@ -77,6 +78,18 @@ void TOutput::printf(const char *message, ...) { f_(heap_buf); } free(heap_buf); +#endif +} + +void TOutput::errorTimeWrapper(const char* msg) { +#ifndef THRIFT_SQUELCH_CONSOLE_OUTPUT + time_t now; + char dbgtime[26]; + time(&now); + THRIFT_CTIME_R(&now, dbgtime); + dbgtime[24] = 0; + fprintf(stderr, "Thrift: %s %s\n", dbgtime, msg); +#endif } void TOutput::perror(const char *message, int errno_copy) { diff --git a/lib/cpp/src/thrift/Thrift.h b/lib/cpp/src/thrift/Thrift.h index 5cdc37e7..8c228141 100644 --- a/lib/cpp/src/thrift/Thrift.h +++ b/lib/cpp/src/thrift/Thrift.h @@ -20,13 +20,10 @@ #ifndef _THRIFT_THRIFT_H_ #define _THRIFT_THRIFT_H_ 1 -#ifdef _WIN32 -#include -#endif - #ifdef HAVE_CONFIG_H #include "config.h" #endif +#include "transport/PlatformSocket.h" #include #include @@ -72,6 +69,8 @@ #define THRIFT_OVERLOAD_IF(T, Y) \ THRIFT_OVERLOAD_IF_DEFN(T, Y) = NULL +#define THRIFT_UNUSED_VARIABLE(x) ((x)=(x)) + namespace apache { namespace thrift { class TEnumIterator : public std::iterator > { @@ -125,14 +124,7 @@ class TOutput { void printf(const char *message, ...); - inline static void errorTimeWrapper(const char* msg) { - time_t now; - char dbgtime[26]; - time(&now); - ctime_r(&now, dbgtime); - dbgtime[24] = 0; - fprintf(stderr, "Thrift: %s %s\n", dbgtime, msg); - } + static void errorTimeWrapper(const char* msg); /** Just like strerror_r but returns a C++ string object. */ static std::string strerror_s(int errno_copy); diff --git a/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp b/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp index 58715032..0e747831 100644 --- a/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp +++ b/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp @@ -142,7 +142,7 @@ void TEvhttpClientChannel::finish(struct evhttp_request* req) { } recvBuf_->resetBuffer( EVBUFFER_DATA(req->input_buffer), - EVBUFFER_LENGTH(req->input_buffer)); + static_cast(EVBUFFER_LENGTH(req->input_buffer))); cob_(); return; } diff --git a/lib/cpp/src/thrift/async/TEvhttpServer.cpp b/lib/cpp/src/thrift/async/TEvhttpServer.cpp index 85ed838f..83fb5fab 100644 --- a/lib/cpp/src/thrift/async/TEvhttpServer.cpp +++ b/lib/cpp/src/thrift/async/TEvhttpServer.cpp @@ -99,7 +99,7 @@ int TEvhttpServer::serve() { TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) : req(req) - , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer))) + , ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), static_cast(EVBUFFER_LENGTH(req->input_buffer)))) , obuf(new TMemoryBuffer()) {} diff --git a/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp b/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp index dd318fea..9487a2f9 100644 --- a/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp +++ b/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp @@ -23,9 +23,8 @@ #include "Monitor.h" #include "Exception.h" #include "Util.h" - +#include #include -#include #include #include @@ -71,7 +70,7 @@ class Monitor::Impl : public boost::condition_variable_any { */ void wait(int64_t timeout_ms) { int result = waitForTimeRelative(timeout_ms); - if (result == ETIMEDOUT) { + if (result == THRIFT_ETIMEDOUT) { throw TimedOutException(); } else if (result != 0) { throw TException( @@ -83,7 +82,7 @@ class Monitor::Impl : public boost::condition_variable_any { * Waits until the specified timeout in milliseconds for the condition to * occur, or waits forever if timeout_ms == 0. * - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTimeRelative(int64_t timeout_ms) { if (timeout_ms == 0LL) { @@ -96,36 +95,47 @@ class Monitor::Impl : public boost::condition_variable_any { assert(mutexImpl); boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); - int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT; + int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : THRIFT_ETIMEDOUT; lock.release(); return res; } /** - * Waits until the absolute time specified using struct timespec. - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast(abstime->tv_sec); + temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const timespec* abstime) { + int waitForTime(const struct timeval* abstime) { assert(mutex_); boost::timed_mutex* mutexImpl = - reinterpret_cast(mutex_->getUnderlyingImpl()); + static_cast(mutex_->getUnderlyingImpl()); assert(mutexImpl); - struct timespec currenttime; - Util::toTimespec(currenttime, Util::currentTime()); + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); - long tv_nsec = static_cast(abstime->tv_nsec - currenttime.tv_nsec); + long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); if(tv_sec < 0) tv_sec = 0; - if(tv_nsec < 0) - tv_nsec = 0; + if(tv_usec < 0) + tv_usec = 0; boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); int res = timed_wait(lock, boost::get_system_time() + boost::posix_time::seconds(tv_sec) + - boost::posix_time::microseconds(tv_nsec / 1000) - ) ? 0 : ETIMEDOUT; + boost::posix_time::microseconds(tv_usec) + ) ? 0 : THRIFT_ETIMEDOUT; lock.release(); return res; } @@ -179,7 +189,11 @@ void Monitor::unlock() const { const_cast(impl_)->unlock(); } void Monitor::wait(int64_t timeout) const { const_cast(impl_)->wait(timeout); } -int Monitor::waitForTime(const timespec* abstime) const { +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return const_cast(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { return const_cast(impl_)->waitForTime(abstime); } diff --git a/lib/cpp/src/thrift/concurrency/FunctionRunner.h b/lib/cpp/src/thrift/concurrency/FunctionRunner.h index d3d2d410..e3b2bf32 100644 --- a/lib/cpp/src/thrift/concurrency/FunctionRunner.h +++ b/lib/cpp/src/thrift/concurrency/FunctionRunner.h @@ -20,8 +20,8 @@ #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H #define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1 -#include "thrift/cxxfunctional.h" -#include "thrift/lib/cpp/concurrency/Thread.h" +#include +#include namespace apache { namespace thrift { namespace concurrency { @@ -68,20 +68,26 @@ class FunctionRunner : public Runnable { return boost::shared_ptr(new FunctionRunner(func, arg)); } - +private: + static void pthread_func_wrapper(PthreadFuncPtr func, void *arg) + { + //discard return value + func(arg); + } +public: /** * Given a 'pthread_create' style callback, this FunctionRunner will * execute the given callback. Note that the 'void*' return value is ignored. */ FunctionRunner(PthreadFuncPtr func, void* arg) - : func_(apache::thrift::stdcxx::bind(func, arg)), repFunc_(0) + : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg)) { } /** * Given a generic callback, this FunctionRunner will execute it. */ FunctionRunner(const VoidFunc& cob) - : func_(cob), repFunc_(0) + : func_(cob) { } /** @@ -91,13 +97,13 @@ class FunctionRunner : public Runnable { * be intervalMs plus execution time of the callback. */ FunctionRunner(const BoolFunc& cob, int intervalMs) - : func_(0), repFunc_(cob), intervalMs_(intervalMs) + : repFunc_(cob), intervalMs_(intervalMs) { } void run() { if (repFunc_) { while(repFunc_()) { - usleep(intervalMs_*1000); + THRIFT_SLEEP_USEC(intervalMs_*1000); } } else { func_(); diff --git a/lib/cpp/src/thrift/concurrency/Monitor.cpp b/lib/cpp/src/thrift/concurrency/Monitor.cpp index 3d6440c5..965029b5 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.cpp +++ b/lib/cpp/src/thrift/concurrency/Monitor.cpp @@ -20,11 +20,11 @@ #include "Monitor.h" #include "Exception.h" #include "Util.h" +#include #include #include -#include #include @@ -77,7 +77,7 @@ class Monitor::Impl { */ void wait(int64_t timeout_ms) const { int result = waitForTimeRelative(timeout_ms); - if (result == ETIMEDOUT) { + if (result == THRIFT_ETIMEDOUT) { // pthread_cond_timedwait has been observed to return early on // various platforms, so comment out this assert. //assert(Util::currentTime() >= (now + timeout)); @@ -92,23 +92,23 @@ class Monitor::Impl { * Waits until the specified timeout in milliseconds for the condition to * occur, or waits forever if timeout_ms == 0. * - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTimeRelative(int64_t timeout_ms) const { if (timeout_ms == 0LL) { return waitForever(); } - struct timespec abstime; + struct THRIFT_TIMESPEC abstime; Util::toTimespec(abstime, Util::currentTime() + timeout_ms); return waitForTime(&abstime); } /** - * Waits until the absolute time specified using struct timespec. - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const timespec* abstime) const { + int waitForTime(const THRIFT_TIMESPEC* abstime) const { assert(mutex_); pthread_mutex_t* mutexImpl = reinterpret_cast(mutex_->getUnderlyingImpl()); @@ -120,6 +120,12 @@ class Monitor::Impl { abstime); } + int waitForTime(const struct timeval* abstime) const { + struct THRIFT_TIMESPEC temp; + temp.tv_sec = abstime->tv_sec; + temp.tv_nsec = abstime->tv_usec * 1000; + return waitForTime(&temp); + } /** * Waits forever until the condition occurs. * Returns 0 if condition occurs, or an error code otherwise. @@ -136,12 +142,14 @@ class Monitor::Impl { void notify() { // XXX Need to assert that caller owns mutex int iret = pthread_cond_signal(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); assert(iret == 0); } void notifyAll() { // XXX Need to assert that caller owns mutex int iret = pthread_cond_broadcast(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); assert(iret == 0); } @@ -164,6 +172,7 @@ class Monitor::Impl { if (condInitialized_) { condInitialized_ = false; int iret = pthread_cond_destroy(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); assert(iret == 0); } } @@ -189,7 +198,11 @@ void Monitor::unlock() const { impl_->unlock(); } void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); } -int Monitor::waitForTime(const timespec* abstime) const { +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return impl_->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { return impl_->waitForTime(abstime); } diff --git a/lib/cpp/src/thrift/concurrency/Monitor.h b/lib/cpp/src/thrift/concurrency/Monitor.h index aa6fe939..185e246e 100644 --- a/lib/cpp/src/thrift/concurrency/Monitor.h +++ b/lib/cpp/src/thrift/concurrency/Monitor.h @@ -70,15 +70,21 @@ class Monitor : boost::noncopyable { * Waits a maximum of the specified timeout in milliseconds for the condition * to occur, or waits forever if timeout_ms == 0. * - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ int waitForTimeRelative(int64_t timeout_ms) const; /** - * Waits until the absolute time specified using struct timespec. - * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. */ - int waitForTime(const timespec* abstime) const; + int waitForTime(const THRIFT_TIMESPEC* abstime) const; + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) const; /** * Waits forever until the condition occurs. diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp index 601afa80..5ca4b0f3 100644 --- a/lib/cpp/src/thrift/concurrency/Mutex.cpp +++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp @@ -20,6 +20,7 @@ #ifdef HAVE_CONFIG_H #include #endif +#include #include "Mutex.h" #include "Util.h" @@ -127,6 +128,7 @@ class Mutex::impl { if (initialized_) { initialized_ = false; int ret = pthread_mutex_destroy(&pthread_mutex_); + THRIFT_UNUSED_VARIABLE(ret); assert(ret == 0); } } @@ -143,7 +145,7 @@ class Mutex::impl { #if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L PROFILE_MUTEX_START_LOCK(); - struct timespec ts; + struct THRIFT_TIMESPEC ts; Util::toTimespec(ts, milliseconds + Util::currentTime()); int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts); if (ret == 0) { @@ -155,7 +157,7 @@ class Mutex::impl { return false; #else /* Otherwise follow solution used by Mono for Android */ - struct timespec sleepytime, now, to; + struct THRIFT_TIMESPEC sleepytime, now, to; /* This is just to avoid a completely busy wait */ sleepytime.tv_sec = 0; @@ -170,7 +172,7 @@ class Mutex::impl { } nanosleep(&sleepytime, NULL); } - + return true; #endif } @@ -206,6 +208,7 @@ void Mutex::unlock() const { impl_->unlock(); } void Mutex::DEFAULT_INITIALIZER(void* arg) { pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg; int ret = pthread_mutex_init(pthread_mutex, NULL); + THRIFT_UNUSED_VARIABLE(ret); assert(ret == 0); } @@ -224,6 +227,7 @@ static void init_with_kind(pthread_mutex_t* mutex, int kind) { ret = pthread_mutexattr_destroy(&mutexattr); assert(ret == 0); + THRIFT_UNUSED_VARIABLE(ret); } #endif @@ -260,6 +264,7 @@ public: profileTime_ = 0; #endif int ret = pthread_rwlock_init(&rw_lock_, NULL); + THRIFT_UNUSED_VARIABLE(ret); assert(ret == 0); initialized_ = true; } @@ -268,6 +273,7 @@ public: if(initialized_) { initialized_ = false; int ret = pthread_rwlock_destroy(&rw_lock_); + THRIFT_UNUSED_VARIABLE(ret); assert(ret == 0); } } diff --git a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h index 04f6ee32..a370c49b 100644 --- a/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h +++ b/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h @@ -20,18 +20,23 @@ #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ #define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1 -#ifndef USE_BOOST_THREAD -# include -#else +#include +#if USE_BOOST_THREAD # include +#elif USE_STD_THREAD +# include +#else +# include #endif namespace apache { namespace thrift { namespace concurrency { -#ifndef USE_BOOST_THREAD - typedef PosixThreadFactory PlatformThreadFactory; -#else +#ifdef USE_BOOST_THREAD typedef BoostThreadFactory PlatformThreadFactory; +#elif USE_STD_THREAD + typedef StdThreadFactory PlatformThreadFactory; +#else + typedef PosixThreadFactory PlatformThreadFactory; #endif }}} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp new file mode 100644 index 00000000..af602f18 --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/StdMonitor.cpp @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "Monitor.h" +#include "Exception.h" +#include "Util.h" +#include +#include + +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace concurrency { + +/** + * Monitor implementation using the std thread library + * + * @version $Id:$ + */ +class Monitor::Impl { + + public: + + Impl() + : ownedMutex_(new Mutex()), + conditionVariable_(), + mutex_(NULL) { + init(ownedMutex_.get()); + } + + Impl(Mutex* mutex) + : ownedMutex_(), + conditionVariable_(), + mutex_(NULL) { + init(mutex); + } + + Impl(Monitor* monitor) + : ownedMutex_(), + conditionVariable_(), + mutex_(NULL) { + init(&(monitor->mutex())); + } + + Mutex& mutex() { return *mutex_; } + void lock() { mutex_->lock(); } + void unlock() { mutex_->unlock(); } + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms) { + int result = waitForTimeRelative(timeout_ms); + if (result == THRIFT_ETIMEDOUT) { + throw TimedOutException(); + } else if (result != 0) { + throw TException( + "Monitor::wait() failed"); + } + } + + /** + * Waits until the specified timeout in milliseconds for the condition to + * occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) { + if (timeout_ms == 0LL) { + return waitForever(); + } + + assert(mutex_); + std::timed_mutex* mutexImpl = + static_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); + } + + /** + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast(abstime->tv_sec); + temp.tv_usec = static_cast(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) { + assert(mutex_); + std::timed_mutex* mutexImpl = + static_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); + + long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); + long tv_usec = static_cast(abstime->tv_usec - currenttime.tv_usec); + if(tv_sec < 0) + tv_sec = 0; + if(tv_usec < 0) + tv_usec = 0; + + std::unique_lock lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, + std::chrono::seconds(tv_sec) + + std::chrono::microseconds(tv_usec)) == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); + } + + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() { + assert(mutex_); + std::timed_mutex* mutexImpl = + static_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock lock(*mutexImpl, std::adopt_lock); + conditionVariable_.wait(lock); + lock.release(); + return 0; + } + + + void notify() { + conditionVariable_.notify_one(); + } + + void notifyAll() { + conditionVariable_.notify_all(); + } + + private: + + void init(Mutex* mutex) { + mutex_ = mutex; + } + + const std::unique_ptr ownedMutex_; + std::condition_variable_any conditionVariable_; + Mutex* mutex_; +}; + +Monitor::Monitor() : impl_(new Monitor::Impl()) {} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {} + +Monitor::~Monitor() { delete impl_; } + +Mutex& Monitor::mutex() const { return const_cast(impl_)->mutex(); } + +void Monitor::lock() const { const_cast(impl_)->lock(); } + +void Monitor::unlock() const { const_cast(impl_)->unlock(); } + +void Monitor::wait(int64_t timeout) const { const_cast(impl_)->wait(timeout); } + +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return const_cast(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { + return const_cast(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTimeRelative(int64_t timeout_ms) const { + return const_cast(impl_)->waitForTimeRelative(timeout_ms); +} + +int Monitor::waitForever() const { + return const_cast(impl_)->waitForever(); +} + +void Monitor::notify() const { const_cast(impl_)->notify(); } + +void Monitor::notifyAll() const { const_cast(impl_)->notifyAll(); } + +}}} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp new file mode 100644 index 00000000..584a5811 --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/StdMutex.cpp @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "Mutex.h" +#include "Util.h" + +#include +#include +#include + +namespace apache { namespace thrift { namespace concurrency { + +/** + * Implementation of Mutex class using C++11 std::timed_mutex + * + * @version $Id:$ + */ +class Mutex::impl : public std::timed_mutex { +}; + +Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {} + +void* Mutex::getUnderlyingImpl() const { return impl_.get(); } + +void Mutex::lock() const { impl_->lock(); } + +bool Mutex::trylock() const { return impl_->try_lock(); } + +bool Mutex::timedlock(int64_t ms) const { return impl_->try_lock_for(std::chrono::milliseconds(ms)); } + +void Mutex::unlock() const { impl_->unlock(); } + +void Mutex::DEFAULT_INITIALIZER(void* arg) { +} + +}}} // apache::thrift::concurrency + diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp new file mode 100644 index 00000000..a521f093 --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "StdThreadFactory.h" +#include "Exception.h" + +#include + +#include +#include +#include + +namespace apache { namespace thrift { namespace concurrency { + +/** + * The C++11 thread class. + * + * Note that we use boost shared_ptr rather than std shared_ptrs here + * because the Thread/Runnable classes use those and we don't want to + * mix them. + * + * @version $Id:$ + */ +class StdThread: public Thread, public boost::enable_shared_from_this { + public: + + enum STATE { + uninitialized, + starting, + started, + stopping, + stopped + }; + + static void threadMain(boost::shared_ptr thread); + + private: + std::unique_ptr thread_; + STATE state_; + bool detached_; + + public: + + StdThread(bool detached, boost::shared_ptr runnable) : + state_(uninitialized), + detached_(detached) { + this->Thread::runnable(runnable); + } + + ~StdThread() { + if(!detached_) { + try { + join(); + } catch(...) { + // We're really hosed. + } + } + } + + void start() { + if (state_ != uninitialized) { + return; + } + + boost::shared_ptr selfRef = shared_from_this(); + state_ = starting; + + thread_ = std::unique_ptr(new std::thread(threadMain, selfRef)); + + if(detached_) + thread_->detach(); + } + + void join() { + if (!detached_ && state_ != uninitialized) { + thread_->join(); + } + } + + Thread::id_t getId() { + return thread_.get() ? thread_->get_id() : std::thread::id(); + } + + boost::shared_ptr runnable() const { return Thread::runnable(); } + + void runnable(boost::shared_ptr value) { Thread::runnable(value); } +}; + +void StdThread::threadMain(boost::shared_ptr thread) { + if (thread == NULL) { + return; + } + + if (thread->state_ != starting) { + return; + } + + thread->state_ = started; + thread->runnable()->run(); + + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; + } + + return; +} + +/** + * std::thread factory implementation + */ +class StdThreadFactory::Impl { + + private: + bool detached_; + + public: + + Impl(bool detached) : + detached_(detached) {} + + /** + * Creates a new std::thread to run the runnable object + * + * @param runnable A runnable object + */ + boost::shared_ptr newThread(boost::shared_ptr runnable) const { + boost::shared_ptr result = boost::shared_ptr(new StdThread(detached_, runnable)); + runnable->thread(result); + return result; + } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const { + return std::this_thread::get_id(); + } + +}; + +StdThreadFactory::StdThreadFactory(bool detached) : + impl_(new StdThreadFactory::Impl(detached)) {} + +boost::shared_ptr StdThreadFactory::newThread(boost::shared_ptr runnable) const { return impl_->newThread(runnable); } + +bool StdThreadFactory::isDetached() const { return impl_->isDetached(); } + +void StdThreadFactory::setDetached(bool value) { impl_->setDetached(value); } + +Thread::id_t StdThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); } + +}}} // apache::thrift::concurrency diff --git a/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h new file mode 100644 index 00000000..c300e0d0 --- /dev/null +++ b/lib/cpp/src/thrift/concurrency/StdThreadFactory.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1 + +#include "Thread.h" + +#include + +namespace apache { namespace thrift { namespace concurrency { + +/** + * A thread factory to create std::threads. + * + * @version $Id:$ + */ +class StdThreadFactory : public ThreadFactory { + + public: + + /** + * Std thread factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + * the Runnable tasks they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * By default threads are not joinable. + */ + + StdThreadFactory(bool detached=true); + + // From ThreadFactory; + boost::shared_ptr newThread(boost::shared_ptr runnable) const; + + // From ThreadFactory; + Thread::id_t getCurrentThreadId() const; + + /** + * Sets detached mode of threads + */ + virtual void setDetached(bool detached); + + /** + * Gets current detached mode + */ + virtual bool isDetached() const; + +private: + class Impl; + boost::shared_ptr impl_; +}; + +}}} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ diff --git a/lib/cpp/src/thrift/concurrency/Thread.h b/lib/cpp/src/thrift/concurrency/Thread.h index 164df0cb..6be7d1d0 100755 --- a/lib/cpp/src/thrift/concurrency/Thread.h +++ b/lib/cpp/src/thrift/concurrency/Thread.h @@ -28,12 +28,14 @@ #include #endif -#ifdef USE_BOOST_THREAD -#include -#endif - -#ifdef HAVE_PTHREAD_H -#include +#if USE_BOOST_THREAD +# include +#elif USE_STD_THREAD +# include +#else +# ifdef HAVE_PTHREAD_H +# include +# endif #endif namespace apache { namespace thrift { namespace concurrency { @@ -80,13 +82,19 @@ class Thread { public: -#ifdef USE_BOOST_THREAD +#if USE_BOOST_THREAD typedef boost::thread::id id_t; static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); } static inline id_t get_current() { return boost::this_thread::get_id(); } +#elif USE_STD_THREAD + typedef std::thread::id id_t; + + static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); } + static inline id_t get_current() { return std::this_thread::get_id(); } #else typedef pthread_t id_t; + static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); } static inline id_t get_current() { return pthread_self(); } #endif diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp index 8be8a6e7..98a4b284 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp +++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp @@ -263,7 +263,7 @@ void TimerManager::add(shared_ptr task, int64_t timeout) { } } -void TimerManager::add(shared_ptr task, const struct timespec& value) { +void TimerManager::add(shared_ptr task, const struct THRIFT_TIMESPEC& value) { int64_t expiration; Util::toMilliseconds(expiration, value); @@ -277,6 +277,19 @@ void TimerManager::add(shared_ptr task, const struct timespec& value) add(task, expiration - now); } +void TimerManager::add(shared_ptr task, const struct timeval& value) { + + int64_t expiration; + Util::toMilliseconds(expiration, value); + + int64_t now = Util::currentTime(); + + if (expiration < now) { + throw InvalidArgumentException(); + } + + add(task, expiration - now); +} void TimerManager::remove(shared_ptr task) { (void) task; diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.h b/lib/cpp/src/thrift/concurrency/TimerManager.h index d905ddb7..fdda2635 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.h +++ b/lib/cpp/src/thrift/concurrency/TimerManager.h @@ -77,7 +77,15 @@ class TimerManager { * @param task The task to execute * @param timeout Absolute time in the future to execute task. */ - virtual void add(boost::shared_ptr task, const struct timespec& timeout); + virtual void add(boost::shared_ptr task, const struct THRIFT_TIMESPEC& timeout); + + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Absolute time in the future to execute task. + */ + virtual void add(boost::shared_ptr task, const struct timeval& timeout); /** * Removes a pending task diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp index 764b6f5d..1f291da1 100644 --- a/lib/cpp/src/thrift/concurrency/Util.cpp +++ b/lib/cpp/src/thrift/concurrency/Util.cpp @@ -20,35 +20,22 @@ #ifdef HAVE_CONFIG_H #include #endif - +#include #include "Util.h" -#if defined(HAVE_CLOCK_GETTIME) -#include -#elif defined(HAVE_SYS_TIME_H) +#if defined(HAVE_SYS_TIME_H) #include -#endif // defined(HAVE_CLOCK_GETTIME) +#endif namespace apache { namespace thrift { namespace concurrency { int64_t Util::currentTimeTicks(int64_t ticksPerSec) { int64_t result; - -#if defined(HAVE_CLOCK_GETTIME) - struct timespec now; - int ret = clock_gettime(CLOCK_REALTIME, &now); - assert(ret == 0); - ret = ret; //squelching "unused variable" warning - toTicks(result, now, ticksPerSec); -#elif defined(HAVE_GETTIMEOFDAY) struct timeval now; - int ret = gettimeofday(&now, NULL); + int ret = THRIFT_GETTIMEOFDAY(&now, NULL); assert(ret == 0); + THRIFT_UNUSED_VARIABLE(ret); //squelching "unused variable" warning toTicks(result, now, ticksPerSec); -#else -#error "No high-precision clock is available." -#endif // defined(HAVE_CLOCK_GETTIME) - return result; } diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h index e454227e..d63776ad 100644 --- a/lib/cpp/src/thrift/concurrency/Util.h +++ b/lib/cpp/src/thrift/concurrency/Util.h @@ -56,12 +56,12 @@ class Util { public: /** - * Converts millisecond timestamp into a timespec struct + * Converts millisecond timestamp into a THRIFT_TIMESPEC struct * - * @param struct timespec& result + * @param struct THRIFT_TIMESPEC& result * @param time or duration in milliseconds */ - static void toTimespec(struct timespec& result, int64_t value) { + static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) { result.tv_sec = value / MS_PER_S; // ms to s result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns } @@ -82,10 +82,10 @@ class Util { } } /** - * Converts struct timespec to arbitrary-sized ticks since epoch + * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch */ static void toTicks(int64_t& result, - const struct timespec& value, + const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) { return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec); } @@ -100,10 +100,10 @@ class Util { } /** - * Converts struct timespec to milliseconds + * Converts struct THRIFT_TIMESPEC to milliseconds */ static void toMilliseconds(int64_t& result, - const struct timespec& value) { + const struct THRIFT_TIMESPEC& value) { return toTicks(result, value, MS_PER_S); } @@ -116,9 +116,9 @@ class Util { } /** - * Converts struct timespec to microseconds + * Converts struct THRIFT_TIMESPEC to microseconds */ - static void toUsec(int64_t& result, const struct timespec& value) { + static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) { return toTicks(result, value, US_PER_S); } diff --git a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp index 8b69df41..9f552450 100644 --- a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp +++ b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp @@ -32,7 +32,7 @@ using std::string; static string byte_to_hex(const uint8_t byte) { char buf[3]; int ret = std::sprintf(buf, "%02x", (int)byte); - ret = ret; //squelching "unused variable" warning + THRIFT_UNUSED_VARIABLE(ret); assert(ret == 2); assert(buf[2] == '\0'); return buf; diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index 69f0e555..3277f5c8 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include @@ -51,7 +52,6 @@ #include #endif -#include #include #ifdef HAVE_SCHED_H @@ -62,7 +62,7 @@ #define AF_LOCAL AF_UNIX #endif -#ifdef _MSC_VER +#if !defined(PRIu32) #define PRIu32 "I32u" #define PRIu64 "I64u" #endif @@ -221,7 +221,7 @@ class TNonblockingServer::TConnection { class Task; /// Constructor - TConnection(int socket, TNonblockingIOThread* ioThread, + TConnection(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen) { readBuffer_ = NULL; readBufferSize_ = 0; @@ -232,8 +232,8 @@ class TNonblockingServer::TConnection { // Allocate input and output transports these only need to be allocated // once per TConnection (they don't need to be reallocated on init() call) inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_)); - outputTransport_.reset(new TMemoryBuffer( - server_->getWriteBufferDefaultSize())); + outputTransport_.reset( + new TMemoryBuffer(static_cast(server_->getWriteBufferDefaultSize()))); tSocket_.reset(new TSocket()); init(socket, ioThread, addr, addrLen); } @@ -254,7 +254,7 @@ class TNonblockingServer::TConnection { void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); /// Initialize - void init(int socket, TNonblockingIOThread* ioThread, + void init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen); /** @@ -284,7 +284,7 @@ class TNonblockingServer::TConnection { * * Don't call this from the IO thread itself. * - * @return true if successful, false if unable to notify (check errno). + * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR). */ bool notifyIOThread() { return ioThread_->notify(this); @@ -389,7 +389,7 @@ class TNonblockingServer::TConnection::Task: public Runnable { void* connectionContext_; }; -void TNonblockingServer::TConnection::init(int socket, +void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket, TNonblockingIOThread* ioThread, const sockaddr* addr, socklen_t addrLen) { @@ -827,7 +827,7 @@ void TNonblockingServer::TConnection::setFlags(short eventFlags) { void TNonblockingServer::TConnection::close() { // Delete the registered libevent if (event_del(&event_) == -1) { - GlobalOutput.perror("TConnection::close() event_del", errno); + GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR); } if (serverEventHandler_ != NULL) { @@ -857,7 +857,7 @@ void TNonblockingServer::TConnection::checkIdleBufferMemLimit( if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) { // just start over - outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize()); + outputTransport_->resetBuffer(static_cast(server_->getWriteBufferDefaultSize())); largestWriteBufferSize_ = 0; } } @@ -888,7 +888,7 @@ TNonblockingServer::~TNonblockingServer() { * by allocating a new one entirely */ TNonblockingServer::TConnection* TNonblockingServer::createConnection( - int socket, const sockaddr* addr, socklen_t addrLen) { + THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen) { // Check the stack Guard g(connMutex_); @@ -935,7 +935,7 @@ void TNonblockingServer::returnConnection(TConnection* connection) { * Server socket had something happen. We accept all waiting client * connections on fd and assign TConnection objects to handle those requests. */ -void TNonblockingServer::handleEvent(int fd, short which) { +void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { (void) which; // Make sure that libevent didn't mess up the socket handles assert(fd == serverSocket_); @@ -947,7 +947,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { addrLen = sizeof(addrStorage); // Going to accept a new client socket - int clientSocket; + THRIFT_SOCKET clientSocket; // Accept as many new clients as possible, even though libevent signaled only // one, this helps us to avoid having to go back into the libevent engine so @@ -959,12 +959,12 @@ void TNonblockingServer::handleEvent(int fd, short which) { nConnectionsDropped_++; nTotalConnectionsDropped_++; if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) { - ::close(clientSocket); + ::THRIFT_CLOSESOCKET(clientSocket); return; } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) { if (!drainPendingTask()) { // Nothing left to discard, so we drop connection instead. - ::close(clientSocket); + ::THRIFT_CLOSESOCKET(clientSocket); return; } } @@ -972,10 +972,10 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Explicitly set this socket to NONBLOCK mode int flags; - if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 || - fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) { - GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno); - ::close(clientSocket); + if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0 || + THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { + GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ", THRIFT_GET_SOCKET_ERROR); + ::THRIFT_CLOSESOCKET(clientSocket); return; } @@ -986,7 +986,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Fail fast if we could not create a TConnection object if (clientConnection == NULL) { GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory"); - ::close(clientSocket); + ::THRIFT_CLOSESOCKET(clientSocket); return; } @@ -1015,8 +1015,8 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Done looping accept, now we have to make sure the error is due to // blocking. Any other error is a problem - if (errno != EAGAIN && errno != EWOULDBLOCK) { - GlobalOutput.perror("thriftServerEventHandler: accept() ", errno); + if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) { + GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR); } } @@ -1024,7 +1024,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { * Creates a socket to listen on and binds it to the local port. */ void TNonblockingServer::createAndListenOnSocket() { - int s; + THRIFT_SOCKET s; struct addrinfo hints, *res, *res0; int error; @@ -1040,7 +1040,7 @@ void TNonblockingServer::createAndListenOnSocket() { error = getaddrinfo(NULL, port, &hints, &res0); if (error) { throw TException("TNonblockingServer::serve() getaddrinfo " + - string(gai_strerror(error))); + string(THRIFT_GAI_STRERROR(error))); } // Pick the ipv6 address first since ipv4 addresses can be mapped @@ -1069,15 +1069,15 @@ void TNonblockingServer::createAndListenOnSocket() { int one = 1; - // Set reuseaddr to avoid 2MSL delay on server restart - setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one)); + // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart + setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one)); - if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) { - ::close(s); + if (::bind(s, res->ai_addr, static_cast(res->ai_addrlen)) == -1) { + ::THRIFT_CLOSESOCKET(s); freeaddrinfo(res0); throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServer::serve() bind", - errno); + THRIFT_GET_SOCKET_ERROR); } // Done with the addr info @@ -1091,13 +1091,13 @@ void TNonblockingServer::createAndListenOnSocket() { * Takes a socket created by listenSocket() and sets various options on it * to prepare for use in the server. */ -void TNonblockingServer::listenSocket(int s) { +void TNonblockingServer::listenSocket(THRIFT_SOCKET s) { // Set socket to nonblocking mode int flags; - if ((flags = fcntl(s, F_GETFL, 0)) < 0 || - fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) { - ::close(s); - throw TException("TNonblockingServer::serve() O_NONBLOCK"); + if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0 || + THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) { + ::THRIFT_CLOSESOCKET(s); + throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK"); } int one = 1; @@ -1122,7 +1122,7 @@ void TNonblockingServer::listenSocket(int s) { #endif if (listen(s, LISTEN_BACKLOG) == -1) { - ::close(s); + ::THRIFT_CLOSESOCKET(s); throw TException("TNonblockingServer::serve() listen"); } @@ -1209,7 +1209,7 @@ void TNonblockingServer::serve() { for (uint32_t id = 0; id < numIOThreads_; ++id) { // the first IO thread also does the listening on server socket - int listenFd = (id == 0 ? serverSocket_ : -1); + THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : -1); shared_ptr thread( new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); @@ -1232,7 +1232,7 @@ void TNonblockingServer::serve() { // Launch all the secondary IO threads in separate threads if (ioThreads_.size() > 1) { ioThreadFactory_.reset(new PlatformThreadFactory( -#ifndef USE_BOOST_THREAD +#if !defined(USE_BOOST_THREAD) && !defined(USE_STD_THREAD) PlatformThreadFactory::OTHER, // scheduler PlatformThreadFactory::NORMAL, // priority 1, // stack size (MB) @@ -1263,7 +1263,7 @@ void TNonblockingServer::serve() { TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server, int number, - int listenSocket, + THRIFT_SOCKET listenSocket, bool useHighPriority) : server_(server) , number_(number) @@ -1283,18 +1283,18 @@ TNonblockingIOThread::~TNonblockingIOThread() { } if (listenSocket_ >= 0) { - if (0 != ::close(listenSocket_)) { + if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) { GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", - errno); + THRIFT_GET_SOCKET_ERROR); } listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE; } for (int i = 0; i < 2; ++i) { if (notificationPipeFDs_[i] >= 0) { - if (0 != ::close(notificationPipeFDs_[i])) { + if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFDs_[i])) { GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ", - errno); + THRIFT_GET_SOCKET_ERROR); } notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE; } @@ -1308,20 +1308,20 @@ void TNonblockingIOThread::createNotificationPipe() { } if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 || evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) { - ::close(notificationPipeFDs_[0]); - ::close(notificationPipeFDs_[1]); - throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK"); + ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); + ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); + throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK"); } for (int i = 0; i < 2; ++i) { #if LIBEVENT_VERSION_NUMBER < 0x02000000 int flags; - if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 || - fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { + if ((flags = THRIFT_FCNTL(notificationPipeFDs_[i], F_GETFD, 0)) < 0 || + THRIFT_FCNTL(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) { #else if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) { #endif - ::close(notificationPipeFDs_[0]); - ::close(notificationPipeFDs_[1]); + ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); + ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() " "FD_CLOEXEC"); } @@ -1372,7 +1372,7 @@ void TNonblockingIOThread::registerEvents() { } bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { - int fd = getNotificationSendFD(); + THRIFT_SOCKET fd = getNotificationSendFD(); if (fd < 0) { return false; } @@ -1412,9 +1412,9 @@ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* // exit the loop break; } else { // nBytes < 0 - if (errno != EWOULDBLOCK && errno != EAGAIN) { + if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) { GlobalOutput.perror( - "TNonblocking: notifyHandler read() failed: ", errno); + "TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR); ioThread->breakLoop(true); return; } @@ -1474,7 +1474,7 @@ void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { GlobalOutput.printf( "TNonblocking: IO Thread #%d using high-priority scheduler!", number_); } else { - GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno); + GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR); } #endif } @@ -1520,7 +1520,7 @@ void TNonblockingIOThread::cleanupEvents() { // stop the listen socket, if any if (listenSocket_ >= 0) { if (event_del(&serverEvent_) == -1) { - GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno); + GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR); } } diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h index e7bbdc5a..9e6ba170 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.h +++ b/lib/cpp/src/thrift/server/TNonblockingServer.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -32,7 +33,6 @@ #include #include #include -#include #include #ifdef HAVE_UNISTD_H #include @@ -67,7 +67,7 @@ using apache::thrift::concurrency::Guard; #endif #if LIBEVENT_VERSION_NUMBER < 0x02000000 - typedef int evutil_socket_t; + typedef THRIFT_SOCKET evutil_socket_t; #endif #ifndef SOCKOPT_CAST_T @@ -146,7 +146,7 @@ class TNonblockingServer : public TServer { static const int DEFAULT_IO_THREADS = 1; /// File descriptor of an invalid socket - static const int INVALID_SOCKET_VALUE = -1; + static const THRIFT_SOCKET INVALID_SOCKET_VALUE = -1; /// # of IO threads this server will use size_t numIOThreads_; @@ -155,7 +155,7 @@ class TNonblockingServer : public TServer { bool useHighPriorityIOThreads_; /// Server socket file descriptor - int serverSocket_; + THRIFT_SOCKET serverSocket_; /// Port server runs on int port_; @@ -271,7 +271,7 @@ class TNonblockingServer : public TServer { * @param fd the listen socket. * @param which the event flag that triggered the handler. */ - void handleEvent(int fd, short which); + void handleEvent(THRIFT_SOCKET fd, short which); void init(int port) { serverSocket_ = -1; @@ -774,7 +774,7 @@ class TNonblockingServer : public TServer { * * @param fd descriptor of socket to be initialized/ */ - void listenSocket(int fd); + void listenSocket(THRIFT_SOCKET fd); /** * Return an initialized connection object. Creates or recovers from * pool a TConnection and initializes it with the provided socket FD @@ -785,7 +785,7 @@ class TNonblockingServer : public TServer { * @param addrLen the length of addr * @return pointer to initialized TConnection object. */ - TConnection* createConnection(int socket, const sockaddr* addr, + TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr, socklen_t addrLen); /** @@ -805,7 +805,7 @@ class TNonblockingIOThread : public Runnable { // listenSocket is < 0, accepting will not be done. TNonblockingIOThread(TNonblockingServer* server, int number, - int listenSocket, + THRIFT_SOCKET listenSocket, bool useHighPriority); ~TNonblockingIOThread(); @@ -896,7 +896,7 @@ class TNonblockingIOThread : public Runnable { Thread::id_t threadId_; /// If listenSocket_ >= 0, adds an event on the event_base to accept conns - int listenSocket_; + THRIFT_SOCKET listenSocket_; /// Sets a high scheduling priority when running bool useHighPriority_; diff --git a/lib/cpp/src/thrift/transport/PlatformSocket.h b/lib/cpp/src/thrift/transport/PlatformSocket.h new file mode 100644 index 00000000..40a42460 --- /dev/null +++ b/lib/cpp/src/thrift/transport/PlatformSocket.h @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_ +# define _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_ + +#ifdef _WIN32 +# define THRIFT_GET_SOCKET_ERROR ::WSAGetLastError() +# define THRIFT_EINPROGRESS WSAEINPROGRESS +# define THRIFT_EAGAIN WSAEWOULDBLOCK +# define THRIFT_EINTR WSAEINTR +# define THRIFT_ECONNRESET WSAECONNRESET +# define THRIFT_ENOTCONN WSAENOTCONN +# define THRIFT_ETIMEDOUT WSAETIMEDOUT +# define THRIFT_EWOULDBLOCK WSAEWOULDBLOCK +# define THRIFT_EPIPE WSAECONNRESET +# define THRIFT_NO_SOCKET_CACHING SO_EXCLUSIVEADDRUSE +# define THRIFT_SOCKET SOCKET +# define THRIFT_SOCKETPAIR thrift_socketpair +# define THRIFT_FCNTL thrift_fcntl +# define THRIFT_O_NONBLOCK 1 +# define THRIFT_F_GETFL 0 +# define THRIFT_F_SETFL 1 +# define THRIFT_GETTIMEOFDAY thrift_gettimeofday +# define THRIFT_CLOSESOCKET closesocket +# define THRIFT_GAI_STRERROR gai_strerrorA +# define THRIFT_SSIZET ptrdiff_t +# define THRIFT_SNPRINTF _snprintf +# define THRIFT_SLEEP_SEC thrift_sleep +# define THRIFT_SLEEP_USEC thrift_usleep +# define THRIFT_TIMESPEC thrift_timespec +# define THRIFT_CTIME_R thrift_ctime_r +# define THRIFT_POLL thrift_poll +# if WINVER <= 0x0502 //XP, Server2003 +# define THRIFT_POLLFD thrift_pollfd +# define THRIFT_POLLIN 0x0300 +# define THRIFT_POLLOUT 0x0010 +# else //Vista, Win7... +# define THRIFT_POLLFD pollfd +# define THRIFT_POLLIN POLLIN +# define THRIFT_POLLOUT POLLOUT +# endif //WINVER +# define THRIFT_SHUT_RDWR SD_BOTH +#else //not _WIN32 +# include +# define THRIFT_GET_SOCKET_ERROR errno +# define THRIFT_EINTR EINTR +# define THRIFT_EINPROGRESS EINPROGRESS +# define THRIFT_ECONNRESET ECONNRESET +# define THRIFT_ENOTCONN ENOTCONN +# define THRIFT_ETIMEDOUT ETIMEDOUT +# define THRIFT_EWOULDBLOCK EWOULDBLOCK +# define THRIFT_EAGAIN EAGAIN +# define THRIFT_EPIPE EPIPE +# define THRIFT_NO_SOCKET_CACHING SO_REUSEADDR +# define THRIFT_SOCKET int +# define THRIFT_SOCKETPAIR socketpair +# define THRIFT_FCNTL fcntl +# define THRIFT_O_NONBLOCK O_NONBLOCK +# define THRIFT_F_GETFL F_GETFL +# define THRIFT_F_SETFL F_SETFL +# define THRIFT_GETTIMEOFDAY gettimeofday +# define THRIFT_CLOSESOCKET close +# define THRIFT_GAI_STRERROR gai_strerror +# define THRIFT_SSIZET ssize_t +# define THRIFT_SNPRINTF snprintf +# define THRIFT_SLEEP_SEC sleep +# define THRIFT_SLEEP_USEC usleep +# define THRIFT_TIMESPEC timespec +# define THRIFT_CTIME_R ctime_r +# define THRIFT_POLL poll +# define THRIFT_POLLFD pollfd +# define THRIFT_POLLIN POLLIN +# define THRIFT_POLLOUT POLLOUT +# define THRIFT_SHUT_RDWR SHUT_RDWR +#endif + +#endif // _THRIFT_TRANSPORT_PLATFORM_SOCKET_H_ diff --git a/lib/cpp/src/thrift/transport/TFDTransport.cpp b/lib/cpp/src/thrift/transport/TFDTransport.cpp index 176e7bf7..3b72de52 100644 --- a/lib/cpp/src/thrift/transport/TFDTransport.cpp +++ b/lib/cpp/src/thrift/transport/TFDTransport.cpp @@ -21,6 +21,7 @@ #include #include +#include #ifdef HAVE_UNISTD_H #include @@ -39,8 +40,8 @@ void TFDTransport::close() { return; } - int rv = ::close(fd_); - int errno_copy = errno; + int rv = ::THRIFT_CLOSESOCKET(fd_); + int errno_copy = THRIFT_GET_SOCKET_ERROR; fd_ = -1; // Have to check uncaught_exception because this is called in the destructor. if (rv < 0 && !std::uncaught_exception()) { @@ -54,14 +55,14 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) { unsigned int maxRetries = 5; // same as the TSocket default unsigned int retries = 0; while (true) { - ssize_t rv = ::read(fd_, buf, len); + THRIFT_SSIZET rv = ::read(fd_, buf, len); if (rv < 0) { - if (errno == EINTR && retries < maxRetries) { + if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) { // If interrupted, try again ++retries; continue; } - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; throw TTransportException(TTransportException::UNKNOWN, "TFDTransport::read()", errno_copy); @@ -74,10 +75,10 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) { void TFDTransport::write(const uint8_t* buf, uint32_t len) { while (len > 0) { - ssize_t rv = ::write(fd_, buf, len); + THRIFT_SSIZET rv = ::write(fd_, buf, len); if (rv < 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; throw TTransportException(TTransportException::UNKNOWN, "TFDTransport::write()", errno_copy); diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp index 4b6ea47f..137e47da 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.cpp +++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp @@ -23,17 +23,15 @@ #include "TFileTransport.h" #include "TTransportUtils.h" +#include "PlatformSocket.h" +#include -#ifdef HAVE_PTHREAD_H -#include -#endif #ifdef HAVE_SYS_TIME_H #include #else #include #endif #include -#include #ifdef HAVE_UNISTD_H #include #endif @@ -60,27 +58,6 @@ using namespace std; using namespace apache::thrift::protocol; using namespace apache::thrift::concurrency; -#ifndef HAVE_CLOCK_GETTIME - -/** - * Fake clock_gettime for systems like darwin - * - */ -#define CLOCK_REALTIME 0 -static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) { - struct timeval now; - - int rv = gettimeofday(&now, NULL); - if (rv != 0) { - return rv; - } - - tp->tv_sec = now.tv_sec; - tp->tv_nsec = now.tv_usec * 1000; - return 0; -} -#endif - TFileTransport::TFileTransport(string path, bool readOnly) : readState_() , readBuff_(NULL) @@ -96,7 +73,6 @@ TFileTransport::TFileTransport(string path, bool readOnly) , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US) , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US) , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US) - , writerThreadId_(0) , dequeueBuffer_(NULL) , enqueueBuffer_(NULL) , notFull_(&mutex_) @@ -112,6 +88,7 @@ TFileTransport::TFileTransport(string path, bool readOnly) , numCorruptedEventsInChunk_(0) , readOnly_(readOnly) { + threadFactory_.setDetached(false); openLogFile(); } @@ -124,8 +101,8 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { // flush any events in the queue flush(); GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); - if (-1 == ::close(fd_)) { - int errno_copy = errno; + if (-1 == ::THRIFT_CLOSESOCKET(fd_)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy); } else { @@ -145,25 +122,16 @@ void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { TFileTransport::~TFileTransport() { // flush the buffer if a writer thread is active -#ifdef USE_BOOST_THREAD - if(writerThreadId_.get()) { -#else - if (writerThreadId_ > 0) { -#endif + if(writerThread_.get()) { // set state to closing closing_ = true; // wake up the writer thread // Since closing_ is true, it will attempt to flush all data, then exit. - notEmpty_.notify(); + notEmpty_.notify(); -#ifdef USE_BOOST_THREAD - writerThreadId_->join(); - writerThreadId_.reset(); -#else - pthread_join(writerThreadId_, NULL); - writerThreadId_ = 0; -#endif + writerThread_->join(); + writerThread_.reset(); } if (dequeueBuffer_) { @@ -188,8 +156,8 @@ TFileTransport::~TFileTransport() { // close logfile if (fd_ > 0) { - if(-1 == ::close(fd_)) { - GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno); + if(-1 == ::THRIFT_CLOSESOCKET(fd_)) { + GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR); } else { //successfully closed fd fd_ = 0; @@ -203,18 +171,11 @@ bool TFileTransport::initBufferAndWriteThread() { return false; } -#ifdef USE_BOOST_THREAD - if(!writerThreadId_.get()) { - writerThreadId_ = std::auto_ptr(new boost::thread(boost::bind(startWriterThread, (void *)this))); - } -#else - if (writerThreadId_ == 0) { - if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { - T_ERROR("%s", "Could not create writer thread"); - return false; - } + if(!writerThread_.get()) { + writerThread_ = threadFactory_.newThread( + apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this)); + writerThread_->start(); } -#endif dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); @@ -295,7 +256,7 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { // it is probably a non-factor for the time being } -bool TFileTransport::swapEventBuffers(struct timespec* deadline) { +bool TFileTransport::swapEventBuffers(struct timeval* deadline) { bool swap; Guard g(mutex_); @@ -341,7 +302,7 @@ void TFileTransport::writerThread() { try { openLogFile(); } catch (...) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy); fd_ = 0; hasIOError = true; @@ -361,14 +322,14 @@ void TFileTransport::writerThread() { #endif readState_.resetAllValues(); } catch (...) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy); hasIOError = true; } } // Figure out the next time by which a flush must take place - struct timespec ts_next_flush; + struct timeval ts_next_flush; getNextFlushTime(&ts_next_flush); uint32_t unflushed = 0; @@ -376,11 +337,7 @@ void TFileTransport::writerThread() { // this will only be true when the destructor is being invoked if (closing_) { if (hasIOError) { -#ifndef USE_BOOST_THREAD - pthread_exit(NULL); -#else - return; -#endif + return; } // Try to empty buffers before exit @@ -388,19 +345,15 @@ void TFileTransport::writerThread() { #ifndef _WIN32 fsync(fd_); #endif - if (-1 == ::close(fd_)) { - int errno_copy = errno; + if (-1 == ::THRIFT_CLOSESOCKET(fd_)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); } else { //fd successfully closed fd_ = 0; } -#ifndef USE_BOOST_THREAD - pthread_exit(NULL); -#else return; -#endif - } + } } if (swapEventBuffers(&ts_next_flush)) { @@ -413,16 +366,12 @@ void TFileTransport::writerThread() { while (hasIOError) { T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_); - usleep(writerThreadIOErrorSleepTime_); + THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_); if (closing_) { -#ifndef USE_BOOST_THREAD - pthread_exit(NULL); -#else return; -#endif } if (!fd_) { - ::close(fd_); + ::THRIFT_CLOSESOCKET(fd_); fd_ = 0; } try { @@ -463,7 +412,7 @@ void TFileTransport::writerThread() { memset(zeros, '\0', padding); boost::scoped_array array(zeros); if (-1 == ::write(fd_, zeros, padding)) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy); hasIOError = true; continue; @@ -476,7 +425,7 @@ void TFileTransport::writerThread() { // write the dequeued event to the file if (outEvent->eventSize_ > 0) { if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); hasIOError = true; continue; @@ -523,11 +472,11 @@ void TFileTransport::writerThread() { if (forced_flush || unflushed > flushMaxBytes_) { flush = true; } else { - struct timespec current_time; - clock_gettime(CLOCK_REALTIME, ¤t_time); + struct timeval current_time; + THRIFT_GETTIMEOFDAY(¤t_time, NULL); if (current_time.tv_sec > ts_next_flush.tv_sec || (current_time.tv_sec == ts_next_flush.tv_sec && - current_time.tv_nsec > ts_next_flush.tv_nsec)) { + current_time.tv_usec > ts_next_flush.tv_usec)) { if (unflushed > 0) { flush = true; } else { @@ -560,15 +509,9 @@ void TFileTransport::writerThread() { void TFileTransport::flush() { // file must be open for writing for any flushing to take place -#ifdef USE_BOOST_THREAD - if (!writerThreadId_.get()) { - return; - } -#else - if (writerThreadId_ <= 0) { + if (!writerThread_.get()) { return; } -#endif // wait for flush to take place Guard g(mutex_); @@ -674,7 +617,7 @@ eventInfo* TFileTransport::readEvent() { } else if (readState_.bufferLen_ == 0) { // EOF // wait indefinitely if there is no timeout if (readTimeout_ == TAIL_READ_TIMEOUT) { - usleep(eofSleepTime_); + THRIFT_SLEEP_USEC(eofSleepTime_); continue; } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { // reset state @@ -686,7 +629,7 @@ eventInfo* TFileTransport::readEvent() { readState_.resetState(0); return NULL; } else { - usleep(readTimeout_ * 1000); + THRIFT_SLEEP_USEC(readTimeout_ * 1000); readTries++; continue; } @@ -818,7 +761,7 @@ void TFileTransport::performRecovery() { // if tailing the file, wait until there is enough data to start // the next chunk while(curChunk == (getNumChunks() - 1)) { - usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US); + THRIFT_SLEEP_USEC(DEFAULT_CORRUPTED_SLEEP_TIME_US); } seekToChunk(curChunk + 1); } else { @@ -910,7 +853,7 @@ uint32_t TFileTransport::getNumChunks() { int rv = fstat(fd_, &f_info); if (rv < 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; throw TTransportException(TTransportException::UNKNOWN, "TFileTransport::getNumChunks() (fstat)", errno_copy); @@ -946,21 +889,22 @@ void TFileTransport::openLogFile() { // make sure open call was successful if(fd_ == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); } } -void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) { - clock_gettime(CLOCK_REALTIME, ts_next_flush); - ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000; - if (ts_next_flush->tv_nsec > 1000000000) { - ts_next_flush->tv_nsec -= 1000000000; - ts_next_flush->tv_sec += 1; +void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) { + THRIFT_GETTIMEOFDAY(ts_next_flush, NULL); + + ts_next_flush->tv_usec += flushMaxUs_; + if (ts_next_flush->tv_usec > 1000000) { + long extra_secs = ts_next_flush->tv_usec / 1000000; + ts_next_flush->tv_usec %= 1000000; + ts_next_flush->tv_sec += extra_secs; } - ts_next_flush->tv_sec += flushMaxUs_ / 1000000; } TFileTransportBuffer::TFileTransportBuffer(uint32_t size) diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h index 267305d6..75941cf1 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.h +++ b/lib/cpp/src/thrift/transport/TFileTransport.h @@ -27,19 +27,13 @@ #include #include -#ifdef HAVE_PTHREAD_H -#include -#endif - -#ifdef USE_BOOST_THREAD -#include -#endif - #include #include #include #include +#include +#include namespace apache { namespace thrift { namespace transport { @@ -307,13 +301,13 @@ class TFileTransport : public TFileReaderTransport, private: // helper functions for writing to a file void enqueueEvent(const uint8_t* buf, uint32_t eventLen); - bool swapEventBuffers(struct timespec* deadline); + bool swapEventBuffers(struct timeval* deadline); bool initBufferAndWriteThread(); // control for writer thread static void* startWriterThread(void* ptr) { - (((TFileTransport*)ptr)->writerThread()); - return 0; + static_cast(ptr)->writerThread(); + return NULL; } void writerThread(); @@ -326,7 +320,7 @@ class TFileTransport : public TFileReaderTransport, // Utility functions void openLogFile(); - void getNextFlushTime(struct timespec* ts_next_flush); + void getNextFlushTime(struct timeval* ts_next_flush); // Class variables readState readState_; @@ -375,12 +369,9 @@ class TFileTransport : public TFileReaderTransport, uint32_t writerThreadIOErrorSleepTime_; static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; - // writer thread id -#ifdef USE_BOOST_THREAD - std::auto_ptr writerThreadId_; -#else - pthread_t writerThreadId_; -#endif + // writer thread + apache::thrift::concurrency::PlatformThreadFactory threadFactory_; + boost::shared_ptr writerThread_; // buffers to hold data before it is flushed. Each element of the buffer stores a msg that // needs to be written to the file. The buffers are swapped by the writer thread. diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp index 464272d4..0b5080b2 100644 --- a/lib/cpp/src/thrift/transport/TPipe.cpp +++ b/lib/cpp/src/thrift/transport/TPipe.cpp @@ -108,7 +108,7 @@ void TPipe::open() { NULL); // no template file if (hPipe_ == INVALID_HANDLE_VALUE) - sleep(SleepInterval); + ::Sleep(SleepInterval); else break; } diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp index 14c13dc7..bf29c41c 100644 --- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp @@ -37,6 +37,7 @@ #include #include #include "TSSLSocket.h" +#include "PlatformSocket.h" #define OPENSSL_VERSION_NO_THREAD_ID 0x10000000L @@ -105,8 +106,9 @@ bool TSSLSocket::isOpen() { return false; } int shutdown = SSL_get_shutdown(ssl_); - bool shutdownReceived = (shutdown & SSL_RECEIVED_SHUTDOWN); - bool shutdownSent = (shutdown & SSL_SENT_SHUTDOWN); + // "!!" is squelching C4800 "forcing bool -> true or false" perfomance warning + bool shutdownReceived = !!(shutdown & SSL_RECEIVED_SHUTDOWN); + bool shutdownSent = !!(shutdown & SSL_SENT_SHUTDOWN); if (shutdownReceived && shutdownSent) { return false; } @@ -122,7 +124,7 @@ bool TSSLSocket::peek() { uint8_t byte; rc = SSL_peek(ssl_, &byte, 1); if (rc < 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("SSL_peek: " + errors); @@ -147,7 +149,7 @@ void TSSLSocket::close() { rc = SSL_shutdown(ssl_); } if (rc < 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); GlobalOutput(("SSL_shutdown: " + errors).c_str()); @@ -166,9 +168,9 @@ uint32_t TSSLSocket::read(uint8_t* buf, uint32_t len) { bytes = SSL_read(ssl_, buf, len); if (bytes >= 0) break; - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; if (SSL_get_error(ssl_, bytes) == SSL_ERROR_SYSCALL) { - if (ERR_get_error() == 0 && errno_copy == EINTR) { + if (ERR_get_error() == 0 && errno_copy == THRIFT_EINTR) { continue; } } @@ -186,7 +188,7 @@ void TSSLSocket::write(const uint8_t* buf, uint32_t len) { while (written < len) { int32_t bytes = SSL_write(ssl_, &buf[written], len - written); if (bytes <= 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("SSL_write: " + errors); @@ -206,7 +208,7 @@ void TSSLSocket::flush() { throw TSSLException("SSL_get_wbio returns NULL"); } if (BIO_flush(bio) != 1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("BIO_flush: " + errors); @@ -229,7 +231,7 @@ void TSSLSocket::checkHandshake() { rc = SSL_connect(ssl_); } if (rc <= 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string fname(server() ? "SSL_accept" : "SSL_connect"); string errors; buildErrors(errors, errno_copy); @@ -426,7 +428,7 @@ void TSSLSocketFactory::loadCertificate(const char* path, const char* format) { } if (strcmp(format, "PEM") == 0) { if (SSL_CTX_use_certificate_chain_file(ctx_->get(), path) == 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("SSL_CTX_use_certificate_chain_file: " + errors); @@ -443,7 +445,7 @@ void TSSLSocketFactory::loadPrivateKey(const char* path, const char* format) { } if (strcmp(format, "PEM") == 0) { if (SSL_CTX_use_PrivateKey_file(ctx_->get(), path, SSL_FILETYPE_PEM) == 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("SSL_CTX_use_PrivateKey_file: " + errors); @@ -457,7 +459,7 @@ void TSSLSocketFactory::loadTrustedCertificates(const char* path) { "loadTrustedCertificates: is NULL"); } if (SSL_CTX_load_verify_locations(ctx_->get(), path, NULL) == 0) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; string errors; buildErrors(errors, errno_copy); throw TSSLException("SSL_CTX_load_verify_locations: " + errors); @@ -579,7 +581,7 @@ void buildErrors(string& errors, int errno_copy) { } const char* reason = ERR_reason_error_string(errorCode); if (reason == NULL) { - snprintf(message, sizeof(message) - 1, "SSL error # %lu", errorCode); + THRIFT_SNPRINTF(message, sizeof(message) - 1, "SSL error # %lu", errorCode); reason = message; } errors += reason; @@ -598,7 +600,7 @@ void buildErrors(string& errors, int errno_copy) { * Default implementation of AccessManager */ Decision DefaultClientAccessManager::verify(const sockaddr_storage& sa) - throw() { + throw() { (void) sa; return SKIP; } diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp index 1000367e..cb3833e2 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.cpp +++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp @@ -39,13 +39,13 @@ #include #endif #include -#include #ifdef HAVE_UNISTD_H #include #endif #include "TSocket.h" #include "TServerSocket.h" +#include "PlatformSocket.h" #include #ifndef AF_LOCAL @@ -155,9 +155,9 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) { } void TServerSocket::listen() { - SOCKET sv[2]; - if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) { - GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno); + THRIFT_SOCKET sv[2]; + if (-1 == THRIFT_SOCKETPAIR(AF_LOCAL, SOCK_STREAM, 0, sv)) { + GlobalOutput.perror("TServerSocket::listen() socketpair() ", THRIFT_GET_SOCKET_ERROR); intSock1_ = -1; intSock2_ = -1; } else { @@ -177,7 +177,7 @@ void TServerSocket::listen() { // Wildcard address error = getaddrinfo(NULL, port, &hints, &res0); if (error) { - GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error)); + GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error)); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket."); } @@ -196,27 +196,32 @@ void TServerSocket::listen() { } if (serverSocket_ == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy); } - // Set reusaddress to prevent 2MSL delay on accept + // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept int one = 1; - if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, + if (-1 == setsockopt(serverSocket_, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, cast_sockopt(&one), sizeof(one))) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy); + //ignore errors coming out of this setsockopt on Windows. This is because + //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't + //want to force servers to be an admin. +#ifndef _WIN32 + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ", errno_copy); close(); - throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "Could not set THRIFT_NO_SOCKET_CACHING", errno_copy); +#endif } // Set TCP buffer sizes if (tcpSendBuffer_ > 0) { if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF, cast_sockopt(&tcpSendBuffer_), sizeof(tcpSendBuffer_))) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy); @@ -226,7 +231,7 @@ void TServerSocket::listen() { if (tcpRecvBuffer_ > 0) { if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF, cast_sockopt(&tcpRecvBuffer_), sizeof(tcpRecvBuffer_))) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy); @@ -237,7 +242,7 @@ void TServerSocket::listen() { #ifdef TCP_DEFER_ACCEPT if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT, &one, sizeof(one))) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy); @@ -249,7 +254,7 @@ void TServerSocket::listen() { int zero = 0; if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY, cast_sockopt(&zero), sizeof(zero))) { - GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno); + GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR); } } #endif // #ifdef IPV6_V6ONLY @@ -258,7 +263,7 @@ void TServerSocket::listen() { struct linger ling = {0, 0}; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy); @@ -269,7 +274,7 @@ void TServerSocket::listen() { // TCP Nodelay, speed over bandwidth if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy); @@ -277,21 +282,21 @@ void TServerSocket::listen() { } // Set NONBLOCK on the accept socket - int flags = fcntl(serverSocket_, F_GETFL, 0); + int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0); if (flags == -1) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); } - if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); + if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); } // prepare the port information - // we may want to try to bind more than once, since SO_REUSEADDR doesn't + // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't // always seem to work. The client can configure the retry variables. int retries = 0; @@ -304,13 +309,13 @@ void TServerSocket::listen() { socklen_t len; if (path_.length() > sizeof(address.sun_path)) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy); throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); } address.sun_family = AF_UNIX; - snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); + THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); len = sizeof(address); do { @@ -318,7 +323,7 @@ void TServerSocket::listen() { break; } // use short circuit evaluation here to only sleep if we need to - } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0)); + } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); #else GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99); throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path not supported"); @@ -329,7 +334,7 @@ void TServerSocket::listen() { break; } // use short circuit evaluation here to only sleep if we need to - } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0)); + } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0)); // free addrinfo freeaddrinfo(res0); @@ -347,12 +352,12 @@ void TServerSocket::listen() { GlobalOutput(errbuf); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not bind", - errno); + THRIFT_GET_SOCKET_ERROR); } // Call listen if (-1 == ::listen(serverSocket_, acceptBacklog_)) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy); @@ -366,7 +371,7 @@ shared_ptr TServerSocket::acceptImpl() { throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening"); } - struct pollfd fds[2]; + struct THRIFT_POLLFD fds[2]; int maxEintrs = 5; int numEintrs = 0; @@ -374,71 +379,71 @@ shared_ptr TServerSocket::acceptImpl() { while (true) { std::memset(fds, 0 , sizeof(fds)); fds[0].fd = serverSocket_; - fds[0].events = POLLIN; + fds[0].events = THRIFT_POLLIN; if (intSock2_ != -1) { fds[1].fd = intSock2_; - fds[1].events = POLLIN; + fds[1].events = THRIFT_POLLIN; } /* - TODO: if EINTR is received, we'll restart the timeout. + TODO: if THRIFT_EINTR is received, we'll restart the timeout. To be accurate, we need to fix this in the future. */ - int ret = poll(fds, 2, accTimeout_); + int ret = THRIFT_POLL(fds, 2, accTimeout_); if (ret < 0) { // error cases - if (errno == EINTR && (numEintrs++ < maxEintrs)) { - // EINTR needs to be handled manually and we can tolerate + if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && (numEintrs++ < maxEintrs)) { + // THRIFT_EINTR needs to be handled manually and we can tolerate // a certain number continue; } - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy); + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_POLL() ", errno_copy); throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); } else if (ret > 0) { // Check for an interrupt signal - if (intSock2_ != -1 && (fds[1].revents & POLLIN)) { + if (intSock2_ != -1 && (fds[1].revents & THRIFT_POLLIN)) { int8_t buf; if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) { - GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno); + GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", THRIFT_GET_SOCKET_ERROR); } throw TTransportException(TTransportException::INTERRUPTED); } // Check for the actual server socket being ready - if (fds[0].revents & POLLIN) { + if (fds[0].revents & THRIFT_POLLIN) { break; } } else { - GlobalOutput("TServerSocket::acceptImpl() poll 0"); + GlobalOutput("TServerSocket::acceptImpl() THRIFT_POLL 0"); throw TTransportException(TTransportException::UNKNOWN); } } struct sockaddr_storage clientAddress; int size = sizeof(clientAddress); - SOCKET clientSocket = ::accept(serverSocket_, + THRIFT_SOCKET clientSocket = ::accept(serverSocket_, (struct sockaddr *) &clientAddress, (socklen_t *) &size); if (clientSocket == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy); throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy); } // Make sure client socket is blocking - int flags = fcntl(clientSocket, F_GETFL, 0); + int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0); if (flags == -1) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy); + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy); + throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_GETFL)", errno_copy); } - if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) { - int errno_copy = errno; - GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy); + if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ", errno_copy); + throw TTransportException(TTransportException::UNKNOWN, "THRIFT_FCNTL(THRIFT_F_SETFL)", errno_copy); } shared_ptr client = createSocket(clientSocket); @@ -453,7 +458,7 @@ shared_ptr TServerSocket::acceptImpl() { return client; } -shared_ptr TServerSocket::createSocket(SOCKET clientSocket) { +shared_ptr TServerSocket::createSocket(THRIFT_SOCKET clientSocket) { return shared_ptr(new TSocket(clientSocket)); } @@ -461,28 +466,21 @@ void TServerSocket::interrupt() { if (intSock1_ != -1) { int8_t byte = 0; if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) { - GlobalOutput.perror("TServerSocket::interrupt() send() ", errno); + GlobalOutput.perror("TServerSocket::interrupt() send() ", THRIFT_GET_SOCKET_ERROR); } } } void TServerSocket::close() { if (serverSocket_ != -1) { - -#ifdef _WIN32 - shutdown(serverSocket_, SD_BOTH); - ::closesocket(serverSocket_); -#else - shutdown(serverSocket_, SHUT_RDWR); - ::close(serverSocket_); -#endif - + shutdown(serverSocket_, THRIFT_SHUT_RDWR); + ::THRIFT_CLOSESOCKET(serverSocket_); } if (intSock1_ != -1) { - ::close(intSock1_); + ::THRIFT_CLOSESOCKET(intSock1_); } if (intSock2_ != -1) { - ::close(intSock2_); + ::THRIFT_CLOSESOCKET(intSock2_); } serverSocket_ = -1; intSock1_ = -1; diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h index e562a194..17a00b6e 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.h +++ b/lib/cpp/src/thrift/transport/TServerSocket.h @@ -21,10 +21,8 @@ #define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1 #include "TServerTransport.h" +#include "PlatformSocket.h" #include -#ifndef _WIN32 - typedef int SOCKET; -#endif namespace apache { namespace thrift { namespace transport { @@ -64,12 +62,12 @@ class TServerSocket : public TServerTransport { protected: boost::shared_ptr acceptImpl(); - virtual boost::shared_ptr createSocket(SOCKET client); + virtual boost::shared_ptr createSocket(THRIFT_SOCKET client); private: int port_; std::string path_; - SOCKET serverSocket_; + THRIFT_SOCKET serverSocket_; int acceptBacklog_; int sendTimeout_; int recvTimeout_; @@ -79,8 +77,8 @@ class TServerSocket : public TServerTransport { int tcpSendBuffer_; int tcpRecvBuffer_; - SOCKET intSock1_; - SOCKET intSock2_; + THRIFT_SOCKET intSock1_; + THRIFT_SOCKET intSock2_; }; }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp index e59f4a1c..ba0b1e35 100644 --- a/lib/cpp/src/thrift/transport/TSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSocket.cpp @@ -39,12 +39,12 @@ #ifdef HAVE_UNISTD_H #include #endif -#include #include #include #include "TSocket.h" #include "TTransportException.h" +#include "PlatformSocket.h" #ifndef SOCKOPT_CAST_T # ifndef _WIN32 @@ -126,7 +126,7 @@ TSocket::TSocket() : cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; } -TSocket::TSocket(SOCKET socket) : +TSocket::TSocket(THRIFT_SOCKET socket) : host_(""), port_(0), path_(""), @@ -158,13 +158,13 @@ bool TSocket::peek() { uint8_t buf; int r = static_cast(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK)); if (r == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; #if defined __FreeBSD__ || defined __MACH__ /* shigin: - * freebsd returns -1 and ECONNRESET if socket was closed by + * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by * the other side */ - if (errno_copy == ECONNRESET) + if (errno_copy == THRIFT_ECONNRESET) { close(); return false; @@ -189,7 +189,7 @@ void TSocket::openConnection(struct addrinfo *res) { } if (socket_ == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy); throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy); } @@ -220,18 +220,18 @@ void TSocket::openConnection(struct addrinfo *res) { // Set the socket to be non blocking for connect if a timeout exists - int flags = fcntl(socket_, F_GETFL, 0); + int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0); if (connTimeout_ > 0) { - if (-1 == fcntl(socket_, F_SETFL, flags | O_NONBLOCK)) { - int errno_copy = errno; - GlobalOutput.perror("TSocket::open() fcntl() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); + if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); } } else { - if (-1 == fcntl(socket_, F_SETFL, flags & ~O_NONBLOCK)) { - int errno_copy = errno; - GlobalOutput.perror("TSocket::open() fcntl " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy); + if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy); } } @@ -245,13 +245,13 @@ void TSocket::openConnection(struct addrinfo *res) { socklen_t len; if (path_.length() > sizeof(address.sun_path)) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy); throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long"); } address.sun_family = AF_UNIX; - snprintf(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); + THRIFT_SNPRINTF(address.sun_path, sizeof(address.sun_path), "%s", path_.c_str()); len = sizeof(address); ret = connect(socket_, (struct sockaddr *) &address, len); @@ -269,18 +269,18 @@ void TSocket::openConnection(struct addrinfo *res) { goto done; } - if ((errno != EINPROGRESS) && (errno != EWOULDBLOCK)) { - int errno_copy = errno; + if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS) && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) { + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy); throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy); } - struct pollfd fds[1]; + struct THRIFT_POLLFD fds[1]; std::memset(fds, 0 , sizeof(fds)); fds[0].fd = socket_; - fds[0].events = POLLOUT; - ret = poll(fds, 1, connTimeout_); + fds[0].events = THRIFT_POLLOUT; + ret = THRIFT_POLL(fds, 1, connTimeout_); if (ret > 0) { // Ensure the socket is connected and that there are no errors set @@ -289,7 +289,7 @@ void TSocket::openConnection(struct addrinfo *res) { lon = sizeof(int); int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon); if (ret2 == -1) { - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy); throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy); } @@ -297,7 +297,7 @@ void TSocket::openConnection(struct addrinfo *res) { if (val == 0) { goto done; } - GlobalOutput.perror("TSocket::open() error on socket (after poll) " + getSocketInfo(), val); + GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(), val); throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val); } else if (ret == 0) { // socket timed out @@ -305,15 +305,15 @@ void TSocket::openConnection(struct addrinfo *res) { GlobalOutput(errStr.c_str()); throw TTransportException(TTransportException::NOT_OPEN, "open() timed out"); } else { - // error on poll() - int errno_copy = errno; - GlobalOutput.perror("TSocket::open() poll() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, "poll() failed", errno_copy); + // error on THRIFT_POLL() + int errno_copy = THRIFT_GET_SOCKET_ERROR; + GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy); + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy); } done: // Set socket back to normal mode (blocking) - fcntl(socket_, F_SETFL, flags); + THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags); if (path_.empty()) { setCachedAddress(res->ai_addr, static_cast(res->ai_addrlen)); @@ -367,7 +367,7 @@ void TSocket::local_open(){ error = getaddrinfo(host_.c_str(), port, &hints, &res0); if (error) { - string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(gai_strerror(error)); + string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo() + string(THRIFT_GAI_STRERROR(error)); GlobalOutput(errStr.c_str()); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for client socket."); @@ -396,20 +396,13 @@ void TSocket::local_open(){ void TSocket::close() { if (socket_ != -1) { - -#ifdef _WIN32 - shutdown(socket_, SD_BOTH); - ::closesocket(socket_); -#else - shutdown(socket_, SHUT_RDWR); - ::close(socket_); -#endif - + shutdown(socket_, THRIFT_SHUT_RDWR); + ::THRIFT_CLOSESOCKET(socket_); } socket_ = -1; } -void TSocket::setSocketFD(int socket) { +void TSocket::setSocketFD(THRIFT_SOCKET socket) { if (socket_ != -1) { close(); } @@ -423,10 +416,10 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { int32_t retries = 0; - // EAGAIN can be signalled both when a timeout has occurred and when + // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when // the system is out of resources (an awesome undocumented feature). // The following is an approximation of the time interval under which - // EAGAIN is taken to indicate an out of resources error. + // THRIFT_EAGAIN is taken to indicate an out of resources error. uint32_t eagainThresholdMicros = 0; if (recvTimeout_) { // if a readTimeout is specified along with a max number of recv retries, then @@ -439,55 +432,55 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { // Read from the socket struct timeval begin; if (recvTimeout_ > 0) { - gettimeofday(&begin, NULL); + THRIFT_GETTIMEOFDAY(&begin, NULL); } else { // if there is no read timeout we don't need the TOD to determine whether - // an EAGAIN is due to a timeout or an out-of-resource condition. + // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition. begin.tv_sec = begin.tv_usec = 0; } int got = static_cast(recv(socket_, cast_sockopt(buf), len, 0)); - int errno_copy = errno; //gettimeofday can change errno + int errno_copy = THRIFT_GET_SOCKET_ERROR; //THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR ++g_socket_syscalls; // Check for error on read if (got < 0) { - if (errno_copy == EAGAIN) { + if (errno_copy == THRIFT_EAGAIN) { // if no timeout we can assume that resource exhaustion has occurred. if (recvTimeout_ == 0) { throw TTransportException(TTransportException::TIMED_OUT, - "EAGAIN (unavailable resources)"); + "THRIFT_EAGAIN (unavailable resources)"); } // check if this is the lack of resources or timeout case struct timeval end; - gettimeofday(&end, NULL); + THRIFT_GETTIMEOFDAY(&end, NULL); uint32_t readElapsedMicros = static_cast( ((end.tv_sec - begin.tv_sec) * 1000 * 1000) + (((uint64_t)(end.tv_usec - begin.tv_usec)))); if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) { if (retries++ < maxRecvRetries_) { - usleep(50); + THRIFT_SLEEP_USEC(50); goto try_again; } else { throw TTransportException(TTransportException::TIMED_OUT, - "EAGAIN (unavailable resources)"); + "THRIFT_EAGAIN (unavailable resources)"); } } else { // infer that timeout has been hit throw TTransportException(TTransportException::TIMED_OUT, - "EAGAIN (timed out)"); + "THRIFT_EAGAIN (timed out)"); } } // If interrupted, try again - if (errno_copy == EINTR && retries++ < maxRecvRetries_) { + if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) { goto try_again; } #if defined __FreeBSD__ || defined __MACH__ - if (errno_copy == ECONNRESET) { + if (errno_copy == THRIFT_ECONNRESET) { /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with - * ECONNRESET if peer performed shutdown + * THRIFT_ECONNRESET if peer performed shutdown * edhall: eliminated close() since we do that in the destructor. */ return 0; @@ -504,18 +497,18 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy); // If we disconnect with no linger time - if (errno_copy == ECONNRESET) { - throw TTransportException(TTransportException::NOT_OPEN, "ECONNRESET"); + if (errno_copy == THRIFT_ECONNRESET) { + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ECONNRESET"); } // This ish isn't open - if (errno_copy == ENOTCONN) { - throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN"); + if (errno_copy == THRIFT_ENOTCONN) { + throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN"); } // Timed out! - if (errno_copy == ETIMEDOUT) { - throw TTransportException(TTransportException::TIMED_OUT, "ETIMEDOUT"); + if (errno_copy == THRIFT_ETIMEDOUT) { + throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT"); } // Some other error, whatevz @@ -558,7 +551,7 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { int flags = 0; #ifdef MSG_NOSIGNAL // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we - // check for the EPIPE return condition and close the socket in that case + // check for the THRIFT_EPIPE return condition and close the socket in that case flags |= MSG_NOSIGNAL; #endif // ifdef MSG_NOSIGNAL @@ -566,14 +559,14 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { ++g_socket_syscalls; if (b < 0) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { + if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) { return 0; } // Fail on a send error - int errno_copy = errno; + int errno_copy = THRIFT_GET_SOCKET_ERROR; GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy); - if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) { + if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET || errno_copy == THRIFT_ENOTCONN) { close(); throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy); } @@ -614,7 +607,7 @@ void TSocket::setLinger(bool on, int linger) { struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_}; int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l)); if (ret == -1) { - int errno_copy = errno; // Copy errno because we're allocating memory. + int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy); } } @@ -629,7 +622,7 @@ void TSocket::setNoDelay(bool noDelay) { int v = noDelay_ ? 1 : 0; int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v)); if (ret == -1) { - int errno_copy = errno; // Copy errno because we're allocating memory. + int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy); } } @@ -654,11 +647,11 @@ void TSocket::setRecvTimeout(int ms) { recvTimeval_.tv_sec = (int)(recvTimeout_/1000); recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000); - // Copy because poll may modify + // Copy because THRIFT_POLL may modify struct timeval r = recvTimeval_; int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, cast_sockopt(&r), sizeof(r)); if (ret == -1) { - int errno_copy = errno; // Copy errno because we're allocating memory. + int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. GlobalOutput.perror("TSocket::setRecvTimeout() setsockopt() " + getSocketInfo(), errno_copy); } } @@ -680,7 +673,7 @@ void TSocket::setSendTimeout(int ms) { (int)((sendTimeout_%1000)*1000)}; int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, cast_sockopt(&s), sizeof(s)); if (ret == -1) { - int errno_copy = errno; // Copy errno because we're allocating memory. + int errno_copy = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory. GlobalOutput.perror("TSocket::setSendTimeout() setsockopt() " + getSocketInfo(), errno_copy); } } diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h index b916a3e2..60b5b517 100644 --- a/lib/cpp/src/thrift/transport/TSocket.h +++ b/lib/cpp/src/thrift/transport/TSocket.h @@ -25,6 +25,7 @@ #include "TTransport.h" #include "TVirtualTransport.h" #include "TServerSocket.h" +#include "PlatformSocket.h" #ifdef HAVE_ARPA_INET_H #include @@ -35,9 +36,6 @@ #ifdef HAVE_NETDB_H #include #endif -#ifndef _WIN32 - typedef int SOCKET; -#endif namespace apache { namespace thrift { namespace transport { @@ -175,7 +173,7 @@ class TSocket : public TVirtualTransport { void setSendTimeout(int ms); /** - * Set the max number of recv retries in case of an EAGAIN + * Set the max number of recv retries in case of an THRIFT_EAGAIN * error */ void setMaxRecvRetries(int maxRecvRetries); @@ -203,7 +201,7 @@ class TSocket : public TVirtualTransport { /** * Returns the underlying socket file descriptor. */ - SOCKET getSocketFD() { + THRIFT_SOCKET getSocketFD() { return socket_; } @@ -214,7 +212,7 @@ class TSocket : public TVirtualTransport { * * @param fd the descriptor for an already-connected socket */ - void setSocketFD(int fd); + void setSocketFD(THRIFT_SOCKET fd); /* * Returns a cached copy of the peer address. @@ -234,7 +232,7 @@ class TSocket : public TVirtualTransport { /** * Constructor to create socket from raw UNIX handle. */ - TSocket(SOCKET socket); + TSocket(THRIFT_SOCKET socket); /** * Set a cache of the peer address (used when trivially available: e.g. @@ -265,7 +263,7 @@ class TSocket : public TVirtualTransport { std::string path_; /** Underlying UNIX socket handle */ - SOCKET socket_; + THRIFT_SOCKET socket_; /** Connect timeout in ms */ int connTimeout_; @@ -297,9 +295,6 @@ class TSocket : public TVirtualTransport { sockaddr_in6 ipv6; } cachedPeerAddr_; - /** Connection start time */ - timespec startTime_; - /** Whether to use low minimum TCP retransmission timeout */ static bool useLowMinRto_; diff --git a/lib/cpp/src/thrift/transport/TSocketPool.h b/lib/cpp/src/thrift/transport/TSocketPool.h index 48e35bb6..f8c5ddca 100644 --- a/lib/cpp/src/thrift/transport/TSocketPool.h +++ b/lib/cpp/src/thrift/transport/TSocketPool.h @@ -49,7 +49,7 @@ class TSocketPoolServer { int port_; // Socket for the server - SOCKET socket_; + THRIFT_SOCKET socket_; // Last time connecting to this server failed time_t lastFailTime_; diff --git a/lib/cpp/src/thrift/windows/GetTimeOfDay.cpp b/lib/cpp/src/thrift/windows/GetTimeOfDay.cpp index 6201eda5..4f0ac555 100644 --- a/lib/cpp/src/thrift/windows/GetTimeOfDay.cpp +++ b/lib/cpp/src/thrift/windows/GetTimeOfDay.cpp @@ -35,7 +35,7 @@ struct timezone int tz_dsttime; /* type of dst correction */ }; -int gettimeofday(struct timeval * tv, struct timezone * tz) +int thrift_gettimeofday(struct timeval * tv, struct timezone * tz) { FILETIME ft; unsigned __int64 tmpres(0); @@ -50,7 +50,7 @@ int gettimeofday(struct timeval * tv, struct timezone * tz) tmpres |= ft.dwLowDateTime; /*converting file time to unix epoch*/ - tmpres -= DELTA_EPOCH_IN_MICROSECS; + tmpres -= DELTA_EPOCH_IN_MICROSECS; tmpres /= 10; /*convert into microseconds*/ tv->tv_sec = (long)(tmpres / 1000000UL); tv->tv_usec = (long)(tmpres % 1000000UL); @@ -90,3 +90,23 @@ int gettimeofday(struct timeval * tv, struct timezone * tz) return -1; } + +int thrift_sleep(unsigned int seconds) +{ + ::Sleep(seconds * 1000); + return 0; +} +int thrift_usleep(unsigned int microseconds) +{ + unsigned int milliseconds = (microseconds + 999)/ 1000; + ::Sleep(milliseconds); + return 0; +} + +char *thrift_ctime_r(const time_t *_clock, char *_buf) +{ + strcpy(_buf, ctime(_clock)); + return _buf; +} + + diff --git a/lib/cpp/src/thrift/windows/GetTimeOfDay.h b/lib/cpp/src/thrift/windows/GetTimeOfDay.h index f6bdf1cd..25ed2540 100644 --- a/lib/cpp/src/thrift/windows/GetTimeOfDay.h +++ b/lib/cpp/src/thrift/windows/GetTimeOfDay.h @@ -28,6 +28,16 @@ #error This is a MSVC header only. #endif -int gettimeofday(struct timeval * tv, struct timezone * tz); +#include "config.h" + +struct thrift_timespec { + int64_t tv_sec; + int64_t tv_nsec; +}; + +int thrift_gettimeofday(struct timeval * tv, struct timezone * tz); +int thrift_sleep(unsigned int seconds); +int thrift_usleep(unsigned int micro_seconds); +char *thrift_ctime_r(const time_t *_clock, char *_buf); #endif // _THRIFT_WINDOWS_GETTIMEOFDAY_H_ diff --git a/lib/cpp/src/thrift/windows/SocketPair.cpp b/lib/cpp/src/thrift/windows/SocketPair.cpp index bca8d927..45aa7e86 100644 --- a/lib/cpp/src/thrift/windows/SocketPair.cpp +++ b/lib/cpp/src/thrift/windows/SocketPair.cpp @@ -35,13 +35,13 @@ // Win32 #include -int socketpair(int d, int type, int protocol, SOCKET sv[2]) +int thrift_socketpair(int d, int type, int protocol, THRIFT_SOCKET sv[2]) { union { struct sockaddr_in inaddr; struct sockaddr addr; } a; - SOCKET listener; + THRIFT_SOCKET listener; int e; socklen_t addrlen = sizeof(a.inaddr); DWORD flags = 0; @@ -63,9 +63,11 @@ int socketpair(int d, int type, int protocol, SOCKET sv[2]) sv[0] = sv[1] = INVALID_SOCKET; do { - if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, - (char*) &reuse, (socklen_t) sizeof(reuse)) == -1) - break; + //ignore errors coming out of this setsockopt. This is because + //SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't + //want to force socket pairs to be an admin. + setsockopt(listener, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, + (char*) &reuse, (socklen_t) sizeof(reuse)); if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) break; if (getsockname(listener, &a.addr, &addrlen) == SOCKET_ERROR) diff --git a/lib/cpp/src/thrift/windows/SocketPair.h b/lib/cpp/src/thrift/windows/SocketPair.h index 9d02998e..1de5613e 100644 --- a/lib/cpp/src/thrift/windows/SocketPair.h +++ b/lib/cpp/src/thrift/windows/SocketPair.h @@ -30,7 +30,8 @@ // Win32 #include +#include "config.h" -int socketpair(int d, int type, int protocol, SOCKET sv[2]); +int thrift_socketpair(int d, int type, int protocol, THRIFT_SOCKET sv[2]); #endif // _THRIFT_WINDOWS_SOCKETPAIR_H_ diff --git a/lib/cpp/src/thrift/windows/StdAfx.cpp b/lib/cpp/src/thrift/windows/StdAfx.cpp index 5e49487b..e69de29b 100644 --- a/lib/cpp/src/thrift/windows/StdAfx.cpp +++ b/lib/cpp/src/thrift/windows/StdAfx.cpp @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "stdafx.h" diff --git a/lib/cpp/src/thrift/windows/StdAfx.h b/lib/cpp/src/thrift/windows/StdAfx.h index e6ebbbaa..e69de29b 100644 --- a/lib/cpp/src/thrift/windows/StdAfx.h +++ b/lib/cpp/src/thrift/windows/StdAfx.h @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_WINDOWS_STDAFX_H_ -#define _THRIFT_WINDOWS_STDAFX_H_ - -#if defined(_MSC_VER) && (_MSC_VER > 1200) -#pragma once -#endif // _MSC_VER - -#ifndef _WIN32 -#error This is a MSVC header only. -#endif - -#include "TargetVersion.h" -#include "Config.h" - -// Exclude rarely-used stuff from Windows headers -#ifndef WIN32_LEAN_AND_MEAN -#define WIN32_LEAN_AND_MEAN -#endif - -#include - -#endif // _THRIFT_WINDOWS_STDAFX_H_ diff --git a/lib/cpp/src/thrift/windows/TWinsockSingleton.cpp b/lib/cpp/src/thrift/windows/TWinsockSingleton.cpp index aae25ab5..a1c7e498 100644 --- a/lib/cpp/src/thrift/windows/TWinsockSingleton.cpp +++ b/lib/cpp/src/thrift/windows/TWinsockSingleton.cpp @@ -17,16 +17,22 @@ * under the License. */ -#include "StdAfx.h" #include "TWinsockSingleton.h" // boost #include +#include namespace apache { namespace thrift { namespace transport { TWinsockSingleton::instance_ptr TWinsockSingleton::instance_ptr_(NULL); +#if USE_BOOST_THREAD boost::once_flag TWinsockSingleton::flags_ = BOOST_ONCE_INIT; +#elif USE_STD_THREAD +std::once_flag TWinsockSingleton::flags_; +#else +#error For windows you must choose USE_BOOST_THREAD or USE_STD_THREAD +#endif //------------------------------------------------------------------------------ TWinsockSingleton::TWinsockSingleton(void) @@ -51,7 +57,11 @@ TWinsockSingleton::~TWinsockSingleton(void) //------------------------------------------------------------------------------ void TWinsockSingleton::create(void) { +#if USE_BOOST_THREAD boost::call_once(init, flags_); +#elif USE_STD_THREAD + std::call_once(flags_, init); +#endif } //------------------------------------------------------------------------------ diff --git a/lib/cpp/src/thrift/windows/TWinsockSingleton.h b/lib/cpp/src/thrift/windows/TWinsockSingleton.h index 134c7b0d..f6e4b8c6 100644 --- a/lib/cpp/src/thrift/windows/TWinsockSingleton.h +++ b/lib/cpp/src/thrift/windows/TWinsockSingleton.h @@ -28,10 +28,19 @@ #error This is a MSVC header only. #endif +#include + // boost #include #include + +#if USE_BOOST_THREAD #include +#elif USE_STD_THREAD +#include +#else +#error For windows you must choose USE_BOOST_THREAD or USE_STD_THREAD +#endif namespace apache { namespace thrift { namespace transport { @@ -48,7 +57,9 @@ public: private: +#if USE_BOOST_THREAD friend void boost::call_once(void (*func)(void), boost::once_flag& flag); +#endif private: @@ -69,7 +80,13 @@ private: private: static instance_ptr instance_ptr_; +#if USE_BOOST_THREAD static boost::once_flag flags_; +#elif USE_STD_THREAD + static std::once_flag flags_; +#else +#error Need a non-Boost non-C++11 way to track single initialization here. +#endif }; }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/windows/TargetVersion.h b/lib/cpp/src/thrift/windows/TargetVersion.h index 35c093f7..e69de29b 100644 --- a/lib/cpp/src/thrift/windows/TargetVersion.h +++ b/lib/cpp/src/thrift/windows/TargetVersion.h @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THIRFT_WINDOWS_TARGETVERSION_H_ -#define _THIRFT_WINDOWS_TARGETVERSION_H_ - -#if defined(_MSC_VER) && (_MSC_VER > 1200) -#pragma once -#endif // _MSC_VER - -#ifndef _WIN32 -#error This is a MSVC header only. -#endif - -#endif //_THIRFT_WINDOWS_TARGETVERSION_H_ diff --git a/lib/cpp/src/thrift/windows/WinFcntl.cpp b/lib/cpp/src/thrift/windows/WinFcntl.cpp index da2f73a4..86132e92 100644 --- a/lib/cpp/src/thrift/windows/WinFcntl.cpp +++ b/lib/cpp/src/thrift/windows/WinFcntl.cpp @@ -19,19 +19,19 @@ #include "WinFcntl.h" -int fcntl(SOCKET fd, int cmd, int flags) +int thrift_fcntl(THRIFT_SOCKET fd, int cmd, int flags) { - if(cmd != F_GETFL && cmd != F_SETFL) + if(cmd != THRIFT_F_GETFL && cmd != THRIFT_F_SETFL) { return -1; } - if(flags != O_NONBLOCK && flags != 0) + if(flags != THRIFT_O_NONBLOCK && flags != 0) { return -1; } - if(cmd == F_GETFL) + if(cmd == THRIFT_F_GETFL) { return 0; } @@ -48,3 +48,57 @@ int fcntl(SOCKET fd, int cmd, int flags) return res; } + +#if WINVER <= 0x0502 //XP, Server2003 +int thrift_poll(THRIFT_POLLFD *fdArray, ULONG nfds, INT timeout) +{ + fd_set read_fds, write_fds; + fd_set* read_fds_ptr = NULL; + fd_set* write_fds_ptr = NULL; + + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + + for(ULONG i=0; i= 0) { + timeval time_out = {timeout / 1000, timeout * 1000}; + time_out_ptr = &time_out; + } + else { //to avoid compiler warnings + (void)time_out; + (void)timeout; + } + + int sktready = select(1, read_fds_ptr, write_fds_ptr, NULL, time_out_ptr); + if(sktready > 0) { + for(ULONG i=0; i +#include -#define O_NONBLOCK 1 - -enum -{ - F_GETFL, - F_SETFL, +#if WINVER <= 0x0502 //XP, Server2003 +struct thrift_pollfd { + THRIFT_SOCKET fd; + SHORT events; + SHORT revents; }; +#endif -int fcntl(SOCKET fd, int cmd, int flags); +int thrift_fcntl(THRIFT_SOCKET fd, int cmd, int flags); +int thrift_poll(THRIFT_POLLFD *fdArray, ULONG nfds, INT timeout); #endif // _THRIFT_WINDOWS_FCNTL_H_ diff --git a/lib/cpp/src/thrift/windows/config.h b/lib/cpp/src/thrift/windows/config.h index d8b814c1..25230c5a 100644 --- a/lib/cpp/src/thrift/windows/config.h +++ b/lib/cpp/src/thrift/windows/config.h @@ -28,123 +28,63 @@ #error This is a MSVC header only. #endif -#pragma warning(disable: 4996) // Depreciated posix name. -#pragma warning(disable: 4250) // Inherits via dominance. +// use std::thread in MSVC11 (2012) or newer +#if _MSC_VER >= 1700 +# define USE_STD_THREAD 1 +// otherwise use boost threads +#else +# define USE_BOOST_THREAD 1 +#endif + +#ifndef TARGET_WIN_XP +# define TARGET_WIN_XP 1 +#endif + +#if TARGET_WIN_XP +# ifndef WINVER +# define WINVER 0x0501 +# endif +# ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0501 +# endif +#endif + +#ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0601 +#endif + +#pragma warning(disable: 4996) // Deprecated posix name. #define VERSION "1.0.0-dev" #define HAVE_GETTIMEOFDAY 1 #define HAVE_SYS_STAT_H 1 -#include "TargetVersion.h" +#ifdef HAVE_STDINT_H +# include +#else +# include + +typedef boost::int64_t int64_t; +typedef boost::uint64_t uint64_t; +typedef boost::int32_t int32_t; +typedef boost::uint32_t uint32_t; +typedef boost::int16_t int16_t; +typedef boost::uint16_t uint16_t; +typedef boost::int8_t int8_t; +typedef boost::uint8_t uint8_t; +#endif + +#include #include "GetTimeOfDay.h" #include "Operators.h" #include "TWinsockSingleton.h" #include "WinFcntl.h" #include "SocketPair.h" -// boost -#include - -typedef boost::int64_t int64_t; -typedef boost::uint32_t uint32_t; -typedef boost::uint8_t uint8_t; - // windows #include #include #pragma comment(lib, "Ws2_32.lib") #pragma comment(lib, "advapi32.lib") //For security APIs in TPipeServer -// pthreads -#if 0 -# include -#else -struct timespec { - int64_t tv_sec; - int64_t tv_nsec; -}; -# define USE_BOOST_THREAD 1 -# define ctime_r( _clock, _buf ) \ - ( strcpy( (_buf), ctime( (_clock) ) ), \ - (_buf) ) -#endif - -typedef ptrdiff_t ssize_t; - -// Missing functions. -#define usleep(ms) Sleep(ms) -inline int sleep(DWORD ms) -{ - Sleep(ms); - return 0; -} - -#if WINVER <= 0x0502 //XP, Server2003 -#define POLLIN 0x0300 -#define POLLOUT 0x0010 -#define poll(fds, nfds, timeout) \ - poll_win32(fds, nfds, timeout) - -typedef struct pollfd { - SOCKET fd; - SHORT events; - SHORT revents; -} WSAPOLLFD, *PWSAPOLLFD, FAR *LPWSAPOLLFD; - -inline int poll_win32(LPWSAPOLLFD fdArray, ULONG nfds, INT timeout) -{ - fd_set read_fds, write_fds; - fd_set* read_fds_ptr = NULL; - fd_set* write_fds_ptr = NULL; - - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); - - for(ULONG i=0; i= 0) { - timeval time_out = {timeout / 1000, timeout * 1000}; - time_out_ptr = &time_out; - } - else { //to avoid compiler warnings - (void)time_out; - (void)timeout; - } - - int sktready = select(1, read_fds_ptr, write_fds_ptr, NULL, time_out_ptr); - if(sktready > 0) { - for(ULONG i=0; i 1200) -# pragma once -#endif // _MSC_VER - -#ifndef _WIN32 -# error This is a MSVC header only. -#endif - -#ifndef NOMINMAX -# define NOMINMAX -#endif -#ifndef USE_BOOST_THREAD -# define BOOST_ALL_NO_LIB 1 -# define BOOST_THREAD_NO_LIB 1 -#endif -#define TARGET_WIN_XP - -#ifdef TARGET_WIN_XP -# ifndef WINVER -# define WINVER 0x0501 -# endif -# ifndef _WIN32_WINNT -# define _WIN32_WINNT 0x0501 -# endif -#endif - -#ifndef _WIN32_WINNT -# define _WIN32_WINNT 0x0601 -#endif - -#include "config.h" - -#undef gai_strerror -#define gai_strerror gai_strerrorA - -#undef errno -#undef EINTR -#undef EINPROGRESS -#undef ECONNRESET -#undef ENOTCONN -#undef ETIMEDOUT -#undef EWOULDBLOCK -#undef EAGAIN -#undef EPIPE -#define errno ::WSAGetLastError() -#define EINPROGRESS WSAEINPROGRESS -#define EAGAIN WSAEWOULDBLOCK -#define EINTR WSAEINTR -#define ECONNRESET WSAECONNRESET -#define ENOTCONN WSAENOTCONN -#define ETIMEDOUT WSAETIMEDOUT -#define EWOULDBLOCK WSAEWOULDBLOCK -#define EPIPE WSAECONNRESET - -#endif // _THRIFT_WINDOWS_FORCEINC_H_ -- 2.17.1