From b69d24dbf71176bff8b8ad6cf4f59605c9cdd322 Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Thu, 4 Oct 2012 18:02:15 +0000 Subject: [PATCH] THRIFT-1690 Sockets and Pipe Handles truncated on Win64 Patch: Ben Craig git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1394182 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/thrift/concurrency/BoostMonitor.cpp | 4 +- .../src/thrift/concurrency/TimerManager.cpp | 10 +- lib/cpp/src/thrift/concurrency/Util.cpp | 1 + lib/cpp/src/thrift/concurrency/Util.h | 2 +- .../src/thrift/protocol/TBinaryProtocol.tcc | 4 +- .../src/thrift/protocol/TDebugProtocol.cpp | 20 +- .../src/thrift/protocol/TDenseProtocol.cpp | 6 +- lib/cpp/src/thrift/protocol/TJSONProtocol.cpp | 42 +++-- lib/cpp/src/thrift/protocol/TProtocol.h | 7 +- .../src/thrift/server/TThreadPoolServer.cpp | 2 +- .../thrift/transport/TBufferTransports.cpp | 24 +-- .../src/thrift/transport/TBufferTransports.h | 8 +- lib/cpp/src/thrift/transport/TFDTransport.cpp | 8 +- .../src/thrift/transport/TFileTransport.cpp | 8 +- lib/cpp/src/thrift/transport/TFileTransport.h | 2 +- lib/cpp/src/thrift/transport/THttpClient.cpp | 6 +- lib/cpp/src/thrift/transport/THttpServer.cpp | 5 +- .../src/thrift/transport/THttpTransport.cpp | 2 +- lib/cpp/src/thrift/transport/TPipe.cpp | 167 +++++++---------- lib/cpp/src/thrift/transport/TPipe.h | 30 +-- lib/cpp/src/thrift/transport/TPipeServer.cpp | 171 ++++++------------ lib/cpp/src/thrift/transport/TPipeServer.h | 34 ++-- .../src/thrift/transport/TServerSocket.cpp | 28 +-- lib/cpp/src/thrift/transport/TServerSocket.h | 11 +- lib/cpp/src/thrift/transport/TSocket.cpp | 38 ++-- lib/cpp/src/thrift/transport/TSocket.h | 9 +- lib/cpp/src/thrift/transport/TSocketPool.cpp | 6 +- lib/cpp/src/thrift/transport/TSocketPool.h | 6 +- lib/cpp/src/thrift/windows/SocketPair.cpp | 13 +- lib/cpp/src/thrift/windows/SocketPair.h | 5 +- lib/cpp/src/thrift/windows/StdAfx.h | 3 + lib/cpp/src/thrift/windows/WinFcntl.cpp | 7 +- lib/cpp/src/thrift/windows/WinFcntl.h | 5 +- lib/cpp/src/thrift/windows/config.h | 9 +- lib/cpp/src/thrift/windows/force_inc.h | 18 +- 35 files changed, 351 insertions(+), 370 deletions(-) diff --git a/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp b/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp index 2adf7d73..dd318fea 100644 --- a/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp +++ b/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp @@ -114,8 +114,8 @@ class Monitor::Impl : public boost::condition_variable_any { struct timespec currenttime; Util::toTimespec(currenttime, Util::currentTime()); - long tv_sec = abstime->tv_sec - currenttime.tv_sec; - long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec; + long tv_sec = static_cast(abstime->tv_sec - currenttime.tv_sec); + long tv_nsec = static_cast(abstime->tv_nsec - currenttime.tv_nsec); if(tv_sec < 0) tv_sec = 0; if(tv_nsec < 0) diff --git a/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/lib/cpp/src/thrift/concurrency/TimerManager.cpp index f7acd0ad..8be8a6e7 100644 --- a/lib/cpp/src/thrift/concurrency/TimerManager.cpp +++ b/lib/cpp/src/thrift/concurrency/TimerManager.cpp @@ -102,7 +102,7 @@ class TimerManager::Dispatcher: public Runnable { assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0)); try { manager_->monitor_.wait(timeout); - } catch (TimedOutException &e) {} + } catch (TimedOutException &) {} now = Util::currentTime(); } @@ -140,12 +140,20 @@ class TimerManager::Dispatcher: public Runnable { friend class TimerManager; }; +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable: 4355) // 'this' used in base member initializer list +#endif + TimerManager::TimerManager() : taskCount_(0), state_(TimerManager::UNINITIALIZED), dispatcher_(shared_ptr(new Dispatcher(this))) { } +#if defined(_MSC_VER) +#pragma warning(pop) +#endif TimerManager::~TimerManager() { diff --git a/lib/cpp/src/thrift/concurrency/Util.cpp b/lib/cpp/src/thrift/concurrency/Util.cpp index 73b0c052..764b6f5d 100644 --- a/lib/cpp/src/thrift/concurrency/Util.cpp +++ b/lib/cpp/src/thrift/concurrency/Util.cpp @@ -38,6 +38,7 @@ int64_t Util::currentTimeTicks(int64_t ticksPerSec) { struct timespec now; int ret = clock_gettime(CLOCK_REALTIME, &now); assert(ret == 0); + ret = ret; //squelching "unused variable" warning toTicks(result, now, ticksPerSec); #elif defined(HAVE_GETTIMEOFDAY) struct timeval now; diff --git a/lib/cpp/src/thrift/concurrency/Util.h b/lib/cpp/src/thrift/concurrency/Util.h index 8ef07eff..ee67fc72 100644 --- a/lib/cpp/src/thrift/concurrency/Util.h +++ b/lib/cpp/src/thrift/concurrency/Util.h @@ -67,7 +67,7 @@ class Util { } static void toTimeval(struct timeval& result, int64_t value) { - result.tv_sec = value / MS_PER_S; // ms to s + result.tv_sec = (uint32_t)(value / MS_PER_S); // ms to s result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us } diff --git a/lib/cpp/src/thrift/protocol/TBinaryProtocol.tcc b/lib/cpp/src/thrift/protocol/TBinaryProtocol.tcc index f3f38f70..294c8312 100644 --- a/lib/cpp/src/thrift/protocol/TBinaryProtocol.tcc +++ b/lib/cpp/src/thrift/protocol/TBinaryProtocol.tcc @@ -178,7 +178,9 @@ uint32_t TBinaryProtocolT::writeDouble(const double dub) { template template uint32_t TBinaryProtocolT::writeString(const StrType& str) { - uint32_t size = str.size(); + if(str.size() > static_cast((std::numeric_limits::max)())) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + uint32_t size = static_cast(str.size()); uint32_t result = writeI32((int32_t)size); if (size > 0) { this->trans_->write((uint8_t*)str.data(), size); diff --git a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp index 28e175b6..8b69df41 100644 --- a/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp +++ b/lib/cpp/src/thrift/protocol/TDebugProtocol.cpp @@ -32,6 +32,7 @@ using std::string; static string byte_to_hex(const uint8_t byte) { char buf[3]; int ret = std::sprintf(buf, "%02x", (int)byte); + ret = ret; //squelching "unused variable" warning assert(ret == 2); assert(buf[2] == '\0'); return buf; @@ -74,14 +75,23 @@ void TDebugProtocol::indentDown() { } uint32_t TDebugProtocol::writePlain(const string& str) { - trans_->write((uint8_t*)str.data(), str.length()); - return str.length(); + if(str.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + trans_->write((uint8_t*)str.data(), static_cast(str.length())); + return static_cast(str.length()); } uint32_t TDebugProtocol::writeIndented(const string& str) { - trans_->write((uint8_t*)indent_str_.data(), indent_str_.length()); - trans_->write((uint8_t*)str.data(), str.length()); - return indent_str_.length() + str.length(); + if(str.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + if(indent_str_.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + uint64_t total_len = indent_str_.length() + str.length(); + if(total_len > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + trans_->write((uint8_t*)indent_str_.data(), static_cast(indent_str_.length())); + trans_->write((uint8_t*)str.data(), static_cast(str.length())); + return static_cast(indent_str_.length() + str.length()); } uint32_t TDebugProtocol::startItem() { diff --git a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp index 35400d1d..1eb444b5 100644 --- a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp +++ b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp @@ -245,7 +245,7 @@ inline uint32_t TDenseProtocol::vlqWrite(uint64_t vlq) { while (vlq > 0) { assert(pos >= 0); - buf[pos] = (vlq | 0x80); + buf[pos] = static_cast(vlq | 0x80); vlq >>= 7; pos--; } @@ -463,7 +463,9 @@ inline uint32_t TDenseProtocol::subWriteI32(const int32_t i32) { } uint32_t TDenseProtocol::subWriteString(const std::string& str) { - uint32_t size = str.size(); + if(str.size() > static_cast((std::numeric_limits::max)())) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + uint32_t size = static_cast(str.size()); uint32_t xfer = subWriteI32((int32_t)size); if (size > 0) { trans_->write((uint8_t*)str.data(), size); diff --git a/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp b/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp index 6f1612e8..09518760 100644 --- a/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp +++ b/lib/cpp/src/thrift/protocol/TJSONProtocol.cpp @@ -380,7 +380,7 @@ void TJSONProtocol::popContext() { // Write the character ch as a JSON escape sequence ("\u00xx") uint32_t TJSONProtocol::writeJSONEscapeChar(uint8_t ch) { trans_->write((const uint8_t *)kJSONEscapePrefix.c_str(), - kJSONEscapePrefix.length()); + static_cast(kJSONEscapePrefix.length())); uint8_t outCh = hexChar(ch >> 4); trans_->write(&outCh, 1); outCh = hexChar(ch); @@ -442,7 +442,9 @@ uint32_t TJSONProtocol::writeJSONBase64(const std::string &str) { trans_->write(&kJSONStringDelimiter, 1); uint8_t b[4]; const uint8_t *bytes = (const uint8_t *)str.c_str(); - uint32_t len = str.length(); + if(str.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + uint32_t len = static_cast(str.length()); while (len >= 3) { // Encode 3 bytes at a time base64_encode(bytes, 3, b); @@ -471,8 +473,10 @@ uint32_t TJSONProtocol::writeJSONInteger(NumberType num) { trans_->write(&kJSONStringDelimiter, 1); result += 1; } - trans_->write((const uint8_t *)val.c_str(), val.length()); - result += val.length(); + if(val.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + trans_->write((const uint8_t *)val.c_str(), static_cast(val.length())); + result += static_cast(val.length()); if (escapeNum) { trans_->write(&kJSONStringDelimiter, 1); result += 1; @@ -512,8 +516,10 @@ uint32_t TJSONProtocol::writeJSONDouble(double num) { trans_->write(&kJSONStringDelimiter, 1); result += 1; } - trans_->write((const uint8_t *)val.c_str(), val.length()); - result += val.length(); + if(val.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + trans_->write((const uint8_t *)val.c_str(), static_cast(val.length())); + result += static_cast(val.length()); if (escapeNum) { trans_->write(&kJSONStringDelimiter, 1); result += 1; @@ -721,7 +727,9 @@ uint32_t TJSONProtocol::readJSONBase64(std::string &str) { std::string tmp; uint32_t result = readJSONString(tmp); uint8_t *b = (uint8_t *)tmp.c_str(); - uint32_t len = tmp.length(); + if(tmp.length() > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + uint32_t len = static_cast(tmp.length()); str.clear(); while (len >= 4) { base64_decode(b, 4); @@ -869,7 +877,9 @@ uint32_t TJSONProtocol::readMessageBegin(std::string& name, result += readJSONInteger(tmpVal); messageType = (TMessageType)tmpVal; result += readJSONInteger(tmpVal); - seqid = tmpVal; + if(tmpVal > static_cast((std::numeric_limits::max)())) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + seqid = static_cast(tmpVal); return result; } @@ -900,7 +910,9 @@ uint32_t TJSONProtocol::readFieldBegin(std::string& name, uint64_t tmpVal = 0; std::string tmpStr; result += readJSONInteger(tmpVal); - fieldId = tmpVal; + if(tmpVal > static_cast((std::numeric_limits::max)())) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + fieldId = static_cast(tmpVal); result += readJSONObjectStart(); result += readJSONString(tmpStr); fieldType = getTypeIDForTypeName(tmpStr); @@ -923,7 +935,9 @@ uint32_t TJSONProtocol::readMapBegin(TType& keyType, result += readJSONString(tmpStr); valType = getTypeIDForTypeName(tmpStr); result += readJSONInteger(tmpVal); - size = tmpVal; + if(tmpVal > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + size = static_cast(tmpVal); result += readJSONObjectStart(); return result; } @@ -940,7 +954,9 @@ uint32_t TJSONProtocol::readListBegin(TType& elemType, result += readJSONString(tmpStr); elemType = getTypeIDForTypeName(tmpStr); result += readJSONInteger(tmpVal); - size = tmpVal; + if(tmpVal > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + size = static_cast(tmpVal); return result; } @@ -956,7 +972,9 @@ uint32_t TJSONProtocol::readSetBegin(TType& elemType, result += readJSONString(tmpStr); elemType = getTypeIDForTypeName(tmpStr); result += readJSONInteger(tmpVal); - size = tmpVal; + if(tmpVal > (std::numeric_limits::max)()) + throw TProtocolException(TProtocolException::SIZE_LIMIT); + size = static_cast(tmpVal); return result; } diff --git a/lib/cpp/src/thrift/protocol/TProtocol.h b/lib/cpp/src/thrift/protocol/TProtocol.h index 77b018e3..09ce7c4a 100644 --- a/lib/cpp/src/thrift/protocol/TProtocol.h +++ b/lib/cpp/src/thrift/protocol/TProtocol.h @@ -129,10 +129,13 @@ using apache::thrift::transport::TTransport; # include # define ntohll(n) bswap_64(n) # define htonll(n) bswap_64(n) -# else /* GNUC & GLIBC */ +# elif defined(_MSC_VER) /* Microsoft Visual C++ */ +# define ntohll(n) ( _byteswap_uint64((uint64_t)n) ) +# define htonll(n) ( _byteswap_uint64((uint64_t)n) ) +# else /* Not GNUC/GLIBC or MSVC */ # define ntohll(n) ( (((uint64_t)ntohl(n)) << 32) + ntohl(n >> 32) ) # define htonll(n) ( (((uint64_t)htonl(n)) << 32) + htonl(n >> 32) ) -# endif /* GNUC & GLIBC */ +# endif /* GNUC/GLIBC or MSVC or something else */ #else /* __THRIFT_BYTE_ORDER */ # error "Can't define htonll or ntohll!" #endif diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp index db639511..d3b97142 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp @@ -72,7 +72,7 @@ public: break; } } - } catch (const TTransportException& ttx) { + } catch (const TTransportException&) { // This is reasonably expected, client didn't send a full request so just // ignore him // string errStr = string("TThreadPoolServer client died: ") + ttx.what(); diff --git a/lib/cpp/src/thrift/transport/TBufferTransports.cpp b/lib/cpp/src/thrift/transport/TBufferTransports.cpp index a2cc0663..8d7b670d 100644 --- a/lib/cpp/src/thrift/transport/TBufferTransports.cpp +++ b/lib/cpp/src/thrift/transport/TBufferTransports.cpp @@ -28,7 +28,7 @@ namespace apache { namespace thrift { namespace transport { uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { - uint32_t have = rBound_ - rBase_; + uint32_t have = static_cast(rBound_ - rBase_); // We should only take the slow path if we can't satisfy the read // with the data already in the buffer. @@ -52,7 +52,7 @@ uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); // Hand over whatever we have. - uint32_t give = std::min(len, static_cast(rBound_ - rBase_)); + uint32_t give = (std::min)(len, static_cast(rBound_ - rBase_)); memcpy(buf, rBase_, give); rBase_ += give; @@ -60,8 +60,8 @@ uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { } void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) { - uint32_t have_bytes = wBase_ - wBuf_.get(); - uint32_t space = wBound_ - wBase_; + uint32_t have_bytes = static_cast(wBase_ - wBuf_.get()); + uint32_t space = static_cast(wBound_ - wBase_); // We should only take the slow path if we can't accomodate the write // with the free space already in the buffer. assert(wBound_ - wBase_ < static_cast(len)); @@ -118,7 +118,7 @@ const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { void TBufferedTransport::flush() { // Write out any data waiting in the write buffer. - uint32_t have_bytes = wBase_ - wBuf_.get(); + uint32_t have_bytes = static_cast(wBase_ - wBuf_.get()); if (have_bytes > 0) { // Note that we reset wBase_ prior to the underlying write // to ensure we're in a sane state (i.e. internal buffer cleaned) @@ -134,7 +134,7 @@ void TBufferedTransport::flush() { uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { uint32_t want = len; - uint32_t have = rBound_ - rBase_; + uint32_t have = static_cast(rBound_ - rBase_); // We should only take the slow path if we can't satisfy the read // with the data already in the buffer. @@ -159,7 +159,7 @@ uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { // TODO(dreiss): Should we warn when reads cross frames? // Hand over whatever we have. - uint32_t give = std::min(want, static_cast(rBound_ - rBase_)); + uint32_t give = (std::min)(want, static_cast(rBound_ - rBase_)); memcpy(buf, rBase_, give); rBase_ += give; want -= give; @@ -212,7 +212,7 @@ bool TFramedTransport::readFrame() { void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { // Double buffer size until sufficient. - uint32_t have = wBase_ - wBuf_.get(); + uint32_t have = static_cast(wBase_ - wBuf_.get()); uint32_t new_size = wBufSize_; if (len + have < have /* overflow */ || len + have > 0x7fffffff) { throw TTransportException(TTransportException::BAD_ARGS, @@ -247,7 +247,7 @@ void TFramedTransport::flush() { assert(wBufSize_ > sizeof(sz_nbo)); // Slip the frame size into the start of the buffer. - sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo)); + sz_hbo = static_cast(wBase_ - (wBuf_.get() + sizeof(sz_nbo))); sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo)); memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo)); @@ -267,7 +267,7 @@ void TFramedTransport::flush() { } uint32_t TFramedTransport::writeEnd() { - return wBase_ - wBuf_.get(); + return static_cast(wBase_ - wBuf_.get()); } const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { @@ -281,7 +281,7 @@ const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { uint32_t TFramedTransport::readEnd() { // include framing bytes - return rBound_ - rBuf_.get() + sizeof(uint32_t); + return static_cast(rBound_ - rBuf_.get() + sizeof(uint32_t)); } void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { @@ -289,7 +289,7 @@ void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out rBound_ = wBase_; // Decide how much to give. - uint32_t give = std::min(len, available_read()); + uint32_t give = (std::min)(len, available_read()); *out_start = rBase_; *out_give = give; diff --git a/lib/cpp/src/thrift/transport/TBufferTransports.h b/lib/cpp/src/thrift/transport/TBufferTransports.h index b5570f59..c1352294 100644 --- a/lib/cpp/src/thrift/transport/TBufferTransports.h +++ b/lib/cpp/src/thrift/transport/TBufferTransports.h @@ -634,7 +634,7 @@ class TMemoryBuffer : public TVirtualTransport { // Move it into ourself. this->swap(new_buffer); // Our old self gets destroyed. - } + } std::string readAsString(uint32_t len) { std::string str; @@ -646,7 +646,8 @@ class TMemoryBuffer : public TVirtualTransport { // return number of bytes read uint32_t readEnd() { - uint32_t bytes = rBase_ - buffer_; + //This cast should be safe, because buffer_'s size is a uint32_t + uint32_t bytes = static_cast(rBase_ - buffer_); if (rBase_ == wBase_) { resetBuffer(); } @@ -655,7 +656,8 @@ class TMemoryBuffer : public TVirtualTransport { // Return number of bytes written uint32_t writeEnd() { - return wBase_ - buffer_; + //This cast should be safe, because buffer_'s size is a uint32_t + return static_cast(wBase_ - buffer_); } uint32_t available_read() const { diff --git a/lib/cpp/src/thrift/transport/TFDTransport.cpp b/lib/cpp/src/thrift/transport/TFDTransport.cpp index fb9cc57d..176e7bf7 100644 --- a/lib/cpp/src/thrift/transport/TFDTransport.cpp +++ b/lib/cpp/src/thrift/transport/TFDTransport.cpp @@ -66,7 +66,9 @@ uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) { "TFDTransport::read()", errno_copy); } - return rv; + //this should be fine, since we already checked for negative values, + //and ::read should only return a 32-bit value since len is 32-bit. + return static_cast(rv); } } @@ -85,7 +87,9 @@ void TFDTransport::write(const uint8_t* buf, uint32_t len) { } buf += rv; - len -= rv; + //this should be fine, as we've already checked for negative values, and + //::write shouldn't return more than a uint32_t since len is a uint32_t + len -= static_cast(rv); } } diff --git a/lib/cpp/src/thrift/transport/TFileTransport.cpp b/lib/cpp/src/thrift/transport/TFileTransport.cpp index 47dc3281..0a07fbad 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.cpp +++ b/lib/cpp/src/thrift/transport/TFileTransport.cpp @@ -114,7 +114,7 @@ TFileTransport::TFileTransport(string path, bool readOnly) openLogFile(); } -void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) { +void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { filename_ = filename; offset_ = offset; @@ -827,7 +827,7 @@ void TFileTransport::performRecovery() { char errorMsg[1024]; sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", (offset_ + readState_.lastDispatchPtr_)); - + GlobalOutput(errorMsg); throw TTransportException(errorMsg); } @@ -1079,7 +1079,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { if ( (numEvents > 0) && (numProcessed == numEvents)) { return; } - } catch (TEOFException& teof) { + } catch (TEOFException&) { if (!tail) { break; } @@ -1110,7 +1110,7 @@ void TFileProcessor::processChunk() { if (curChunk != inputTransport_->getCurChunk()) { break; } - } catch (TEOFException& teof) { + } catch (TEOFException&) { break; } catch (TException &te) { cerr << te.what() << endl; diff --git a/lib/cpp/src/thrift/transport/TFileTransport.h b/lib/cpp/src/thrift/transport/TFileTransport.h index e27f5a6b..edfc407c 100644 --- a/lib/cpp/src/thrift/transport/TFileTransport.h +++ b/lib/cpp/src/thrift/transport/TFileTransport.h @@ -201,7 +201,7 @@ class TFileTransport : public TFileReaderTransport, uint32_t getCurChunk(); // for changing the output file - void resetOutputFile(int fd, std::string filename, int64_t offset); + void resetOutputFile(int fd, std::string filename, off_t offset); // Setter/Getter functions for user-controllable options void setReadBuffSize(uint32_t readBuffSize) { diff --git a/lib/cpp/src/thrift/transport/THttpClient.cpp b/lib/cpp/src/thrift/transport/THttpClient.cpp index adfc9593..a4771626 100644 --- a/lib/cpp/src/thrift/transport/THttpClient.cpp +++ b/lib/cpp/src/thrift/transport/THttpClient.cpp @@ -49,7 +49,7 @@ void THttpClient::parseHeader(char* header) { if (boost::iends_with(value, "chunked")) { chunked_ = true; } - } else if (boost::istarts_with(header, "Content-Length")) { + } else if (boost::istarts_with(header, "Content-Length")) { chunked_ = false; contentLength_ = atoi(value); } @@ -101,8 +101,10 @@ void THttpClient::flush() { CRLF; string header = h.str(); + if(header.size() > (std::numeric_limits::max)()) + throw TTransportException("Header too big"); // Write the header, then the data, then flush - transport_->write((const uint8_t*)header.c_str(), header.size()); + transport_->write((const uint8_t*)header.c_str(), static_cast(header.size())); transport_->write(buf, len); transport_->flush(); diff --git a/lib/cpp/src/thrift/transport/THttpServer.cpp b/lib/cpp/src/thrift/transport/THttpServer.cpp index d7231967..1b3fe7bc 100644 --- a/lib/cpp/src/thrift/transport/THttpServer.cpp +++ b/lib/cpp/src/thrift/transport/THttpServer.cpp @@ -39,7 +39,7 @@ void THttpServer::parseHeader(char* header) { if (colon == NULL) { return; } - uint32_t sz = colon - header; + size_t sz = colon - header; char* value = colon+1; if (strncmp(header, "Transfer-Encoding", sz) == 0) { @@ -96,7 +96,8 @@ void THttpServer::flush() { string header = h.str(); // Write the header, then the data, then flush - transport_->write((const uint8_t*)header.c_str(), header.size()); + // cast should be fine, because none of "header" is under attacker control + transport_->write((const uint8_t*)header.c_str(), static_cast(header.size())); transport_->write(buf, len); transport_->flush(); diff --git a/lib/cpp/src/thrift/transport/THttpTransport.cpp b/lib/cpp/src/thrift/transport/THttpTransport.cpp index 3bda20e7..c415ddb9 100644 --- a/lib/cpp/src/thrift/transport/THttpTransport.cpp +++ b/lib/cpp/src/thrift/transport/THttpTransport.cpp @@ -171,7 +171,7 @@ char* THttpTransport::readLine() { // Return pointer to next line *eol = '\0'; char* line = httpBuf_+httpPos_; - httpPos_ = (eol-httpBuf_) + CRLF_LEN; + httpPos_ = static_cast((eol-httpBuf_) + CRLF_LEN); return line; } } diff --git a/lib/cpp/src/thrift/transport/TPipe.cpp b/lib/cpp/src/thrift/transport/TPipe.cpp index ae98a478..464272d4 100644 --- a/lib/cpp/src/thrift/transport/TPipe.cpp +++ b/lib/cpp/src/thrift/transport/TPipe.cpp @@ -28,57 +28,41 @@ using namespace std; * TPipe implementation. */ +#ifdef _WIN32 //---- Constructors ---- -TPipe::TPipe(int Pipe) : - pipename_(""), +TPipe::TPipe(HANDLE Pipe) : Pipe_(Pipe), TimeoutSeconds_(3), isAnonymous(false) +{} + +TPipe::TPipe(const char *pipename) : + Pipe_(INVALID_HANDLE_VALUE), + TimeoutSeconds_(3), + isAnonymous(false) { -#ifndef _WIN32 - GlobalOutput.perror("TPipe: constructor using a pipe handle is not supported under *NIX", -99); - throw TTransportException(TTransportException::NOT_OPEN, " constructor using a pipe handle is not supported under *NIX"); -#endif + setPipename(pipename); } -TPipe::TPipe(string pipename) : - pipename_(pipename), - Pipe_(-1), +TPipe::TPipe(const std::string &pipename) : + Pipe_(INVALID_HANDLE_VALUE), TimeoutSeconds_(3), isAnonymous(false) { -#ifdef _WIN32 - if(pipename_.find("\\\\") == -1) { - pipename_ = "\\\\.\\pipe\\" + pipename_; - } -#else - dsocket.reset(new TSocket(pipename)); -#endif + setPipename(pipename); } -TPipe::TPipe(int PipeRd, int PipeWrt) : - pipename_(""), +TPipe::TPipe(HANDLE PipeRd, HANDLE PipeWrt) : Pipe_(PipeRd), PipeWrt_(PipeWrt), TimeoutSeconds_(3), isAnonymous(true) -{ -#ifndef _WIN32 - GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); -#endif -} +{} - TPipe::TPipe() : - pipename_(""), - Pipe_(-1), +TPipe::TPipe() : + Pipe_(INVALID_HANDLE_VALUE), TimeoutSeconds_(3) -{ -#ifndef _WIN32 - GlobalOutput.perror("TPipe: Anonymous pipes not yet supported under *NIX", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); -#endif -} +{} //---- Destructor ---- TPipe::~TPipe() { @@ -86,15 +70,13 @@ TPipe::~TPipe() { } -bool TPipe::isOpen() { - return (Pipe_ != -1); -} - //--------------------------------------------------------- // Transport callbacks //--------------------------------------------------------- -#ifdef _WIN32 //Windows callbacks +bool TPipe::isOpen() { + return (Pipe_ != INVALID_HANDLE_VALUE); +} bool TPipe::peek() { if (!isOpen()) { @@ -102,7 +84,7 @@ bool TPipe::peek() { } DWORD bytesavail = 0; int PeekRet = 0; - PeekRet = PeekNamedPipe((HANDLE)Pipe_, NULL, 0, NULL, &bytesavail, NULL); + PeekRet = PeekNamedPipe(Pipe_, NULL, 0, NULL, &bytesavail, NULL); return (PeekRet != 0 && bytesavail > 0); } @@ -116,44 +98,44 @@ void TPipe::open() { HANDLE hPipe_; for(int i=0; ipeek(); -} - -void TPipe::open() { - dsocket->open(); -} - -void TPipe::close() { - dsocket->close(); -} - -uint32_t TPipe::read(uint8_t* buf, uint32_t len) { - return dsocket->read(buf, len); -} - -void TPipe::write(const uint8_t* buf, uint32_t len) { - dsocket->write(buf, len); -} -#endif //callbacks - - //--------------------------------------------------------- // Accessors //--------------------------------------------------------- @@ -223,23 +182,26 @@ string TPipe::getPipename() { return pipename_; } -void TPipe::setPipename(std::string pipename) { - pipename_ = pipename; +void TPipe::setPipename(const std::string &pipename) { + if(pipename.find("\\\\") == -1) + pipename_ = "\\\\.\\pipe\\" + pipename; + else + pipename_ = pipename; } -int TPipe::getPipeHandle() { +HANDLE TPipe::getPipeHandle() { return Pipe_; } -void TPipe::setPipeHandle(int pipehandle) { +void TPipe::setPipeHandle(HANDLE pipehandle) { Pipe_ = pipehandle; } -int TPipe::getWrtPipeHandle() { +HANDLE TPipe::getWrtPipeHandle() { return PipeWrt_; } -void TPipe::setWrtPipeHandle(int pipehandle) { +void TPipe::setWrtPipeHandle(HANDLE pipehandle) { PipeWrt_ = pipehandle; } @@ -250,5 +212,6 @@ long TPipe::getConnectTimeout() { void TPipe::setConnectTimeout(long seconds) { TimeoutSeconds_ = seconds; } +#endif //_WIN32 }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TPipe.h b/lib/cpp/src/thrift/transport/TPipe.h index 942f54f7..77426440 100644 --- a/lib/cpp/src/thrift/transport/TPipe.h +++ b/lib/cpp/src/thrift/transport/TPipe.h @@ -32,16 +32,19 @@ namespace apache { namespace thrift { namespace transport { * Windows Pipes implementation of the TTransport interface. * */ +#ifdef _WIN32 class TPipe : public TVirtualTransport { public: // Constructs a new pipe object. TPipe(); // Named pipe constructors - - TPipe(int Pipe); - TPipe(std::string pipename); + explicit TPipe(HANDLE Pipe); //HANDLE is a void* + //need a const char * overload so string literals don't go to the HANDLE overload + explicit TPipe(const char *pipename); + explicit TPipe(const std::string &pipename); // Anonymous pipe - - TPipe(int PipeRd, int PipeWrt); + TPipe(HANDLE PipeRd, HANDLE PipeWrt); // Destroys the pipe object, closing it if necessary. virtual ~TPipe(); @@ -67,26 +70,25 @@ class TPipe : public TVirtualTransport { //Accessors std::string getPipename(); - void setPipename(std::string pipename); - int getPipeHandle(); //doubles as the read handle for anon pipe - void setPipeHandle(int pipehandle); - int getWrtPipeHandle(); - void setWrtPipeHandle(int pipehandle); + void setPipename(const std::string &pipename); + HANDLE getPipeHandle(); //doubles as the read handle for anon pipe + void setPipeHandle(HANDLE pipehandle); + HANDLE getWrtPipeHandle(); + void setWrtPipeHandle(HANDLE pipehandle); long getConnectTimeout(); void setConnectTimeout(long seconds); private: std::string pipename_; + //Named pipe handles are R/W, while anonymous pipes are one or the other (half duplex). - int Pipe_, PipeWrt_; + HANDLE Pipe_, PipeWrt_; long TimeoutSeconds_; bool isAnonymous; - -#ifndef _WIN32 - //*NIX named pipe implementation uses domain socket - boost::shared_ptr dsocket; -#endif }; +#else +typedef TSocket TPipe; +#endif }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TPipeServer.cpp b/lib/cpp/src/thrift/transport/TPipeServer.cpp index 73a52196..3a97dda5 100644 --- a/lib/cpp/src/thrift/transport/TPipeServer.cpp +++ b/lib/cpp/src/thrift/transport/TPipeServer.cpp @@ -32,30 +32,26 @@ namespace apache { namespace thrift { namespace transport { +#ifdef _WIN32 + using namespace std; using boost::shared_ptr; //---- Constructors ---- -TPipeServer::TPipeServer(string pipename, uint32_t bufsize) : +TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize) : pipename_(pipename), bufsize_(bufsize), - Pipe_(-1), + Pipe_(INVALID_HANDLE_VALUE), maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), isAnonymous(false) { -#ifdef _WIN32 - if(pipename_.find("\\\\") == 0) { - pipename_ = "\\\\.\\pipe\\" + pipename_; - } -#else - dsrvsocket.reset(new TServerSocket(pipename)); -#endif + setPipename(pipename); } -TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnections) : +TPipeServer::TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections) : pipename_(pipename), bufsize_(bufsize), - Pipe_(-1), + Pipe_(INVALID_HANDLE_VALUE), isAnonymous(false) { //Restrict maxconns_ to 1-255 if(maxconnections == 0) @@ -65,39 +61,26 @@ TPipeServer::TPipeServer(string pipename, uint32_t bufsize, uint32_t maxconnecti else maxconns_ = maxconnections; -#ifdef _WIN32 - if(pipename_.find("\\\\") == -1) { - pipename_ = "\\\\.\\pipe\\" + pipename_; - } -#else - dsrvsocket.reset(new TServerSocket(pipename)); -#endif + setPipename(pipename); } -TPipeServer::TPipeServer(string pipename) : +TPipeServer::TPipeServer(const std::string &pipename) : pipename_(pipename), bufsize_(1024), - Pipe_(-1), + Pipe_(INVALID_HANDLE_VALUE), maxconns_(TPIPE_SERVER_MAX_CONNS_DEFAULT), isAnonymous(false) { -#ifdef _WIN32 - if(pipename_.find("\\\\") == 0) { - pipename_ = "\\\\.\\pipe\\" + pipename_; - } -#else - dsrvsocket.reset(new TServerSocket(pipename)); -#endif + setPipename(pipename); } -TPipeServer::TPipeServer(int bufsize) : +TPipeServer::TPipeServer(int bufsize) : pipename_(""), bufsize_(bufsize), - Pipe_(-1), + Pipe_(INVALID_HANDLE_VALUE), maxconns_(1), isAnonymous(true) { -#ifdef _WIN32 //The anonymous pipe needs to be created first so that the server can //pass the handles on to the client before the serve (acceptImpl) //blocking call. @@ -105,28 +88,19 @@ TPipeServer::TPipeServer(int bufsize) : GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } -#else - GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); -#endif } -TPipeServer::TPipeServer() : +TPipeServer::TPipeServer() : pipename_(""), bufsize_(1024), - Pipe_(-1), + Pipe_(INVALID_HANDLE_VALUE), maxconns_(1), isAnonymous(true) { -#ifdef _WIN32 if (!TCreateAnonPipe()) { GlobalOutput.perror("TPipeServer Create(Anon)Pipe failed, GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, " TPipeServer Create(Anon)Pipe failed"); } -#else - GlobalOutput.perror("TPipeServer: Anonymous pipes not yet supported under *NIX", -99); - throw TTransportException(TTransportException::NOT_OPEN, " Anonymous pipes not yet supported under *NIX"); -#endif } //---- Destructor ---- @@ -138,8 +112,6 @@ TPipeServer::~TPipeServer() { // Transport callbacks //--------------------------------------------------------- -#ifdef _WIN32 - shared_ptr TPipeServer::acceptImpl() { shared_ptr client; @@ -148,11 +120,11 @@ shared_ptr TPipeServer::acceptImpl() { //This 0-byte read serves merely as a blocking call. byte buf; DWORD br; - int fSuccess = ReadFile( - (HANDLE)Pipe_, // pipe handle - &buf, // buffer to receive reply - 0, // size of buffer - &br, // number of bytes read + int fSuccess = ReadFile( + Pipe_, // pipe handle + &buf, // buffer to receive reply + 0, // size of buffer + &br, // number of bytes read NULL); // not overlapped if ( !fSuccess && GetLastError() != ERROR_MORE_DATA ) { @@ -172,9 +144,9 @@ shared_ptr TPipeServer::acceptImpl() { } // Wait for the client to connect; if it succeeds, the - // function returns a nonzero value. If the function returns - // zero, GetLastError should return ERROR_PIPE_CONNECTED. - ConnectRet = ConnectNamedPipe((HANDLE)Pipe_, NULL) ? + // function returns a nonzero value. If the function returns + // zero, GetLastError should return ERROR_PIPE_CONNECTED. + ConnectRet = ConnectNamedPipe(Pipe_, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED); if (ConnectRet == TRUE) @@ -196,27 +168,27 @@ shared_ptr TPipeServer::acceptImpl() { } void TPipeServer::interrupt() { - if(Pipe_ != -1) { - CancelIo((HANDLE)Pipe_); + if(Pipe_ != INVALID_HANDLE_VALUE) { + CancelIo(Pipe_); } } void TPipeServer::close() { if(!isAnonymous) { - if(Pipe_ != -1) { - DisconnectNamedPipe((HANDLE)Pipe_); - CloseHandle((HANDLE)Pipe_); - Pipe_ = -1; + if(Pipe_ != INVALID_HANDLE_VALUE) { + DisconnectNamedPipe(Pipe_); + CloseHandle(Pipe_); + Pipe_ = INVALID_HANDLE_VALUE; } } else { try { - CloseHandle((HANDLE)Pipe_); - CloseHandle((HANDLE)PipeW_); - CloseHandle((HANDLE)ClientAnonRead); - CloseHandle((HANDLE)ClientAnonWrite); + CloseHandle(Pipe_); + CloseHandle(PipeW_); + CloseHandle(ClientAnonRead); + CloseHandle(ClientAnonWrite); } catch(...) { GlobalOutput.perror("TPipeServer anon close GLE=", GetLastError()); @@ -255,26 +227,26 @@ bool TPipeServer::TCreateNamedPipe() { sa.bInheritHandle = FALSE; // Create an instance of the named pipe - HANDLE hPipe_ = CreateNamedPipe( - pipename_.c_str(), // pipe name - PIPE_ACCESS_DUPLEX, // read/write access - PIPE_TYPE_MESSAGE | // message type pipe - PIPE_READMODE_MESSAGE, // message-read mode - maxconns_, // max. instances - bufsize_, // output buffer size - bufsize_, // input buffer size - 0, // client time-out - &sa); // default security attribute + HANDLE hPipe_ = CreateNamedPipe( + pipename_.c_str(), // pipe name + PIPE_ACCESS_DUPLEX, // read/write access + PIPE_TYPE_MESSAGE | // message type pipe + PIPE_READMODE_MESSAGE, // message-read mode + maxconns_, // max. instances + bufsize_, // output buffer size + bufsize_, // input buffer size + 0, // client time-out + &sa); // default security attribute if(hPipe_ == INVALID_HANDLE_VALUE) { - Pipe_ = -1; + Pipe_ = INVALID_HANDLE_VALUE; GlobalOutput.perror("TPipeServer::TCreateNamedPipe() GLE=", GetLastError()); throw TTransportException(TTransportException::NOT_OPEN, "TCreateNamedPipe() failed", GetLastError()); return false; } - Pipe_ = (int)hPipe_; + Pipe_ = hPipe_; return true; } @@ -302,43 +274,14 @@ bool TPipeServer::TCreateAnonPipe() { CloseHandle(PipeW_H); return false; } - ClientAnonRead = (int)ClientAnonReadH; - ClientAnonWrite = (int)ClientAnonWriteH; - Pipe_ = (int)Pipe_H; - PipeW_ = (int)PipeW_H; + ClientAnonRead = ClientAnonReadH; + ClientAnonWrite = ClientAnonWriteH; + Pipe_ = Pipe_H; + PipeW_ = PipeW_H; return true; } -#else -//*NIX implementation uses Unix Domain Sockets. -void TPipeServer::listen() { - dsrvsocket->listen(); -} - -shared_ptr TPipeServer::acceptImpl() { -// return boost::shared_dynamic_cast(dsrvsocket)->accept(); - return dsrvsocket->accept(); -} - -void TPipeServer::interrupt() { - dsrvsocket->interrupt(); -} - -void TPipeServer::close() { - dsrvsocket->close(); -} - -bool TPipeServer::TCreateNamedPipe() { - return false; //placeholder -} - -bool TPipeServer::TCreateAnonPipe() { - return false; //currently unimplemented -} -#endif //_WIN32 - - //--------------------------------------------------------- // Accessors //--------------------------------------------------------- @@ -347,8 +290,11 @@ string TPipeServer::getPipename() { return pipename_; } -void TPipeServer::setPipename(std::string pipename) { - pipename_ = pipename; +void TPipeServer::setPipename(const std::string &pipename) { + if(pipename.find("\\\\") == -1) + pipename_ = "\\\\.\\pipe\\" + pipename; + else + pipename_ = pipename; } int TPipeServer::getBufferSize() { @@ -359,21 +305,21 @@ void TPipeServer::setBufferSize(int bufsize) { bufsize_ = bufsize; } -int TPipeServer::getPipeHandle() { +HANDLE TPipeServer::getPipeHandle() { return Pipe_; } -int TPipeServer::getWrtPipeHandle() +HANDLE TPipeServer::getWrtPipeHandle() { return PipeW_; } -int TPipeServer::getClientRdPipeHandle() +HANDLE TPipeServer::getClientRdPipeHandle() { return ClientAnonRead; } -int TPipeServer::getClientWrtPipeHandle() +HANDLE TPipeServer::getClientWrtPipeHandle() { return ClientAnonWrite; } @@ -385,5 +331,6 @@ bool TPipeServer::getAnonymous() { void TPipeServer::setAnonymous(bool anon) { isAnonymous = anon; } +#endif //_WIN32 }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TPipeServer.h b/lib/cpp/src/thrift/transport/TPipeServer.h index c3cd26bc..624a30a5 100755 --- a/lib/cpp/src/thrift/transport/TPipeServer.h +++ b/lib/cpp/src/thrift/transport/TPipeServer.h @@ -33,13 +33,14 @@ namespace apache { namespace thrift { namespace transport { /** * Windows Pipes implementation of TServerTransport. */ +#ifdef _WIN32 class TPipeServer : public TServerTransport { public: //Constructors // Named Pipe - - TPipeServer(std::string pipename, uint32_t bufsize); - TPipeServer(std::string pipename, uint32_t bufsize, uint32_t maxconnections); - TPipeServer(std::string pipename); + TPipeServer(const std::string &pipename, uint32_t bufsize); + TPipeServer(const std::string &pipename, uint32_t bufsize, uint32_t maxconnections); + TPipeServer(const std::string &pipename); // Anonymous pipe - TPipeServer(int bufsize); TPipeServer(); @@ -59,33 +60,30 @@ class TPipeServer : public TServerTransport { public: //Accessors std::string getPipename(); - void setPipename(std::string pipename); + void setPipename(const std::string &pipename); int getBufferSize(); void setBufferSize(int bufsize); - int getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle - int getWrtPipeHandle(); - int getClientRdPipeHandle(); - int getClientWrtPipeHandle(); + HANDLE getPipeHandle(); //Named Pipe R/W -or- Anonymous pipe Read handle + HANDLE getWrtPipeHandle(); + HANDLE getClientRdPipeHandle(); + HANDLE getClientWrtPipeHandle(); bool getAnonymous(); void setAnonymous(bool anon); private: std::string pipename_; uint32_t bufsize_; - int Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R) + HANDLE Pipe_; //Named Pipe (R/W) or Anonymous Pipe (R) uint32_t maxconns_; - int PipeW_; //Anonymous Pipe (W) - int ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles + HANDLE PipeW_; //Anonymous Pipe (W) + HANDLE ClientAnonRead, ClientAnonWrite; //Client side anonymous pipe handles //? Do we need duplicates to send to client? bool isAnonymous; - -public: -#ifndef _WIN32 - //*NIX named pipe implementation uses domain socket - void listen(); //Only needed for domain sockets - boost::shared_ptr dsrvsocket; -#endif }; +#else //_WIN32 +//*NIX named pipe implementation uses domain socket +typedef TServerSocket TPipeServer; +#endif }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TServerSocket.cpp b/lib/cpp/src/thrift/transport/TServerSocket.cpp index edc2cb8c..20313ac3 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.cpp +++ b/lib/cpp/src/thrift/transport/TServerSocket.cpp @@ -151,7 +151,7 @@ void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) { } void TServerSocket::listen() { - int sv[2]; + SOCKET sv[2]; if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) { GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno); intSock1_ = -1; @@ -243,7 +243,7 @@ void TServerSocket::listen() { #ifdef IPV6_V6ONLY if (res->ai_family == AF_INET6 && path_.empty()) { int zero = 0; - if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY, + if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY, cast_sockopt(&zero), sizeof(zero))) { GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno); } @@ -321,7 +321,7 @@ void TServerSocket::listen() { #endif } else { do { - if (0 == ::bind(serverSocket_, res->ai_addr, res->ai_addrlen)) { + if (0 == ::bind(serverSocket_, res->ai_addr, static_cast(res->ai_addrlen))) { break; } // use short circuit evaluation here to only sleep if we need to @@ -358,7 +358,7 @@ void TServerSocket::listen() { } shared_ptr TServerSocket::acceptImpl() { - if (serverSocket_ < 0) { + if (serverSocket_ == -1) { throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening"); } @@ -371,7 +371,7 @@ shared_ptr TServerSocket::acceptImpl() { std::memset(fds, 0 , sizeof(fds)); fds[0].fd = serverSocket_; fds[0].events = POLLIN; - if (intSock2_ >= 0) { + if (intSock2_ != -1) { fds[1].fd = intSock2_; fds[1].events = POLLIN; } @@ -393,7 +393,7 @@ shared_ptr TServerSocket::acceptImpl() { throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy); } else if (ret > 0) { // Check for an interrupt signal - if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) { + if (intSock2_ != -1 && (fds[1].revents & POLLIN)) { int8_t buf; if (-1 == recv(intSock2_, cast_sockopt(&buf), sizeof(int8_t), 0)) { GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno); @@ -413,11 +413,11 @@ shared_ptr TServerSocket::acceptImpl() { struct sockaddr_storage clientAddress; int size = sizeof(clientAddress); - int clientSocket = ::accept(serverSocket_, + SOCKET clientSocket = ::accept(serverSocket_, (struct sockaddr *) &clientAddress, (socklen_t *) &size); - if (clientSocket < 0) { + if (clientSocket == -1) { int errno_copy = errno; GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy); throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy); @@ -445,16 +445,16 @@ shared_ptr TServerSocket::acceptImpl() { client->setRecvTimeout(recvTimeout_); } client->setCachedAddress((sockaddr*) &clientAddress, size); - + return client; } -shared_ptr TServerSocket::createSocket(int clientSocket) { +shared_ptr TServerSocket::createSocket(SOCKET clientSocket) { return shared_ptr(new TSocket(clientSocket)); } void TServerSocket::interrupt() { - if (intSock1_ >= 0) { + if (intSock1_ != -1) { int8_t byte = 0; if (-1 == send(intSock1_, cast_sockopt(&byte), sizeof(int8_t), 0)) { GlobalOutput.perror("TServerSocket::interrupt() send() ", errno); @@ -463,7 +463,7 @@ void TServerSocket::interrupt() { } void TServerSocket::close() { - if (serverSocket_ >= 0) { + if (serverSocket_ != -1) { #ifdef _WIN32 shutdown(serverSocket_, SD_BOTH); @@ -474,10 +474,10 @@ void TServerSocket::close() { #endif } - if (intSock1_ >= 0) { + if (intSock1_ != -1) { ::close(intSock1_); } - if (intSock2_ >= 0) { + if (intSock2_ != -1) { ::close(intSock2_); } serverSocket_ = -1; diff --git a/lib/cpp/src/thrift/transport/TServerSocket.h b/lib/cpp/src/thrift/transport/TServerSocket.h index 280ee698..fb93eb4d 100644 --- a/lib/cpp/src/thrift/transport/TServerSocket.h +++ b/lib/cpp/src/thrift/transport/TServerSocket.h @@ -22,6 +22,9 @@ #include "TServerTransport.h" #include +#ifdef __linux__ + typedef int SOCKET; +#endif namespace apache { namespace thrift { namespace transport { @@ -58,12 +61,12 @@ class TServerSocket : public TServerTransport { protected: boost::shared_ptr acceptImpl(); - virtual boost::shared_ptr createSocket(int client); + virtual boost::shared_ptr createSocket(SOCKET client); private: int port_; std::string path_; - int serverSocket_; + SOCKET serverSocket_; int acceptBacklog_; int sendTimeout_; int recvTimeout_; @@ -73,8 +76,8 @@ class TServerSocket : public TServerTransport { int tcpSendBuffer_; int tcpRecvBuffer_; - int intSock1_; - int intSock2_; + SOCKET intSock1_; + SOCKET intSock2_; }; }}} // apache::thrift::transport diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp index 3c235d2c..88cb2822 100644 --- a/lib/cpp/src/thrift/transport/TSocket.cpp +++ b/lib/cpp/src/thrift/transport/TSocket.cpp @@ -129,7 +129,7 @@ TSocket::TSocket() : cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC; } -TSocket::TSocket(int socket) : +TSocket::TSocket(SOCKET socket) : host_(""), port_(0), path_(""), @@ -151,7 +151,7 @@ TSocket::~TSocket() { } bool TSocket::isOpen() { - return (socket_ >= 0); + return (socket_ != -1); } bool TSocket::peek() { @@ -164,7 +164,7 @@ bool TSocket::peek() { int errno_copy = errno; #if defined __FreeBSD__ || defined __MACH__ /* shigin: - * freebsd returns -1 and ECONNRESET if socket was closed by + * freebsd returns -1 and ECONNRESET if socket was closed by * the other side */ if (errno_copy == ECONNRESET) @@ -264,7 +264,7 @@ void TSocket::openConnection(struct addrinfo *res) { #endif } else { - ret = connect(socket_, res->ai_addr, res->ai_addrlen); + ret = connect(socket_, res->ai_addr, static_cast(res->ai_addrlen)); } // success case @@ -319,7 +319,7 @@ void TSocket::openConnection(struct addrinfo *res) { fcntl(socket_, F_SETFL, flags); if (path_.empty()) { - setCachedAddress(res->ai_addr, res->ai_addrlen); + setCachedAddress(res->ai_addr, static_cast(res->ai_addrlen)); } } @@ -382,7 +382,7 @@ void TSocket::local_open(){ try { openConnection(res); break; - } catch (TTransportException& ttx) { + } catch (TTransportException&) { if (res->ai_next) { close(); } else { @@ -398,7 +398,7 @@ void TSocket::local_open(){ } void TSocket::close() { - if (socket_ >= 0) { + if (socket_ != -1) { #ifdef _WIN32 shutdown(socket_, SD_BOTH); @@ -413,14 +413,14 @@ void TSocket::close() { } void TSocket::setSocketFD(int socket) { - if (socket_ >= 0) { + if (socket_ != -1) { close(); } socket_ = socket; } uint32_t TSocket::read(uint8_t* buf, uint32_t len) { - if (socket_ < 0) { + if (socket_ == -1) { throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket"); } @@ -489,7 +489,7 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { #if defined __FreeBSD__ || defined __MACH__ if (errno_copy == ECONNRESET) { /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with - * ECONNRESET if peer performed shutdown + * ECONNRESET if peer performed shutdown * edhall: eliminated close() since we do that in the destructor. */ return 0; @@ -551,7 +551,7 @@ void TSocket::write(const uint8_t* buf, uint32_t len) { } uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { - if (socket_ < 0) { + if (socket_ == -1) { throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket"); } @@ -582,7 +582,7 @@ uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy); } - + // Fail on blocked send if (b == 0) { throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0."); @@ -609,7 +609,7 @@ void TSocket::setPort(int port) { void TSocket::setLinger(bool on, int linger) { lingerOn_ = on; lingerVal_ = linger; - if (socket_ < 0) { + if (socket_ == -1) { return; } @@ -623,7 +623,7 @@ void TSocket::setLinger(bool on, int linger) { void TSocket::setNoDelay(bool noDelay) { noDelay_ = noDelay; - if (socket_ < 0 || !path_.empty()) { + if (socket_ == -1 || !path_.empty()) { return; } @@ -649,7 +649,7 @@ void TSocket::setRecvTimeout(int ms) { } recvTimeout_ = ms; - if (socket_ < 0) { + if (socket_ == -1) { return; } @@ -674,7 +674,7 @@ void TSocket::setSendTimeout(int ms) { } sendTimeout_ = ms; - if (socket_ < 0) { + if (socket_ == -1) { return; } @@ -708,7 +708,7 @@ std::string TSocket::getPeerHost() { struct sockaddr* addrPtr; socklen_t addrLen; - if (socket_ < 0) { + if (socket_ == -1) { return host_; } @@ -742,7 +742,7 @@ std::string TSocket::getPeerAddress() { struct sockaddr* addrPtr; socklen_t addrLen; - if (socket_ < 0) { + if (socket_ == -1) { return peerAddress_; } @@ -810,7 +810,7 @@ sockaddr* TSocket::getCachedAddress(socklen_t* len) const { default: return NULL; } -} +} bool TSocket::useLowMinRto_ = false; void TSocket::setUseLowMinRto(bool useLowMinRto) { diff --git a/lib/cpp/src/thrift/transport/TSocket.h b/lib/cpp/src/thrift/transport/TSocket.h index 2357430c..4f091c48 100644 --- a/lib/cpp/src/thrift/transport/TSocket.h +++ b/lib/cpp/src/thrift/transport/TSocket.h @@ -32,6 +32,9 @@ #ifdef HAVE_NETDB_H #include #endif +#ifdef __linux__ + typedef int SOCKET; +#endif namespace apache { namespace thrift { namespace transport { @@ -197,7 +200,7 @@ class TSocket : public TVirtualTransport { /** * Returns the underlying socket file descriptor. */ - int getSocketFD() { + SOCKET getSocketFD() { return socket_; } @@ -228,7 +231,7 @@ class TSocket : public TVirtualTransport { /** * Constructor to create socket from raw UNIX handle. */ - TSocket(int socket); + TSocket(SOCKET socket); /** * Set a cache of the peer address (used when trivially available: e.g. @@ -259,7 +262,7 @@ class TSocket : public TVirtualTransport { std::string path_; /** Underlying UNIX socket handle */ - int socket_; + SOCKET socket_; /** Connect timeout in ms */ int connTimeout_; diff --git a/lib/cpp/src/thrift/transport/TSocketPool.cpp b/lib/cpp/src/thrift/transport/TSocketPool.cpp index 160c5a3c..03764c27 100644 --- a/lib/cpp/src/thrift/transport/TSocketPool.cpp +++ b/lib/cpp/src/thrift/transport/TSocketPool.cpp @@ -174,7 +174,7 @@ void TSocketPool::setCurrentServer(const shared_ptr &server) /* TODO: without apc we ignore a lot of functionality from the php version */ void TSocketPool::open() { - unsigned int numServers = servers_.size(); + size_t numServers = servers_.size(); if (numServers == 0) { socket_ = -1; throw TTransportException(TTransportException::NOT_OPEN); @@ -188,7 +188,7 @@ void TSocketPool::open() { random_shuffle(servers_.begin(), servers_.end()); } - for (unsigned int i = 0; i < numServers; ++i) { + for (size_t i = 0; i < numServers; ++i) { shared_ptr &server = servers_[i]; // Impersonate the server socket @@ -204,7 +204,7 @@ void TSocketPool::open() { if (server->lastFailTime_ > 0) { // The server was marked as down, so check if enough time has elapsed to retry - int elapsedTime = time(NULL) - server->lastFailTime_; + time_t elapsedTime = time(NULL) - server->lastFailTime_; if (elapsedTime > retryInterval_) { retryIntervalPassed = true; } diff --git a/lib/cpp/src/thrift/transport/TSocketPool.h b/lib/cpp/src/thrift/transport/TSocketPool.h index 749116a1..48e35bb6 100644 --- a/lib/cpp/src/thrift/transport/TSocketPool.h +++ b/lib/cpp/src/thrift/transport/TSocketPool.h @@ -49,10 +49,10 @@ class TSocketPoolServer { int port_; // Socket for the server - int socket_; + SOCKET socket_; // Last time connecting to this server failed - int lastFailTime_; + time_t lastFailTime_; // Number of consecutive times connecting to this server failed int consecutiveFailures_; @@ -178,7 +178,7 @@ class TSocketPool : public TSocket { /** Retry interval in seconds, how long to not try a host if it has been * marked as down. */ - int retryInterval_; + time_t retryInterval_; /** Max consecutive failures before marking a host down. */ int maxConsecutiveFailures_; diff --git a/lib/cpp/src/thrift/windows/SocketPair.cpp b/lib/cpp/src/thrift/windows/SocketPair.cpp index 5eee4ae9..bca8d927 100644 --- a/lib/cpp/src/thrift/windows/SocketPair.cpp +++ b/lib/cpp/src/thrift/windows/SocketPair.cpp @@ -1,11 +1,11 @@ /* socketpair.c * Copyright 2007 by Nathan C. Myers ; some rights reserved. - * This code is Free Software. It may be copied freely, in original or + * This code is Free Software. It may be copied freely, in original or * modified form, subject only to the restrictions that (1) the author is * relieved from all responsibilities for any use for any purpose, and (2) * this copyright notice must be retained, unchanged, in its entirety. If * for any reason the author might be held responsible for any consequences - * of copying or use, license is withheld. + * of copying or use, license is withheld. */ /* @@ -33,10 +33,9 @@ #include // Win32 -#include #include -int socketpair(int d, int type, int protocol, int sv[2]) +int socketpair(int d, int type, int protocol, SOCKET sv[2]) { union { struct sockaddr_in inaddr; @@ -54,17 +53,17 @@ int socketpair(int d, int type, int protocol, int sv[2]) } listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (listener == INVALID_SOCKET) + if (listener == INVALID_SOCKET) return SOCKET_ERROR; memset(&a, 0, sizeof(a)); a.inaddr.sin_family = AF_INET; a.inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - a.inaddr.sin_port = 0; + a.inaddr.sin_port = 0; sv[0] = sv[1] = INVALID_SOCKET; do { - if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, + if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (char*) &reuse, (socklen_t) sizeof(reuse)) == -1) break; if (bind(listener, &a.addr, sizeof(a.inaddr)) == SOCKET_ERROR) diff --git a/lib/cpp/src/thrift/windows/SocketPair.h b/lib/cpp/src/thrift/windows/SocketPair.h index 27ec9b18..9d02998e 100644 --- a/lib/cpp/src/thrift/windows/SocketPair.h +++ b/lib/cpp/src/thrift/windows/SocketPair.h @@ -28,6 +28,9 @@ #error This is a MSVC header only. #endif -int socketpair(int d, int type, int protocol, int sv[2]); +// Win32 +#include + +int socketpair(int d, int type, int protocol, SOCKET sv[2]); #endif // _THRIFT_WINDOWS_SOCKETPAIR_H_ diff --git a/lib/cpp/src/thrift/windows/StdAfx.h b/lib/cpp/src/thrift/windows/StdAfx.h index b953b22b..e6ebbbaa 100644 --- a/lib/cpp/src/thrift/windows/StdAfx.h +++ b/lib/cpp/src/thrift/windows/StdAfx.h @@ -32,7 +32,10 @@ #include "Config.h" // Exclude rarely-used stuff from Windows headers +#ifndef WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN +#endif + #include #endif // _THRIFT_WINDOWS_STDAFX_H_ diff --git a/lib/cpp/src/thrift/windows/WinFcntl.cpp b/lib/cpp/src/thrift/windows/WinFcntl.cpp index bd277735..da2f73a4 100644 --- a/lib/cpp/src/thrift/windows/WinFcntl.cpp +++ b/lib/cpp/src/thrift/windows/WinFcntl.cpp @@ -17,12 +17,9 @@ * under the License. */ -#include "Fcntl.h" +#include "WinFcntl.h" -// Win32 -#include - -int fcntl(int fd, int cmd, int flags) +int fcntl(SOCKET fd, int cmd, int flags) { if(cmd != F_GETFL && cmd != F_SETFL) { diff --git a/lib/cpp/src/thrift/windows/WinFcntl.h b/lib/cpp/src/thrift/windows/WinFcntl.h index 40b70d6a..be80c5be 100644 --- a/lib/cpp/src/thrift/windows/WinFcntl.h +++ b/lib/cpp/src/thrift/windows/WinFcntl.h @@ -28,6 +28,9 @@ #error This is a MSVC header only. #endif +// Win32 +#include + #define O_NONBLOCK 1 enum @@ -36,6 +39,6 @@ enum F_SETFL, }; -int fcntl(int fd, int cmd, int flags); +int fcntl(SOCKET fd, int cmd, int flags); #endif // _THRIFT_WINDOWS_FCNTL_H_ diff --git a/lib/cpp/src/thrift/windows/config.h b/lib/cpp/src/thrift/windows/config.h index aa361ffb..fa836e4a 100644 --- a/lib/cpp/src/thrift/windows/config.h +++ b/lib/cpp/src/thrift/windows/config.h @@ -126,13 +126,12 @@ inline int poll_win32(LPWSAPOLLFD fdArray, ULONG nfds, INT timeout) int sktready = select(1, read_fds_ptr, write_fds_ptr, NULL, time_out_ptr); if(sktready > 0) { - for(ULONG i=0; i