From 3faaedf46b0c99096f0a18820782362886530e8e Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Sun, 2 Oct 2011 10:51:45 +0000 Subject: [PATCH] THRIFT-1361 Optional replacement of pthread by boost::thread Patch: alexandre parenteau git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1178176 13f79535-47bb-0310-9956-ffa450edef68 --- .gitignore | 1 + configure.ac | 17 ++ lib/cpp/Makefile.am | 19 +- lib/cpp/src/concurrency/BoostMonitor.cpp | 203 ++++++++++++++++++ lib/cpp/src/concurrency/BoostMutex.cpp | 59 +++++ .../src/concurrency/BoostThreadFactory.cpp | 182 ++++++++++++++++ lib/cpp/src/concurrency/BoostThreadFactory.h | 75 +++++++ lib/cpp/src/concurrency/Mutex.h | 2 +- .../src/concurrency/PlatformThreadFactory.h | 40 ++++ .../src/concurrency/PosixThreadFactory.cpp | 2 +- lib/cpp/src/concurrency/Thread.h | 8 + .../src/concurrency/test/ThreadFactoryTests.h | 12 +- .../src/concurrency/test/ThreadManagerTests.h | 12 +- .../src/concurrency/test/TimerManagerTests.h | 4 +- lib/cpp/src/server/TThreadedServer.cpp | 8 +- lib/cpp/src/transport/TFDTransport.cpp | 2 + lib/cpp/src/transport/TFileTransport.cpp | 109 +++++++--- lib/cpp/src/transport/TFileTransport.h | 23 +- lib/cpp/src/transport/TSocket.cpp | 6 + lib/cpp/src/windows/config.h | 19 +- test/cpp/Makefile.am | 6 +- test/cpp/src/StressTest.cpp | 4 +- test/cpp/src/StressTestNonBlocking.cpp | 4 +- test/cpp/src/TestClient.cpp | 6 +- test/cpp/src/TestServer.cpp | 6 +- test/threads/ThreadsClient.cpp | 6 +- test/threads/ThreadsServer.cpp | 10 +- 27 files changed, 762 insertions(+), 83 deletions(-) create mode 100644 lib/cpp/src/concurrency/BoostMonitor.cpp create mode 100644 lib/cpp/src/concurrency/BoostMutex.cpp create mode 100644 lib/cpp/src/concurrency/BoostThreadFactory.cpp create mode 100644 lib/cpp/src/concurrency/BoostThreadFactory.h create mode 100644 lib/cpp/src/concurrency/PlatformThreadFactory.h diff --git a/.gitignore b/.gitignore index 082f7b73..02d66d93 100644 --- a/.gitignore +++ b/.gitignore @@ -129,6 +129,7 @@ /lib/rb/ext/thrift_native.so /lib/rb/spec/gen-* /lib/rb/test/ +/lib/rb/thrift-*.gem /lib/php/Makefile /lib/php/Makefile.in /lib/php/src/ext/thrift_protocol/.deps diff --git a/configure.ac b/configure.ac index 44fd168d..9759a5a6 100644 --- a/configure.ac +++ b/configure.ac @@ -432,6 +432,23 @@ AC_SUBST(GCOV_CFLAGS) AC_SUBST(GCOV_CXXFLAGS) AC_SUBST(GCOV_LDFLAGS) +AC_ARG_ENABLE(boostthreads, + [ --enable-boostthreads use boost threads, instead of POSIX pthread (experimental) ], + [case "${enableval}" in + yes) ENABLE_BOOSTTHREADS=1 ;; + no) ENABLE_BOOSTTHREADS=0 ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-cov) ;; + esac], + [ENABLE_BOOSTTHREADS=2]) + + +if test "x[$]ENABLE_BOOSTTHREADS" = "x1"; then + AC_MSG_WARN(enable boostthreads) + AC_DEFINE([USE_BOOST_THREAD], [1], [experimental --enable-boostthreads that replaces POSIX pthread by boost::thread]) +fi + +AM_CONDITIONAL([WITH_BOOSTTHREADS], [test "x[$]ENABLE_BOOSTTHREADS" = "x1"]) + AC_CONFIG_HEADERS(config.h:config.hin) AC_CONFIG_FILES([ diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index d5bc4899..593ef9e0 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -41,14 +41,12 @@ endif AM_CXXFLAGS = -Wall AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(srcdir)/src +AM_LDFLAGS = $(BOOST_LDFLAGS) # Define the source files for the module libthrift_la_SOURCES = src/Thrift.cpp \ src/TApplicationException.cpp \ - src/concurrency/Mutex.cpp \ - src/concurrency/Monitor.cpp \ - src/concurrency/PosixThreadFactory.cpp \ src/concurrency/ThreadManager.cpp \ src/concurrency/TimerManager.cpp \ src/concurrency/Util.cpp \ @@ -77,6 +75,17 @@ libthrift_la_SOURCES = src/Thrift.cpp \ src/async/TAsyncChannel.cpp \ src/processor/PeekProcessor.cpp +if WITH_BOOSTTHREADS +libthrift_la_SOURCES += src/concurrency/BoostThreadFactory.cpp \ + src/concurrency/BoostMonitor.cpp \ + src/concurrency/BoostMutex.cpp +else +libthrift_la_SOURCES += src/concurrency/Mutex.cpp \ + src/concurrency/Monitor.cpp \ + src/concurrency/PosixThreadFactory.cpp +endif + + libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp \ src/async/TAsyncProtocolProcessor.cpp \ src/async/TEvhttpServer.cpp \ @@ -91,6 +100,10 @@ libthriftz_la_CPPFLAGS = $(AM_CPPFLAGS) $(ZLIB_CPPFLAGS) libthriftnb_la_CXXFLAGS = $(AM_CXXFLAGS) libthriftz_la_CXXFLAGS = $(AM_CXXFLAGS) +if WITH_BOOSTTHREADS +libthrift_la_LIBADD = -lboost_thread +endif + include_thriftdir = $(includedir)/thrift include_thrift_HEADERS = \ $(top_builddir)/config.h \ diff --git a/lib/cpp/src/concurrency/BoostMonitor.cpp b/lib/cpp/src/concurrency/BoostMonitor.cpp new file mode 100644 index 00000000..7a9b589b --- /dev/null +++ b/lib/cpp/src/concurrency/BoostMonitor.cpp @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "Monitor.h" +#include "Exception.h" +#include "Util.h" + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace apache { namespace thrift { namespace concurrency { + +using namespace boost::interprocess; + +/** + * Monitor implementation using the boost interprocess library + * + * @version $Id:$ + */ +class Monitor::Impl : public interprocess_condition { + + public: + + Impl() + : ownedMutex_(new Mutex()), + mutex_(NULL) { + init(ownedMutex_.get()); + } + + Impl(Mutex* mutex) + : mutex_(NULL) { + init(mutex); + } + + Impl(Monitor* monitor) + : mutex_(NULL) { + init(&(monitor->mutex())); + } + + Mutex& mutex() { return *mutex_; } + void lock() { mutex().lock(); } + void unlock() { mutex().unlock(); } + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms) { + int result = waitForTimeRelative(timeout_ms); + if (result == ETIMEDOUT) { + throw TimedOutException(); + } else if (result != 0) { + throw TException( + "Monitor::wait() failed"); + } + } + + /** + * Waits until the specified timeout in milliseconds for the condition to + * occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) { + if (timeout_ms == 0LL) { + return waitForever(); + } + + assert(mutex_); + interprocess_mutex* mutexImpl = + reinterpret_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + scoped_lock lock(*mutexImpl, accept_ownership_type()); + int res = timed_wait(lock, boost::get_system_time()+boost::posix_time::milliseconds(timeout_ms)) ? 0 : ETIMEDOUT; + lock.release(); + return res; + } + + /** + * Waits until the absolute time specified using struct timespec. + * Returns 0 if condition occurs, ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const timespec* abstime) { + assert(mutex_); + interprocess_mutex* mutexImpl = + reinterpret_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + struct timespec currenttime; + Util::toTimespec(currenttime, Util::currentTime()); + + long tv_sec = abstime->tv_sec - currenttime.tv_sec; + long tv_nsec = abstime->tv_nsec - currenttime.tv_nsec; + if(tv_sec < 0) + tv_sec = 0; + if(tv_nsec < 0) + tv_nsec = 0; + + scoped_lock lock(*mutexImpl, accept_ownership_type()); + int res = timed_wait(lock, boost::get_system_time() + + boost::posix_time::seconds(tv_sec) + + boost::posix_time::microseconds(tv_nsec / 1000) + ) ? 0 : ETIMEDOUT; + lock.release(); + return res; + } + + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() { + assert(mutex_); + interprocess_mutex* mutexImpl = + reinterpret_cast(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + scoped_lock lock(*mutexImpl, accept_ownership_type()); + ((interprocess_condition*)this)->wait(lock); + lock.release(); + return 0; + } + + + void notify() { + notify_one(); + } + + void notifyAll() { + notify_all(); + } + + private: + + void init(Mutex* mutex) { + mutex_ = mutex; + } + + boost::scoped_ptr ownedMutex_; + Mutex* mutex_; +}; + +Monitor::Monitor() : impl_(new Monitor::Impl()) {} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {} + +Monitor::~Monitor() { delete impl_; } + +Mutex& Monitor::mutex() const { return const_cast(impl_)->mutex(); } + +void Monitor::lock() const { const_cast(impl_)->lock(); } + +void Monitor::unlock() const { const_cast(impl_)->unlock(); } + +void Monitor::wait(int64_t timeout) const { const_cast(impl_)->wait(timeout); } + +int Monitor::waitForTime(const timespec* abstime) const { + return const_cast(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTimeRelative(int64_t timeout_ms) const { + return const_cast(impl_)->waitForTimeRelative(timeout_ms); +} + +int Monitor::waitForever() const { + return const_cast(impl_)->waitForever(); +} + +void Monitor::notify() const { const_cast(impl_)->notify(); } + +void Monitor::notifyAll() const { const_cast(impl_)->notifyAll(); } + +}}} // apache::thrift::concurrency diff --git a/lib/cpp/src/concurrency/BoostMutex.cpp b/lib/cpp/src/concurrency/BoostMutex.cpp new file mode 100644 index 00000000..2277f615 --- /dev/null +++ b/lib/cpp/src/concurrency/BoostMutex.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "Mutex.h" +#include "Util.h" + +#include +#include +#include +#include + +using namespace boost::interprocess; + +namespace apache { namespace thrift { namespace concurrency { + +/** + * Implementation of Mutex class using boost interprocess mutex + * + * @version $Id:$ + */ +class Mutex::impl : public interprocess_mutex { +}; + +Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) {} + +void* Mutex::getUnderlyingImpl() const { return impl_.get(); } + +void Mutex::lock() const { impl_->lock(); } + +bool Mutex::trylock() const { return impl_->try_lock(); } + +bool Mutex::timedlock(int64_t ms) const { return impl_->timed_lock(boost::get_system_time()+boost::posix_time::milliseconds(ms)); } + +void Mutex::unlock() const { impl_->unlock(); } + +void Mutex::DEFAULT_INITIALIZER(void* arg) { +} + +}}} // apache::thrift::concurrency + diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.cpp b/lib/cpp/src/concurrency/BoostThreadFactory.cpp new file mode 100644 index 00000000..55515282 --- /dev/null +++ b/lib/cpp/src/concurrency/BoostThreadFactory.cpp @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif +#include "BoostThreadFactory.h" +#include "Exception.h" + +#include + +#include +#include + +namespace apache { namespace thrift { namespace concurrency { + +using boost::shared_ptr; +using boost::weak_ptr; + +/** + * The boost thread class. + * + * @version $Id:$ + */ +class BoostThread: public Thread { + public: + + enum STATE { + uninitialized, + starting, + started, + stopping, + stopped + }; + + static void* threadMain(void* arg); + + private: + std::auto_ptr thread_; + STATE state_; + weak_ptr self_; + bool detached_; + + public: + + BoostThread(bool detached, shared_ptr runnable) : + state_(uninitialized), + detached_(detached) { + this->Thread::runnable(runnable); + } + + ~BoostThread() { + if(!detached_) { + try { + join(); + } catch(...) { + // We're really hosed. + } + } + } + + void start() { + if (state_ != uninitialized) { + return; + } + + // Create reference + shared_ptr* selfRef = new shared_ptr(); + *selfRef = self_.lock(); + + thread_ = std::auto_ptr(new boost::thread(boost::bind(threadMain, (void*)selfRef))); + + if(detached_) + thread_->detach(); + + state_ = starting; + } + + void join() { + if (!detached_ && state_ != uninitialized) { + thread_->join(); + } + } + + Thread::id_t getId() { + return thread_.get() ? thread_->get_id() : boost::thread::id(); + } + + shared_ptr runnable() const { return Thread::runnable(); } + + void runnable(shared_ptr value) { Thread::runnable(value); } + + void weakRef(shared_ptr self) { + assert(self.get() == this); + self_ = weak_ptr(self); + } +}; + +void* BoostThread::threadMain(void* arg) { + shared_ptr thread = *(shared_ptr*)arg; + delete reinterpret_cast*>(arg); + + if (thread == NULL) { + return (void*)0; + } + + if (thread->state_ != starting) { + return (void*)0; + } + + thread->state_ = started; + thread->runnable()->run(); + + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; + } + return (void*)0; +} + +/** + * POSIX Thread factory implementation + */ +class BoostThreadFactory::Impl { + + private: + bool detached_; + + public: + + Impl(bool detached) : + detached_(detached) {} + + /** + * Creates a new POSIX thread to run the runnable object + * + * @param runnable A runnable object + */ + shared_ptr newThread(shared_ptr runnable) const { + shared_ptr result = shared_ptr(new BoostThread(detached_, runnable)); + result->weakRef(result); + runnable->thread(result); + return result; + } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const { + return boost::this_thread::get_id(); + } + +}; + +BoostThreadFactory::BoostThreadFactory(bool detached) : + impl_(new BoostThreadFactory::Impl(detached)) {} + +shared_ptr BoostThreadFactory::newThread(shared_ptr runnable) const { return impl_->newThread(runnable); } + +bool BoostThreadFactory::isDetached() const { return impl_->isDetached(); } + +void BoostThreadFactory::setDetached(bool value) { impl_->setDetached(value); } + +Thread::id_t BoostThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); } + +}}} // apache::thrift::concurrency diff --git a/lib/cpp/src/concurrency/BoostThreadFactory.h b/lib/cpp/src/concurrency/BoostThreadFactory.h new file mode 100644 index 00000000..a4667058 --- /dev/null +++ b/lib/cpp/src/concurrency/BoostThreadFactory.h @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1 + +#include "Thread.h" + +#include + +namespace apache { namespace thrift { namespace concurrency { + +/** + * A thread factory to create posix threads + * + * @version $Id:$ + */ +class BoostThreadFactory : public ThreadFactory { + + public: + + /** + * Boost thread factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + * the Runnable tasks they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * Threads are created with the specified boost policy, priority, stack-size. A detachable thread is not + * joinable. + * + * By default threads are not joinable. + */ + + BoostThreadFactory(bool detached=true); + + // From ThreadFactory; + boost::shared_ptr newThread(boost::shared_ptr runnable) const; + + // From ThreadFactory; + Thread::id_t getCurrentThreadId() const; + + /** + * Sets detached mode of threads + */ + virtual void setDetached(bool detached); + + /** + * Gets current detached mode + */ + virtual bool isDetached() const; + +private: + class Impl; + boost::shared_ptr impl_; +}; + +}}} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index 4b1c3bf1..847aaf69 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -167,7 +167,7 @@ class RWGuard : boost::noncopyable { // A little hack to prevent someone from trying to do "Guard(m);" // Such a use is invalid because the temporary Guard object is -// destoryed at the end of the line, releasing the lock. +// destroyed at the end of the line, releasing the lock. // Sorry for polluting the global namespace, but I think it's worth it. #define Guard(m) incorrect_use_of_Guard(m) #define RWGuard(m) incorrect_use_of_RWGuard(m) diff --git a/lib/cpp/src/concurrency/PlatformThreadFactory.h b/lib/cpp/src/concurrency/PlatformThreadFactory.h new file mode 100644 index 00000000..9f053a03 --- /dev/null +++ b/lib/cpp/src/concurrency/PlatformThreadFactory.h @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1 + +#ifndef USE_BOOST_THREAD +# include +#else +# include +#endif + +namespace apache { namespace thrift { namespace concurrency { + +#ifndef USE_BOOST_THREAD + typedef PosixThreadFactory PlatformThreadFactory; +#include +#else + typedef BoostThreadFactory PlatformThreadFactory; +#endif + +}}} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp index fe5ba123..70204f11 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp @@ -195,7 +195,7 @@ void* PthreadThread::threadMain(void* arg) { ProfilerRegisterThread(); #endif - thread->state_ = starting; + thread->state_ = started; thread->runnable()->run(); if (thread->state_ != stopping && thread->state_ != stopped) { thread->state_ = stopping; diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index d4282adb..a9e15af4 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -24,6 +24,10 @@ #include #include +#ifdef USE_BOOST_THREAD +#include +#endif + namespace apache { namespace thrift { namespace concurrency { class Thread; @@ -68,7 +72,11 @@ class Thread { public: +#ifdef USE_BOOST_THREAD + typedef boost::thread::id id_t; +#else typedef uint64_t id_t; +#endif virtual ~Thread() {}; diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index 2d7976e1..d9066b5d 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include @@ -60,7 +60,7 @@ public: */ bool helloWorldTest() { - PosixThreadFactory threadFactory = PosixThreadFactory(); + PlatformThreadFactory threadFactory = PlatformThreadFactory(); shared_ptr task = shared_ptr(new ThreadFactoryTests::Task()); @@ -105,7 +105,7 @@ public: bool reapNThreads(int loop=1, int count=10) { - PosixThreadFactory threadFactory = PosixThreadFactory(); + PlatformThreadFactory threadFactory = PlatformThreadFactory(); Monitor* monitor = new Monitor(); @@ -203,7 +203,7 @@ public: shared_ptr task = shared_ptr(new SynchStartTask(monitor, state)); - PosixThreadFactory threadFactory = PosixThreadFactory(); + PlatformThreadFactory threadFactory = PlatformThreadFactory(); shared_ptr thread = threadFactory.newThread(task); @@ -307,7 +307,7 @@ public: const size_t _id; }; - void foo(PosixThreadFactory *tf) { + void foo(PlatformThreadFactory *tf) { (void) tf; } @@ -317,7 +317,7 @@ public: for(size_t lix = 0; lix < loop; lix++) { - PosixThreadFactory threadFactory = PosixThreadFactory(); + PlatformThreadFactory threadFactory = PlatformThreadFactory(); threadFactory.setDetached(true); for(size_t tix = 0; tix < count; tix++) { diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index b6b5c3e4..e12201c0 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -19,7 +19,7 @@ #include #include -#include +#include #include #include @@ -110,10 +110,11 @@ public: shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); - shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = shared_ptr(new PlatformThreadFactory()); +#ifndef USE_BOOST_THREAD threadFactory->setPriority(PosixThreadFactory::HIGHEST); - +#endif threadManager->threadFactory(threadFactory); threadManager->start(); @@ -249,10 +250,11 @@ public: shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); - shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = shared_ptr(new PlatformThreadFactory()); +#ifndef USE_BOOST_THREAD threadFactory->setPriority(PosixThreadFactory::HIGHEST); - +#endif threadManager->threadFactory(threadFactory); threadManager->start(); diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h index b89074c0..41f1674d 100644 --- a/lib/cpp/src/concurrency/test/TimerManagerTests.h +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h @@ -18,7 +18,7 @@ */ #include -#include +#include #include #include @@ -100,7 +100,7 @@ class TimerManagerTests { TimerManager timerManager; - timerManager.threadFactory(shared_ptr(new PosixThreadFactory())); + timerManager.threadFactory(shared_ptr(new PlatformThreadFactory())); timerManager.start(); diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp index f40135c5..6b816a4f 100644 --- a/lib/cpp/src/server/TThreadedServer.cpp +++ b/lib/cpp/src/server/TThreadedServer.cpp @@ -19,12 +19,14 @@ #include "server/TThreadedServer.h" #include "transport/TTransportException.h" -#include "concurrency/PosixThreadFactory.h" +#include #include #include -#include + +#ifdef HAVE_UNISTD_H #include +#endif namespace apache { namespace thrift { namespace server { @@ -123,7 +125,7 @@ void TThreadedServer::init() { stop_ = false; if (!threadFactory_) { - threadFactory_.reset(new PosixThreadFactory); + threadFactory_.reset(new PlatformThreadFactory); } } diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp index b1479fa5..8a448fa2 100644 --- a/lib/cpp/src/transport/TFDTransport.cpp +++ b/lib/cpp/src/transport/TFDTransport.cpp @@ -22,7 +22,9 @@ #include +#ifdef HAVE_UNISTD_H #include +#endif using namespace std; diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index c6c31550..405c162d 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -47,12 +47,17 @@ #include #endif +#ifdef _WIN32 +#include +#endif + namespace apache { namespace thrift { namespace transport { using boost::scoped_ptr; using boost::shared_ptr; using namespace std; using namespace apache::thrift::protocol; +using namespace apache::thrift::concurrency; #ifndef HAVE_CLOCK_GETTIME @@ -102,13 +107,10 @@ TFileTransport::TFileTransport(string path, bool readOnly) , lastBadChunk_(0) , numCorruptedEventsInChunk_(0) , readOnly_(readOnly) + , notFull_(&mutex_) + , notEmpty_(&mutex_) + , flushed_(&mutex_) { - // initialize all the condition vars/mutexes - pthread_mutex_init(&mutex_, NULL); - pthread_cond_init(¬Full_, NULL); - pthread_cond_init(¬Empty_, NULL); - pthread_cond_init(&flushed_, NULL); - openLogFile(); } @@ -142,16 +144,25 @@ void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) { TFileTransport::~TFileTransport() { // flush the buffer if a writer thread is active +#ifdef USE_BOOST_THREAD + if(writerThreadId_.get()) { +#else if (writerThreadId_ > 0) { +#endif // set state to closing closing_ = true; // wake up the writer thread // Since closing_ is true, it will attempt to flush all data, then exit. - pthread_cond_signal(¬Empty_); + notEmpty_.notify(); +#ifdef USE_BOOST_THREAD + writerThreadId_->join(); + writerThreadId_.reset(); +#else pthread_join(writerThreadId_, NULL); writerThreadId_ = 0; +#endif } if (dequeueBuffer_) { @@ -191,12 +202,18 @@ bool TFileTransport::initBufferAndWriteThread() { return false; } +#ifdef USE_BOOST_THREAD + if(!writerThreadId_.get()) { + writerThreadId_ = std::auto_ptr(new boost::thread(boost::bind(startWriterThread, (void *)this))); + } +#else if (writerThreadId_ == 0) { if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) { T_ERROR("%s", "Could not create writer thread"); return false; } } +#endif dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); @@ -242,20 +259,19 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { toEnqueue->eventSize_ = eventLen + 4; // lock mutex - pthread_mutex_lock(&mutex_); + Guard g(mutex_); // make sure that enqueue buffer is initialized and writer thread is running if (!bufferAndThreadInitialized_) { if (!initBufferAndWriteThread()) { delete toEnqueue; - pthread_mutex_unlock(&mutex_); return; } } // Can't enqueue while buffer is full while (enqueueBuffer_->isFull()) { - pthread_cond_wait(¬Full_, &mutex_); + notFull_.wait(); } // We shouldn't be trying to enqueue new data while a forced flush is @@ -266,23 +282,21 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { // add to the buffer if (!enqueueBuffer_->addEvent(toEnqueue)) { delete toEnqueue; - pthread_mutex_unlock(&mutex_); return; } // signal anybody who's waiting for the buffer to be non-empty - pthread_cond_signal(¬Empty_); + notEmpty_.notify(); // this really should be a loop where it makes sure it got flushed // because condition variables can get triggered by the os for no reason // it is probably a non-factor for the time being - pthread_mutex_unlock(&mutex_); } bool TFileTransport::swapEventBuffers(struct timespec* deadline) { - pthread_mutex_lock(&mutex_); - bool swap; + Guard g(mutex_); + if (!enqueueBuffer_->isEmpty()) { swap = true; } else if (closing_) { @@ -292,10 +306,10 @@ bool TFileTransport::swapEventBuffers(struct timespec* deadline) { } else { if (deadline != NULL) { // if we were handed a deadline time struct, do a timed wait - pthread_cond_timedwait(¬Empty_, &mutex_, deadline); + notEmpty_.waitForTime(deadline); } else { // just wait until the buffer gets an item - pthread_cond_wait(¬Empty_, &mutex_); + notEmpty_.wait(); } // could be empty if we timed out @@ -308,11 +322,9 @@ bool TFileTransport::swapEventBuffers(struct timespec* deadline) { dequeueBuffer_ = temp; } - // unlock the mutex and signal if required - pthread_mutex_unlock(&mutex_); if (swap) { - pthread_cond_signal(¬Full_); + notFull_.notify(); } return swap; @@ -340,7 +352,11 @@ void TFileTransport::writerThread() { seekToEnd(); // throw away any partial events offset_ += readState_.lastDispatchPtr_; +#ifndef _WIN32 ftruncate(fd_, offset_); +#else + _chsize_s(fd_, offset_); +#endif readState_.resetAllValues(); } catch (...) { int errno_copy = errno; @@ -358,12 +374,18 @@ void TFileTransport::writerThread() { // this will only be true when the destructor is being invoked if (closing_) { if (hasIOError) { - pthread_exit(NULL); +#ifndef USE_BOOST_THREAD + pthread_exit(NULL); +#else + return; +#endif } // Try to empty buffers before exit if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { +#ifndef _WIN32 fsync(fd_); +#endif if (-1 == ::close(fd_)) { int errno_copy = errno; GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); @@ -371,8 +393,12 @@ void TFileTransport::writerThread() { //fd successfully closed fd_ = 0; } +#ifndef USE_BOOST_THREAD pthread_exit(NULL); - } +#else + return; +#endif + } } if (swapEventBuffers(&ts_next_flush)) { @@ -387,7 +413,11 @@ void TFileTransport::writerThread() { T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_); usleep(writerThreadIOErrorSleepTime_); if (closing_) { +#ifndef USE_BOOST_THREAD pthread_exit(NULL); +#else + return; +#endif } if (!fd_) { ::close(fd_); @@ -467,7 +497,8 @@ void TFileTransport::writerThread() { // time, it could have changed state in between. This will result in us // making inconsistent decisions. bool forced_flush = false; - pthread_mutex_lock(&mutex_); + { + Guard g(mutex_); if (forceFlush_) { if (!enqueueBuffer_->isEmpty()) { // If forceFlush_ is true, we need to flush all available data. @@ -479,12 +510,11 @@ void TFileTransport::writerThread() { // forceFlush_. Therefore the next time around the loop enqueueBuffer_ // is guaranteed to be empty. (I.e., we're guaranteed to make progress // and clear forceFlush_ the next time around the loop.) - pthread_mutex_unlock(&mutex_); continue; } forced_flush = true; - } - pthread_mutex_unlock(&mutex_); + } + } // determine if we need to perform an fsync bool flush = false; @@ -508,18 +538,19 @@ void TFileTransport::writerThread() { if (flush) { // sync (force flush) file to disk +#ifndef _WIN32 fsync(fd_); +#endif unflushed = 0; getNextFlushTime(&ts_next_flush); // notify anybody waiting for flush completion if (forced_flush) { - pthread_mutex_lock(&mutex_); + Guard g(mutex_); forceFlush_ = false; assert(enqueueBuffer_->isEmpty()); assert(dequeueBuffer_->isEmpty()); - pthread_cond_broadcast(&flushed_); - pthread_mutex_unlock(&mutex_); + flushed_.notifyAll(); } } } @@ -527,22 +558,26 @@ void TFileTransport::writerThread() { void TFileTransport::flush() { // file must be open for writing for any flushing to take place +#ifdef USE_BOOST_THREAD + if (!writerThreadId_.get()) { + return; + } +#else if (writerThreadId_ <= 0) { return; } +#endif // wait for flush to take place - pthread_mutex_lock(&mutex_); + Guard g(mutex_); // Indicate that we are requesting a flush forceFlush_ = true; // Wake up the writer thread so it will perform the flush immediately - pthread_cond_signal(¬Empty_); + notEmpty_.notify(); while (forceFlush_) { - pthread_cond_wait(&flushed_, &mutex_); + flushed_.wait(); } - - pthread_mutex_unlock(&mutex_); } @@ -892,9 +927,15 @@ uint32_t TFileTransport::getCurChunk() { // Utility Functions void TFileTransport::openLogFile() { +#ifndef _WIN32 mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH; int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; fd_ = ::open(filename_.c_str(), flags, mode); +#else + int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE; + int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND; + fd_ = ::_open(filename_.c_str(), flags, mode); +#endif offset_ = 0; // make sure open call was successful diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index 2ea8c9af..b0e48d1b 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -27,15 +27,26 @@ #include #include +#ifdef HAVE_PTHREAD_H #include +#endif + +#ifdef USE_BOOST_THREAD +#include +#endif #include #include +#include "concurrency/Mutex.h" +#include "concurrency/Monitor.h" + namespace apache { namespace thrift { namespace transport { using apache::thrift::TProcessor; using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::concurrency::Mutex; +using apache::thrift::concurrency::Monitor; // Data pertaining to a single event typedef struct eventInfo { @@ -360,7 +371,11 @@ class TFileTransport : public TFileReaderTransport, static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; // writer thread id - pthread_t writerThreadId_; +#ifdef USE_BOOST_THREAD + std::auto_ptr writerThreadId_; +#else + pthread_t writerThreadId_; +#endif // buffers to hold data before it is flushed. Each element of the buffer stores a msg that // needs to be written to the file. The buffers are swapped by the writer thread. @@ -368,15 +383,15 @@ class TFileTransport : public TFileReaderTransport, TFileTransportBuffer *enqueueBuffer_; // conditions used to block when the buffer is full or empty - pthread_cond_t notFull_, notEmpty_; + Monitor notFull_, notEmpty_; volatile bool closing_; // To keep track of whether the buffer has been flushed - pthread_cond_t flushed_; + Monitor flushed_; volatile bool forceFlush_; // Mutex that is grabbed when enqueueing and swapping the read/write buffers - pthread_mutex_t mutex_; + Mutex mutex_; // File information std::string filename_; diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 2db8f8b7..a0cc77a3 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -496,6 +496,12 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { } #endif +#ifdef _WIN32 + if(errno_copy == WSAECONNRESET) { + return 0; // EOF + } +#endif + // Now it's not a try again case, but a real probblez GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy); diff --git a/lib/cpp/src/windows/config.h b/lib/cpp/src/windows/config.h index 0f9a3043..2db25967 100644 --- a/lib/cpp/src/windows/config.h +++ b/lib/cpp/src/windows/config.h @@ -32,6 +32,7 @@ #pragma warning(disable: 4250) // Inherits via dominance. #define HAVE_GETTIMEOFDAY 1 +#define HAVE_SYS_STAT_H 1 #include "TargetVersion.h" #include "GetTimeOfDay.h" @@ -53,13 +54,25 @@ typedef boost::uint8_t uint8_t; #pragma comment(lib, "Ws2_32.lib") // pthreads -#include +#if 0 +# include +#else +struct timespec { + long tv_sec; + long tv_nsec; +}; +# define USE_BOOST_THREAD 1 +# define ctime_r( _clock, _buf ) \ + ( strcpy( (_buf), ctime( (_clock) ) ), \ + (_buf) ) +#endif typedef ptrdiff_t ssize_t; // Missing functions. #define usleep(ms) Sleep(ms) +#if WINVER <= 0x0502 #define poll(fds, nfds, timeout) \ poll_win32(fds, nfds, timeout) @@ -80,6 +93,10 @@ inline int poll_win32(LPWSAPOLLFD fdArray, ULONG fds, INT timeout) timeval time_out = {timeout * 0.001, timeout * 1000}; return select(1, &read_fds, &write_fds, &except_fds, &time_out); } +#else +# define poll(fds, nfds, timeout) \ + WSAPoll(fds, nfds, timeout) +#endif // WINVER inline void close(SOCKET socket) { diff --git a/test/cpp/Makefile.am b/test/cpp/Makefile.am index 6c62cfbc..95574fc5 100755 --- a/test/cpp/Makefile.am +++ b/test/cpp/Makefile.am @@ -95,9 +95,9 @@ gen-cpp/ThriftTest.cpp gen-cpp/StressTest_types.cpp gen-cpp/StressTest_constants INCLUDES = \ -I$(top_srcdir)/lib/cpp/src -Igen-cpp -AM_CPPFLAGS = $(BOOST_CPPFLAGS) +AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(LIBEVENT_CPPFLAGS) AM_CXXFLAGS = -Wall -AM_LDFLAGS = $(BOOST_LDFLAGS) +AM_LDFLAGS = $(BOOST_LDFLAGS) $(LIBEVENT_LDFLAGS) clean-local: $(RM) -r gen-cpp @@ -109,5 +109,3 @@ EXTRA_DIST = \ src/StressTestNonBlocking.cpp \ realloc/realloc_test.c \ realloc/Makefile - - diff --git a/test/cpp/src/StressTest.cpp b/test/cpp/src/StressTest.cpp index 4892722d..339e7d13 100755 --- a/test/cpp/src/StressTest.cpp +++ b/test/cpp/src/StressTest.cpp @@ -18,7 +18,7 @@ */ #include -#include +#include #include #include #include @@ -326,7 +326,7 @@ int main(int argc, char **argv) { cerr << usage; } - shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = shared_ptr(new PlatformThreadFactory()); // Dispatcher shared_ptr serviceHandler(new Server()); diff --git a/test/cpp/src/StressTestNonBlocking.cpp b/test/cpp/src/StressTestNonBlocking.cpp index 0d8bc3ab..2ff507b7 100755 --- a/test/cpp/src/StressTestNonBlocking.cpp +++ b/test/cpp/src/StressTestNonBlocking.cpp @@ -18,7 +18,7 @@ */ #include -#include +#include #include #include #include @@ -321,7 +321,7 @@ int main(int argc, char **argv) { cerr << usage; } - shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = shared_ptr(new PlatformThreadFactory()); // Dispatcher shared_ptr serviceHandler(new Server()); diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp index 9aed5510..c1d6e070 100755 --- a/test/cpp/src/TestClient.cpp +++ b/test/cpp/src/TestClient.cpp @@ -46,8 +46,6 @@ using namespace apache::thrift::transport; using namespace thrift::test; using namespace apache::thrift::async; -using std::tr1::placeholders::_1; - //extern uint32_t g_socket_syscalls; // Current time, microseconds since the epoch @@ -86,7 +84,7 @@ static void testVoid_clientReturn(const char* host, int port, event_base *base, delete client; shared_ptr channel(new TEvhttpClientChannel(host, "/", host, port, base)); client = new ThriftTestCobClient(channel, protocolFactory); - client->testString(tr1::bind(testString_clientReturn, host, port, base, protocolFactory, _1), "Test"); + client->testString(tr1::bind(testString_clientReturn, host, port, base, protocolFactory, std::tr1::placeholders::_1), "Test"); } catch (TException& exn) { cout << "Error: " << exn.what() << endl; } @@ -211,7 +209,7 @@ int main(int argc, char** argv) { shared_ptr channel(new TEvhttpClientChannel(host.c_str(), "/", host.c_str(), port, base)); ThriftTestCobClient* client = new ThriftTestCobClient(channel, protocolFactory.get()); - client->testVoid(tr1::bind(testVoid_clientReturn, host.c_str(), port, base, protocolFactory.get(), _1)); + client->testVoid(tr1::bind(testVoid_clientReturn, host.c_str(), port, base, protocolFactory.get(), std::tr1::placeholders::_1)); event_base_loop(base, 0); return 0; diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 4afbef68..456577f8 100755 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -21,7 +21,7 @@ #include #include -#include +#include #include #include #include @@ -641,8 +641,8 @@ int main(int argc, char **argv) { shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workers); - shared_ptr threadFactory = - shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = + shared_ptr(new PlatformThreadFactory()); threadManager->threadFactory(threadFactory); diff --git a/test/threads/ThreadsClient.cpp b/test/threads/ThreadsClient.cpp index 85274a63..f5c076f2 100644 --- a/test/threads/ThreadsClient.cpp +++ b/test/threads/ThreadsClient.cpp @@ -25,9 +25,9 @@ #include #include #include -#include -#include -#include +#include +#include +#include using boost::shared_ptr; using namespace apache::thrift; diff --git a/test/threads/ThreadsServer.cpp b/test/threads/ThreadsServer.cpp index 8734ee89..8420c2f0 100644 --- a/test/threads/ThreadsServer.cpp +++ b/test/threads/ThreadsServer.cpp @@ -26,9 +26,9 @@ #include #include #include -#include -#include -#include +#include +#include +#include using boost::shared_ptr; using namespace apache::thrift; @@ -111,8 +111,8 @@ int main(int argc, char **argv) { /* shared_ptr threadManager = ThreadManager::newSimpleThreadManager(10); - shared_ptr threadFactory = - shared_ptr(new PosixThreadFactory()); + shared_ptr threadFactory = + shared_ptr(new PlatformThreadFactory()); threadManager->threadFactory(threadFactory); threadManager->start(); -- 2.17.1