From 6f038a7b60337c6b00ff7685db9ad4d527b68e62 Mon Sep 17 00:00:00 2001 From: Marc Slemko Date: Thu, 3 Aug 2006 18:58:09 +0000 Subject: [PATCH] Converted concurrency classes to use boost::shared_ptr and boost::weak_ptr: Wrapped all thrift code in facebook::thrift:: namespace git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664735 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 59 +++--- lib/cpp/aclocal/ax_boost_base.m4 | 186 ++++++++++++++++++ lib/cpp/bootstrap.sh | 2 +- lib/cpp/configure.ac | 2 + lib/cpp/src/TProcessor.h | 6 + lib/cpp/src/concurrency/Monitor.cc | 48 +++-- lib/cpp/src/concurrency/PosixThreadFactory.cc | 52 +++-- lib/cpp/src/concurrency/PosixThreadFactory.h | 8 +- lib/cpp/src/concurrency/Thread.h | 24 ++- lib/cpp/src/concurrency/ThreadManager.cc | 77 ++++---- lib/cpp/src/concurrency/ThreadManager.h | 21 +- lib/cpp/src/concurrency/TimerManager.cc | 46 +++-- lib/cpp/src/concurrency/TimerManager.h | 22 ++- lib/cpp/src/concurrency/Util.h | 8 +- .../src/concurrency/test/ThreadFactoryTests.h | 48 +++-- .../src/concurrency/test/ThreadManagerTests.h | 36 ++-- .../src/concurrency/test/TimerManagerTests.h | 48 ++--- lib/cpp/src/protocol/TBinaryProtocol.cc | 3 + lib/cpp/src/protocol/TBinaryProtocol.h | 5 + lib/cpp/src/protocol/TProtocol.h | 7 + lib/cpp/src/server/TServer.h | 6 + lib/cpp/src/server/TSimpleServer.cc | 4 + lib/cpp/src/server/TSimpleServer.h | 4 + lib/cpp/src/transport/TBufferedTransport.cc | 4 + lib/cpp/src/transport/TBufferedTransport.h | 4 + lib/cpp/src/transport/TChunkedTransport.cc | 4 + lib/cpp/src/transport/TChunkedTransport.h | 4 + lib/cpp/src/transport/TNullTransport.h | 4 + lib/cpp/src/transport/TServerSocket.cc | 4 + lib/cpp/src/transport/TServerSocket.h | 4 + lib/cpp/src/transport/TServerTransport.h | 4 + lib/cpp/src/transport/TSocket.cc | 3 + lib/cpp/src/transport/TSocket.h | 3 + lib/cpp/src/transport/TTransport.h | 4 + lib/cpp/src/transport/TTransportException.h | 4 + 35 files changed, 558 insertions(+), 210 deletions(-) create mode 100644 lib/cpp/aclocal/ax_boost_base.m4 diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 822a2c7f..1ec11d49 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -1,30 +1,40 @@ -lib_LTLIBRARIES = libconcurrency.la \ - libthrift.la +#lib_LTLIBRARIES = libthrift.la + +lib_LIBRARIES = libthrift.a + +common_cxxflags = -Isrc $(BOOST_CPPFLAGS) +common_ldflags = $(BOOST_LDFLAGS) # Define the source file for the module -libconcurrency_la_SOURCES = src/concurrency/Monitor.cc \ - src/concurrency/PosixThreadFactory.cc \ - src/concurrency/ThreadManager.cc \ - src/concurrency/TimerManager.cc +libthrift_sources = src/concurrency/Monitor.cc \ + src/concurrency/PosixThreadFactory.cc \ + src/concurrency/ThreadManager.cc \ + src/concurrency/TimerManager.cc \ + src/protocol/TBinaryProtocol.cc \ + src/transport/TBufferedTransport.cc \ + src/transport/TChunkedTransport.cc \ + src/transport/TSocket.cc \ + src/transport/TServerSocket.cc \ + src/server/TSimpleServer.cc \ + src/server/TThreadPoolServer.cc -libconcurrency_inst_headers = src/concurrency/Exception.h \ - src/concurrency/Monitor.h \ - src/concurrency/PosixThreadFactory.h \ - src/concurrency/Thread.h \ - src/concurrency/ThreadManager.h \ - src/concurrency/TimerManager.h +libthrift_a_SOURCES = $(libthrift_sources) +#libthrift_la_SOURCES = $(libthrift_sources) -libthrift_la_SOURCES = src/protocol/TBinaryProtocol.cc \ - src/transport/TBufferedTransport.cc \ - src/transport/TChunkedTransport.cc \ - src/transport/TSocket.cc \ - src/transport/TServerSocket.cc \ - src/server/TSimpleServer.cc +libthrift_cxxflags = $(common_cxxflags) +libthrift_ldflags = $(common_ldflags) -libthrift_la_CXXFLAGS = -Isrc +libthrift_la_CXXFLAGS = $(libthrift_cxxflags) +libthrift_a_CXXFLAGS = $(libthrift_cxxflags) -libthrift_inst_headers = src/protocol/TBinaryProtocol.h \ +libthrift_inst_headers = src/concurrency/Exception.h \ + src/concurrency/Monitor.h \ + src/concurrency/PosixThreadFactory.h \ + src/concurrency/Thread.h \ + src/concurrency/ThreadManager.h \ + src/concurrency/TimerManager.h \ + src/protocol/TBinaryProtocol.h \ src/protocol/TProtocol.h \ src/transport/TBufferedTransport.h \ src/transport/TChunkedTransport.h \ @@ -34,7 +44,9 @@ libthrift_inst_headers = src/protocol/TBinaryProtocol.h \ src/transport/TSocket.h \ src/transport/TTransport.h \ src/transport/TTransport.h \ - src/transport/TTransportException.h + src/transport/TTransportException.h \ + src/server/TSimpleServer.h \ + src/server/TThreadPoolServer.h bin_PROGRAMS = concurrency_test @@ -46,6 +58,7 @@ concurrency_test_SOURCES = src/concurrency/test/Tests.cc \ src/concurrency/test/ThreadManagerTests.h \ src/concurrency/test/TimerManagerTests.h -concurrency_test_LDADD = libconcurrency.la +concurrency_test_LDADD = libthrift.a -concurrency_test_CXXFLAGS = -Isrc/concurrency +concurrency_test_CXXFLAGS = $(common_cxxflags) +concurrency_test_LDFLAGS = $(common_ldflags) diff --git a/lib/cpp/aclocal/ax_boost_base.m4 b/lib/cpp/aclocal/ax_boost_base.m4 new file mode 100644 index 00000000..b7ec8b51 --- /dev/null +++ b/lib/cpp/aclocal/ax_boost_base.m4 @@ -0,0 +1,186 @@ +dnl @synopsis AX_BOOST([MINIMUM-VERSION]) +dnl +dnl Test for the Boost C++ libraries of a particular version (or newer) +dnl +dnl If no path to the installed boost library is given the macro +dnl searchs under /usr, /usr/local, and /opt, and evaluates the +dnl $BOOST_ROOT environment variable. Further documentation is +dnl available at . +dnl +dnl This macro calls: +dnl +dnl AC_SUBST(BOOST_CPPFLAGS) / AC_SUBST(BOOST_LDFLAGS) +dnl +dnl And sets: +dnl +dnl HAVE_BOOST +dnl +dnl @category InstalledPackages +dnl @category Cxx +dnl @author Thomas Porschberg +dnl @version 2006-06-15 +dnl @license AllPermissive + +AC_DEFUN([AX_BOOST_BASE], +[ +AC_ARG_WITH([boost], + AS_HELP_STRING([--with-boost@<:@=DIR@:>@], [use boost (default is No) - it is possible to specify the root directory for boost (optional)]), + [ + if test "$withval" = "no"; then + want_boost="no" + elif test "$withval" = "yes"; then + want_boost="yes" + ac_boost_path="" + else + want_boost="yes" + ac_boost_path="$withval" + fi + ], + [want_boost="no"]) + +if test "x$want_boost" = "xyes"; then + boost_lib_version_req=ifelse([$1], ,1.20.0,$1) + boost_lib_version_req_shorten=`expr $boost_lib_version_req : '\([[0-9]]*\.[[0-9]]*\)'` + boost_lib_version_req_major=`expr $boost_lib_version_req : '\([[0-9]]*\)'` + boost_lib_version_req_minor=`expr $boost_lib_version_req : '[[0-9]]*\.\([[0-9]]*\)'` + boost_lib_version_req_sub_minor=`expr $boost_lib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'` + if test "x$boost_lib_version_req_sub_minor" = "x" ; then + boost_lib_version_req_sub_minor="0" + fi + WANT_BOOST_VERSION=`expr $boost_lib_version_req_major \* 100000 \+ $boost_lib_version_req_minor \* 100 \+ $boost_lib_version_req_sub_minor` + AC_MSG_CHECKING(for boostlib >= $boost_lib_version_req) + succeeded=no + + dnl first we check the system location for boost libraries + dnl this location ist chosen if boost libraries are installed with the --layout=system option + dnl or if you install boost with RPM + if test "$ac_boost_path" != ""; then + BOOST_LDFLAGS="-L$ac_boost_path/lib" + BOOST_CPPFLAGS="-I$ac_boost_path/include" + else + for ac_boost_path_tmp in /usr /usr/local /opt ; do + if test -d "$ac_boost_path_tmp/include/boost" && test -r "$ac_boost_path_tmp/include/boost"; then + BOOST_LDFLAGS="-L$ac_boost_path_tmp/lib" + BOOST_CPPFLAGS="-I$ac_boost_path_tmp/include" + break; + fi + done + fi + + CPPFLAGS_SAVED="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS" + export CPPFLAGS + + LDFLAGS_SAVED="$LDFLAGS" + LDFLAGS="$LDFLAGS $BOOST_LDFLAGS" + export LDFLAGS + + AC_LANG_PUSH(C++) + AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[ + @%:@include + ]], [[ + #if BOOST_VERSION >= $WANT_BOOST_VERSION + // Everything is okay + #else + # error Boost version is too old + #endif + ]])],[ + AC_MSG_RESULT(yes) + succeeded=yes + found_system=yes + ],[ + ]) + AC_LANG_POP([C++]) + + + + dnl if we found no boost with system layout we search for boost libraries + dnl built and installed without the --layout=system option or for a staged(not installed) version + if test "x$succeeded" != "xyes"; then + _version=0 + if test "$ac_boost_path" != ""; then + BOOST_LDFLAGS="-L$ac_boost_path/lib" + if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then + for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do + _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'` + V_CHECK=`expr $_version_tmp \> $_version` + if test "$V_CHECK" = "1" ; then + _version=$_version_tmp + fi + VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'` + BOOST_CPPFLAGS="-I$ac_boost_path/include/boost-$VERSION_UNDERSCORE" + done + fi + else + for ac_boost_path in /usr /usr/local /opt ; do + if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then + for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do + _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'` + V_CHECK=`expr $_version_tmp \> $_version` + if test "$V_CHECK" = "1" ; then + _version=$_version_tmp + best_path=$ac_boost_path + fi + done + fi + done + + VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'` + BOOST_CPPFLAGS="-I$best_path/include/boost-$VERSION_UNDERSCORE" + BOOST_LDFLAGS="-L$best_path/lib" + + if test "x$BOOST_ROOT" != "x"; then + if test -d "$BOOST_ROOT" && test -r "$BOOST_ROOT" && test -d "$BOOST_ROOT/stage/lib" && test -r "$BOOST_ROOT/stage/lib"; then + version_dir=`expr //$BOOST_ROOT : '.*/\(.*\)'` + stage_version=`echo $version_dir | sed 's/boost_//' | sed 's/_/./g'` + stage_version_shorten=`expr $stage_version : '\([[0-9]]*\.[[0-9]]*\)'` + V_CHECK=`expr $stage_version_shorten \>\= $_version` + if test "$V_CHECK" = "1" ; then + AC_MSG_NOTICE(We will use a staged boost library from $BOOST_ROOT) + BOOST_CPPFLAGS="-I$BOOST_ROOT" + BOOST_LDFLAGS="-L$BOOST_ROOT/stage/lib" + fi + fi + fi + fi + + CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS" + export CPPFLAGS + LDFLAGS="$LDFLAGS $BOOST_LDFLAGS" + export LDFLAGS + + AC_LANG_PUSH(C++) + AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[ + @%:@include + ]], [[ + #if BOOST_VERSION >= $WANT_BOOST_VERSION + // Everything is okay + #else + # error Boost version is too old + #endif + ]])],[ + AC_MSG_RESULT(yes) + succeeded=yes + found_system=yes + ],[ + ]) + AC_LANG_POP([C++]) + fi + + if test "$succeeded" != "yes" ; then + if test "$_version" = "0" ; then + AC_MSG_ERROR([[We could not detect the boost libraries (version $boost_lib_version_req_shorten or higher). If you have a staged boost library (still not installed) please specify \$BOOST_ROOT in your environment and do not give a PATH to --with-boost option. If you are sure you have boost installed, then check your version number looking in . See http://randspringer.de/boost for more documentation.]]) + else + AC_MSG_NOTICE([Your boost libraries seems to old (version $_version).]) + fi + else + AC_SUBST(BOOST_CPPFLAGS) + AC_SUBST(BOOST_LDFLAGS) + AC_DEFINE(HAVE_BOOST,,[define if the Boost library is available]) + fi + + CPPFLAGS="$CPPFLAGS_SAVED" + LDFLAGS="$LDFLAGS_SAVED" +fi + +]) diff --git a/lib/cpp/bootstrap.sh b/lib/cpp/bootstrap.sh index e85fd2b7..607655d0 100755 --- a/lib/cpp/bootstrap.sh +++ b/lib/cpp/bootstrap.sh @@ -33,7 +33,7 @@ missing autoscan autoheader -aclocal +aclocal -I ./aclocal libtoolize --automake touch NEWS README AUTHORS ChangeLog autoconf diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac index 77a4f328..ee9995e6 100644 --- a/lib/cpp/configure.ac +++ b/lib/cpp/configure.ac @@ -32,6 +32,8 @@ AC_CHECK_HEADERS([sys/time.h]) AC_CHECK_HEADERS([unistd.h]) +AX_BOOST_BASE([1.33.1]) + AC_CHECK_LIB(pthread, pthread_create) AC_CHECK_LIB(rt, sched_get_priority_min) diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h index f01379dc..2ed24304 100644 --- a/lib/cpp/src/TProcessor.h +++ b/lib/cpp/src/TProcessor.h @@ -4,6 +4,10 @@ #include #include "transport/TTransport.h" +namespace facebook { namespace thrift { + +using namespace facebook::thrift::transport; + /** * A processor is a generic object that acts upon two streams of data, one * an input and the other an output. The definition of this object is loose, @@ -21,4 +25,6 @@ class TProcessor { TProcessor() {} }; +}} // facebook::thrift + #endif diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index b1d7b726..7493ec39 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -22,35 +22,26 @@ class Monitor::Impl { public: Impl() : - mutexInitialized(false) { + mutexInitialized(false), + condInitialized(false) { - /* XXX - Need to fix this to handle failures without leaking. */ + try { - assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); + assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); - mutexInitialized = true; + mutexInitialized = true; - assert(pthread_cond_init(&_pthread_cond, NULL) == 0); - } - - ~Impl() { - - if(mutexInitialized) { + assert(pthread_cond_init(&_pthread_cond, NULL) == 0); - mutexInitialized = false; + condInitialized = true; - assert(pthread_mutex_destroy(&_pthread_mutex) == 0); - } - - if(condInitialized) { - - condInitialized = false; - - assert(pthread_cond_destroy(&_pthread_cond) == 0); + } catch(...) { + cleanup(); } } + ~Impl() {cleanup();} + void lock() const {pthread_mutex_lock(&_pthread_mutex);} void unlock() const {pthread_mutex_unlock(&_pthread_mutex);} @@ -98,6 +89,23 @@ class Monitor::Impl { private: + void cleanup() { + + if(mutexInitialized) { + + mutexInitialized = false; + + assert(pthread_mutex_destroy(&_pthread_mutex) == 0); + } + + if(condInitialized) { + + condInitialized = false; + + assert(pthread_cond_destroy(&_pthread_cond) == 0); + } + } + mutable pthread_mutex_t _pthread_mutex; mutable bool mutexInitialized; diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index 00b2613d..f07db86a 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -3,8 +3,14 @@ #include #include +#include + +#include + namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + /** The POSIX thread class. @author marc @@ -36,18 +42,23 @@ private: int _stackSize; + weak_ptr _self; + public: - PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : + PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : _pthread(0), _state(uninitialized), _policy(policy), _priority(priority), - _stackSize(stackSize) { + _stackSize(stackSize) { this->Thread::runnable(runnable); } + ~PthreadThread() { + } + void start() { if(_state != uninitialized) { @@ -75,9 +86,13 @@ public: // Set thread priority - // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); + assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); + + shared_ptr* selfRef = new shared_ptr(); + + *selfRef = _self.lock(); - assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0); + assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); } void join() { @@ -90,17 +105,27 @@ public: } } - Runnable* runnable() const {return Thread::runnable();} + shared_ptr runnable() const {return Thread::runnable();} - void runnable(Runnable* value) {Thread::runnable(value);} + void runnable(shared_ptr value) {Thread::runnable(value);} + void weakRef(shared_ptr self) { + assert(self.get() == this); + _self = weak_ptr(self); + } }; void* PthreadThread::threadMain(void* arg) { // XXX need a lock here when testing thread state - PthreadThread* thread = (PthreadThread*)arg; - + shared_ptr thread = *(shared_ptr*)arg; + + delete reinterpret_cast*>(arg); + + if(thread == NULL) { + return (void*)0; + } + if(thread->_state != starting) { return (void*)0; } @@ -184,9 +209,12 @@ public: @param runnable A runnable object */ - Thread* newThread(Runnable* runnable) const { + shared_ptr newThread(shared_ptr runnable) const { - return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable); + shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); + result->weakRef(result); + runnable->thread(result); + return result; } int stackSize() const { return _stackSize;} @@ -198,7 +226,7 @@ public: /** Sets priority. XXX - Need to handle incremental priorities properl. */ + Need to handle incremental priorities properly. */ void priority(PRIORITY value) { _priority = value;} @@ -207,7 +235,7 @@ public: PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} -Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);} +shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const {return _impl->newThread(runnable);} int PosixThreadFactory::stackSize() const {return _impl->stackSize();} diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index b42981b7..0095cf81 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -3,8 +3,12 @@ #include "Thread.h" +#include + namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + /** A thread factory to create posix threads @author marc @@ -43,7 +47,7 @@ class PosixThreadFactory : public ThreadFactory { // From ThreadFactory; - Thread* newThread(Runnable* runnable) const; + shared_ptr newThread(shared_ptr runnable) const; /** Sets stack size for created threads @@ -69,7 +73,7 @@ class PosixThreadFactory : public ThreadFactory { class Impl; - Impl* _impl; + shared_ptr _impl; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index 2416887e..ea6b999f 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -1,8 +1,13 @@ #if !defined(_concurrency_Thread_h_) #define _concurrency_Thread_h_ 1 +#include +#include + namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + class Thread; /** Minimal runnable class. More or less analogous to java.lang.Runnable. @@ -18,17 +23,17 @@ class Runnable { virtual void run() = 0; - virtual Thread* thread() {return _thread;} + /** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on thet thread object */ - private: + virtual shared_ptr thread() {return _thread.lock();} /** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */ - friend class Thread; + virtual void thread(shared_ptr value) {_thread = value;} - virtual void thread(Thread* value) {_thread = value;} + private: - Thread* _thread; + weak_ptr _thread; }; /** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread @@ -55,15 +60,14 @@ class Thread { /** Gets the runnable object this thread is hosting */ - virtual Runnable* runnable() const {return _runnable;} + virtual shared_ptr runnable() const {return _runnable;} protected: - virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);} + virtual void runnable(shared_ptr value) {_runnable = value;} private: - - Runnable* _runnable; + shared_ptr _runnable; }; /** Factory to create platform-specific thread object and bind them to Runnable object for execution */ @@ -74,7 +78,7 @@ class ThreadFactory { virtual ~ThreadFactory() {} - virtual Thread* newThread(Runnable* runnable) const = 0; + virtual shared_ptr newThread(shared_ptr runnable) const = 0; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index ca2bbb5f..a5b8f05b 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -2,12 +2,20 @@ #include "Exception.h" #include "Monitor.h" +#include + #include #include #include +#if defined(DEBUG) +#include +#endif //defined(DEBUG) + namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + /** ThreadManager class @@ -21,9 +29,16 @@ class ThreadManager::Impl : public ThreadManager { public: - Impl() : _state(ThreadManager::UNINITIALIZED) {} + Impl() : + _workerCount(0), + _workerMaxCount(0), + _idleCount(0), + _state(ThreadManager::UNINITIALIZED) + {} - ~Impl() {stop();} + ~Impl() { + stop(); + } void start(); @@ -33,14 +48,14 @@ class ThreadManager::Impl : public ThreadManager { return _state; }; - const ThreadFactory* threadFactory() const { + shared_ptr threadFactory() const { Synchronized s(_monitor); return _threadFactory; } - void threadFactory(const ThreadFactory* value) { + void threadFactory(shared_ptr value) { Synchronized s(_monitor); @@ -74,9 +89,9 @@ class ThreadManager::Impl : public ThreadManager { return _tasks.size() + _workerCount - _idleCount; } - void add(Runnable* value); + void add(shared_ptr value); - void remove(Runnable* task); + void remove(shared_ptr task); private: @@ -88,11 +103,11 @@ private: ThreadManager::STATE _state; - const ThreadFactory* _threadFactory; + shared_ptr _threadFactory; friend class ThreadManager::Task; - std::queue _tasks; + std::queue > _tasks; Monitor _monitor; @@ -100,9 +115,9 @@ private: friend class ThreadManager::Worker; - std::set _workers; + std::set > _workers; - std::set _deadWorkers; + std::set > _deadWorkers; }; class ThreadManager::Task : public Runnable { @@ -115,7 +130,7 @@ public: COMPLETE }; - Task(Runnable* runnable) : + Task(shared_ptr runnable) : _runnable(runnable), _state(WAITING) {} @@ -131,7 +146,7 @@ public: private: - Runnable* _runnable; + shared_ptr _runnable; friend class ThreadManager::Worker; @@ -199,7 +214,7 @@ class ThreadManager::Worker: public Runnable { while(active) { - ThreadManager::Task* task = NULL; + shared_ptr task; /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop). @@ -260,10 +275,6 @@ class ThreadManager::Worker: public Runnable { // XXX need to log this } - - delete task; - - task = NULL; } } } @@ -294,13 +305,13 @@ class ThreadManager::Worker: public Runnable { void ThreadManager::Impl::addWorker(size_t value) { - std::set newThreads; + std::set > newThreads; for(size_t ix = 0; ix < value; ix++) { class ThreadManager::Worker; - ThreadManager::Worker* worker = new ThreadManager::Worker(this); + shared_ptr worker = shared_ptr(new ThreadManager::Worker(this)); newThreads.insert(_threadFactory->newThread(worker)); } @@ -312,9 +323,9 @@ void ThreadManager::Impl::addWorker(size_t value) { _workers.insert(newThreads.begin(), newThreads.end()); } - for(std::set::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { + for(std::set >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { - ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable(); + shared_ptr worker = dynamic_pointer_cast((*ix)->runnable()); worker->_state = ThreadManager::Worker::STARTING; @@ -378,13 +389,14 @@ void ThreadManager::Impl::stop() { _state = ThreadManager::STOPPING; } - // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid + // XXX + // should be able to block here for transition to STOPPED since we're now using shared_ptrs } void ThreadManager::Impl::removeWorker(size_t value) { - std::set removedThreads; + std::set > removedThreads; {Synchronized s(_monitor); @@ -413,20 +425,17 @@ void ThreadManager::Impl::removeWorker(size_t value) { _workerMonitor.wait(); } - for(std::set::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { + for(std::set >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { _workers.erase(*ix); - delete (*ix)->runnable(); - - delete (*ix); } _deadWorkers.clear(); } } -void ThreadManager::Impl::add(Runnable* value) { +void ThreadManager::Impl::add(shared_ptr value) { Synchronized s(_monitor); @@ -435,7 +444,7 @@ void ThreadManager::Impl::add(Runnable* value) { throw IllegalStateException(); } - _tasks.push(new ThreadManager::Task(value)); + _tasks.push(shared_ptr(new ThreadManager::Task(value))); /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this task in time. */ @@ -446,7 +455,7 @@ void ThreadManager::Impl::add(Runnable* value) { } } -void ThreadManager::Impl::remove(Runnable* task) { +void ThreadManager::Impl::remove(shared_ptr task) { Synchronized s(_monitor); @@ -479,12 +488,12 @@ private: }; -ThreadManager* ThreadManager::newThreadManager() { - return new ThreadManager::Impl(); +shared_ptr ThreadManager::newThreadManager() { + return shared_ptr(new ThreadManager::Impl()); } -ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) { - return new SimpleThreadManager(count); +shared_ptr ThreadManager::newSimpleThreadManager(size_t count) { + return shared_ptr(new SimpleThreadManager(count)); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index aa5a98a9..f3656435 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -1,12 +1,16 @@ #if !defined(_concurrency_ThreadManager_h_) #define _concurrency_ThreadManager_h_ 1 +#include + #include #include "Thread.h" namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + /** Thread Pool Manager and related classes @author marc @@ -52,9 +56,9 @@ class ThreadManager { virtual const STATE state() const = 0; - virtual const ThreadFactory* threadFactory() const = 0; + virtual shared_ptr threadFactory() const = 0; - virtual void threadFactory(const ThreadFactory* value) = 0; + virtual void threadFactory(shared_ptr value) = 0; virtual void addWorker(size_t value=1) = 0; @@ -76,19 +80,22 @@ class ThreadManager { virtual size_t totalTaskCount() const = 0; - /** Adds a task to be execued at some time in the future by a worker thread. */ + /** Adds a task to be execued at some time in the future by a worker thread. + + @param value The task to run */ + - virtual void add(Runnable* value) = 0; + virtual void add(shared_ptrvalue) = 0; /** Removes a pending task */ - virtual void remove(Runnable* task) = 0; + virtual void remove(shared_ptr task) = 0; - static ThreadManager* newThreadManager(); + static shared_ptr newThreadManager(); /** Creates a simple thread manager the uses count number of worker threads */ - static ThreadManager* newSimpleThreadManager(size_t count=4); + static shared_ptr newSimpleThreadManager(size_t count=4); class Task; diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc index 7952122c..a223a77a 100644 --- a/lib/cpp/src/concurrency/TimerManager.cc +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -13,7 +13,7 @@ namespace facebook { namespace thrift { namespace concurrency { @author marc @version $Id:$ */ -typedef std::multimap::iterator task_iterator; +typedef std::multimap >::iterator task_iterator; typedef std::pair task_range; class TimerManager::Task : public Runnable { @@ -26,12 +26,14 @@ public: COMPLETE }; - Task(Runnable* runnable) : + Task(shared_ptr runnable) : _runnable(runnable), _state(WAITING) {} - ~Task() {}; + ~Task() { + std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug +}; void run() { if(_state == EXECUTING) { @@ -42,7 +44,7 @@ public: private: - Runnable* _runnable; + shared_ptr _runnable; class TimerManager::Dispatcher; @@ -58,7 +60,9 @@ public: _manager(manager) { } - ~Dispatcher() {} + ~Dispatcher() { + std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug + } /** Dispatcher entry point @@ -78,7 +82,7 @@ public: do { - std::set expiredTasks; + std::set > expiredTasks; {Synchronized s(_manager->_monitor); @@ -107,7 +111,7 @@ public: for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { - TimerManager::Task* task = ix->second; + shared_ptr task = ix->second; expiredTasks.insert(task); @@ -123,11 +127,9 @@ public: } } - for(std::set::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { + for(std::set >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { (*ix)->run(); - - delete *ix; } } while(_manager->_state == TimerManager::STARTED); @@ -156,7 +158,7 @@ public: TimerManager::TimerManager() : _taskCount(0), _state(TimerManager::UNINITIALIZED), - _dispatcher(new Dispatcher(this)) { + _dispatcher(shared_ptr(new Dispatcher(this))) { } @@ -164,6 +166,8 @@ TimerManager::~TimerManager() { /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since stop already takes care of reentrancy. */ + + std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; if(_state != STOPPED) { @@ -172,6 +176,8 @@ TimerManager::~TimerManager() { stop(); } catch(...) { + std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl; + throw; // uhoh @@ -244,23 +250,23 @@ void TimerManager::stop() { for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { - delete ix->second; - _taskMap.erase(ix); } - delete _dispatcher; + // Remove dispatcher's reference to us. + + _dispatcher->_manager = NULL; } } -const ThreadFactory* TimerManager::threadFactory() const { +shared_ptr TimerManager::threadFactory() const { Synchronized s(_monitor); return _threadFactory; } -void TimerManager::threadFactory(const ThreadFactory* value) { +void TimerManager::threadFactory(shared_ptr value) { Synchronized s(_monitor); @@ -272,7 +278,7 @@ size_t TimerManager::taskCount() const { return _taskCount; } -void TimerManager::add(Runnable* task, long long timeout) { +void TimerManager::add(shared_ptr task, long long timeout) { long long now = Util::currentTime(); @@ -286,7 +292,7 @@ void TimerManager::add(Runnable* task, long long timeout) { _taskCount++; - _taskMap.insert(std::pair(timeout, new Task(task))); + _taskMap.insert(std::pair >(timeout, shared_ptr(new Task(task)))); /* If the task map was empty, or if we have an expiration that is earlier than any previously seen, kick the dispatcher so it can update its timeout */ @@ -298,7 +304,7 @@ void TimerManager::add(Runnable* task, long long timeout) { } } -void TimerManager::add(Runnable* task, const struct timespec& value) { +void TimerManager::add(shared_ptr task, const struct timespec& value) { long long expiration; @@ -314,7 +320,7 @@ void TimerManager::add(Runnable* task, const struct timespec& value) { } -void TimerManager::remove(Runnable* task) { +void TimerManager::remove(shared_ptr task) { {Synchronized s(_monitor); if(_state != TimerManager::STARTED) { diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h index 2b92f232..c0e63408 100644 --- a/lib/cpp/src/concurrency/TimerManager.h +++ b/lib/cpp/src/concurrency/TimerManager.h @@ -5,12 +5,16 @@ #include "Monitor.h" #include "Thread.h" +#include + #include #include namespace facebook { namespace thrift { namespace concurrency { +using namespace boost; + /** Timer Manager This class dispatches timer tasks when they fall due. @@ -26,9 +30,9 @@ class TimerManager { virtual ~TimerManager(); - virtual const ThreadFactory* threadFactory() const; + virtual shared_ptr threadFactory() const; - virtual void threadFactory(const ThreadFactory* value); + virtual void threadFactory(shared_ptr value); /** Starts the timer manager service @@ -47,14 +51,14 @@ class TimerManager { @param task The task to execute @param timeout Time in milliseconds to delay before executing task */ - virtual void add(Runnable* task, long long timeout); + virtual void add(shared_ptr task, long long timeout); /** Adds a task to be executed at some time in the future by a worker thread. @param task The task to execute @param timeout Absolute time in the future to execute task. */ - virtual void add(Runnable* task, const struct timespec& timeout); + virtual void add(shared_ptr task, const struct timespec& timeout); /** Removes a pending task @@ -63,7 +67,7 @@ class TimerManager { @throws UncancellableTaskException Specified task is already being executed or has completed execution. */ - virtual void remove(Runnable* task); + virtual void remove(shared_ptr task); enum STATE { UNINITIALIZED, @@ -77,13 +81,13 @@ class TimerManager { private: - const ThreadFactory* _threadFactory; + shared_ptr _threadFactory; class Task; friend class Task; - std::multimap _taskMap; + std::multimap > _taskMap; size_t _taskCount; @@ -95,9 +99,9 @@ class TimerManager { friend class Dispatcher; - Dispatcher* _dispatcher; + shared_ptr _dispatcher; - Thread* _dispatcherThread; + shared_ptr _dispatcherThread; }; diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h index 6e8891db..fc24f74d 100644 --- a/lib/cpp/src/concurrency/Util.h +++ b/lib/cpp/src/concurrency/Util.h @@ -5,7 +5,7 @@ #include #include -#include +#include namespace facebook { namespace thrift { namespace concurrency { @@ -55,9 +55,9 @@ class Util { struct timespec now; - assert(clock_gettime(&now, NULL) == 0); + assert(clock_gettime(CLOCK_REALTIME, &now) == 0); - return = (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; + return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ; #elif defined(HAVE_GETTIMEOFDAY) @@ -66,11 +66,11 @@ class Util { assert(gettimeofday(&now, NULL) == 0); return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0); + #endif // defined(HAVE_GETTIMEDAY) } }; - }}} // facebook::thrift::concurrency #endif // !defined(_concurrency_Util_h_) diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h index d1ec0df0..c0191593 100644 --- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h +++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h @@ -1,7 +1,7 @@ -#include -#include -#include -#include +#include +#include +#include +#include #include #include @@ -18,10 +18,15 @@ using namespace facebook::thrift::concurrency; class ThreadFactoryTests { - class Task: public Runnable { +public: + + static const double ERROR; + + class Task: public Runnable { public: + Task() {} void run() { @@ -29,26 +34,20 @@ class ThreadFactoryTests { } }; -public: - /** Hello world test */ bool helloWorldTest() { PosixThreadFactory threadFactory = PosixThreadFactory(); - Task* task = new ThreadFactoryTests::Task(); + shared_ptr task = shared_ptr(new ThreadFactoryTests::Task()); - Thread* thread = threadFactory.newThread(task); + shared_ptr thread = threadFactory.newThread(task); thread->start(); thread->join(); - delete thread; - - delete task; - std::cout << "\t\t\tSuccess!" << std::endl; return true; @@ -92,13 +91,13 @@ public: PosixThreadFactory threadFactory = PosixThreadFactory(); - std::set threads; + std::set > threads; for(int ix = 0; ix < count; ix++) { - threads.insert(threadFactory.newThread(new ReapNTask(*monitor, *activeCount))); + threads.insert(threadFactory.newThread(shared_ptr(new ReapNTask(*monitor, *activeCount)))); } - for(std::set::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + for(std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { (*thread)->start(); } @@ -111,11 +110,9 @@ public: } } - for(std::set::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { + for(std::set >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) { - delete (*thread)->runnable(); - - delete *thread; + threads.erase(*thread); } std::cout << "\t\t\tSuccess!" << std::endl; @@ -177,11 +174,11 @@ public: SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; - SynchStartTask* task = new SynchStartTask(monitor, state); + shared_ptr task = shared_ptr(new SynchStartTask(monitor, state)); PosixThreadFactory threadFactory = PosixThreadFactory(); - Thread* thread = threadFactory.newThread(task); + shared_ptr thread = threadFactory.newThread(task); if(state == SynchStartTask::UNINITIALIZED) { @@ -247,16 +244,15 @@ public: error *= 1.0; } - bool success = error < .10; + bool success = error < ThreadFactoryTests::ERROR; std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl; return success; } }; - -}}}} // facebook::thrift::concurrency +const double ThreadFactoryTests::ERROR = .20; -using namespace facebook::thrift::concurrency::test; +}}}} // facebook::thrift::concurrency::test diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h index 8b2dda8b..7e74aac8 100644 --- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h +++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h @@ -1,8 +1,8 @@ #include -#include -#include -#include -#include +#include +#include +#include +#include #include #include @@ -23,6 +23,8 @@ class ThreadManagerTests { public: + static const double ERROR; + class Task: public Runnable { public: @@ -78,9 +80,9 @@ public: size_t activeCount = count; - ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount); + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); - PosixThreadFactory* threadFactory = new PosixThreadFactory(); + shared_ptr threadFactory = shared_ptr(new PosixThreadFactory()); threadFactory->priority(PosixThreadFactory::HIGHEST); @@ -88,16 +90,16 @@ public: threadManager->start(); - std::set tasks; + std::set > tasks; for(size_t ix = 0; ix < count; ix++) { - tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout)); + tasks.insert(shared_ptr(new ThreadManagerTests::Task(monitor, activeCount, timeout))); } long long time00 = Util::currentTime(); - for(std::set::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + for(std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { threadManager->add(*ix); } @@ -119,9 +121,9 @@ public: long long minTime = 9223372036854775807LL; long long maxTime = 0; - for(std::set::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { + for(std::set >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) { - ThreadManagerTests::Task* task = *ix; + shared_ptr task = *ix; long long delta = task->_endTime - task->_startTime; @@ -144,8 +146,6 @@ public: } averageTime+= delta; - - delete *ix; } averageTime /= count; @@ -160,18 +160,16 @@ public: error*= -1.0; } - bool success = error < .10; - - delete threadManager; - - delete threadFactory; + bool success = error < ERROR; std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl; return success; } }; - + +const double ThreadManagerTests::ERROR = .20; + }}}} // facebook::thrift::concurrency using namespace facebook::thrift::concurrency::test; diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h index 3c7fc0bc..fe56d312 100644 --- a/lib/cpp/src/concurrency/test/TimerManagerTests.h +++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h @@ -1,7 +1,7 @@ -#include -#include -#include -#include +#include +#include +#include +#include #include #include @@ -17,17 +17,23 @@ using namespace facebook::thrift::concurrency; class TimerManagerTests { + public: + + static const double ERROR; + class Task: public Runnable { public: - Task(Monitor& monitor, long long timeout) : - _timeout(timeout), - _startTime(Util::currentTime()), - _monitor(monitor), - _success(false), + Task(Monitor& monitor, long long timeout) : + _timeout(timeout), + _startTime(Util::currentTime()), + _monitor(monitor), + _success(false), _done(false) {} + ~Task() {std::cerr << this << std::endl;} + void run() { _endTime = Util::currentTime(); @@ -41,19 +47,20 @@ class TimerManagerTests { float error = delta / _timeout; - if(error < .10) { + if(error < ERROR) { _success = true; } - std::cout << "\t\t\tHello World" << std::endl; - _done = true; - + + std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug + {Synchronized s(_monitor); _monitor.notifyAll(); } } + long long _timeout; long long _startTime; @@ -63,27 +70,25 @@ class TimerManagerTests { bool _done; }; -public: - /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its destructor is called. */ bool test00(long long timeout=1000LL) { - TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout); + shared_ptr orphanTask = shared_ptr(new TimerManagerTests::Task(_monitor, 10 * timeout)); { TimerManager timerManager; - timerManager.threadFactory(new PosixThreadFactory()); + timerManager.threadFactory(shared_ptr(new PosixThreadFactory())); timerManager.start(); assert(timerManager.state() == TimerManager::STARTED); - TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout); + shared_ptr task = shared_ptr(new TimerManagerTests::Task(_monitor, timeout)); {Synchronized s(_monitor); @@ -98,16 +103,12 @@ public: std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl; - - delete task; } // timerManager.stop(); This is where it happens via destructor assert(!orphanTask->_done); - delete orphanTask; - return true; } @@ -115,7 +116,8 @@ public: Monitor _monitor; }; - +const double TimerManagerTests::ERROR = .20; + }}}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc index 6ac028a4..ed482b88 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.cc +++ b/lib/cpp/src/protocol/TBinaryProtocol.cc @@ -1,6 +1,8 @@ #include "protocol/TBinaryProtocol.h" using std::string; +namespace facebook { namespace thrift { namespace protocol { + uint32_t TBinaryProtocol::writeStructBegin(TTransport* out, const string& name) const { return 0; @@ -249,3 +251,4 @@ uint32_t TBinaryProtocol::readString(TTransport* in, return result + (uint32_t)size; } +}}} // facebook::thrift::protocol diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h index ea0bd257..0f0560a1 100644 --- a/lib/cpp/src/protocol/TBinaryProtocol.h +++ b/lib/cpp/src/protocol/TBinaryProtocol.h @@ -3,6 +3,8 @@ #include "protocol/TProtocol.h" +namespace facebook { namespace thrift { namespace protocol { + /** * The default binary protocol for thrift. Writes all data in a very basic * binary format, essentially just spitting out the raw bytes. @@ -124,4 +126,7 @@ class TBinaryProtocol : public TProtocol { }; +}}} // facebook::thrift::protocol + #endif + diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h index 0007406d..40beaddd 100644 --- a/lib/cpp/src/protocol/TProtocol.h +++ b/lib/cpp/src/protocol/TProtocol.h @@ -8,6 +8,10 @@ #include "transport/TTransport.h" +namespace facebook { namespace thrift { namespace protocol { + +using namespace facebook::thrift::transport; + #define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32)))) #define htonll(x) ntohll(x) @@ -260,4 +264,7 @@ class TProtocol { TProtocol() {} }; +}}} // facebook::thrift::protocol + #endif + diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index 0d275ba3..f34944b2 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -3,6 +3,10 @@ #include "TProcessor.h" +namespace facebook { namespace thrift { namespace server { + +using namespace facebook::thrift; + class TServerOptions; /** @@ -33,4 +37,6 @@ class TServerOptions { // TODO(mcslee): Fill data members in here }; +}}} // facebook::thrift::server + #endif diff --git a/lib/cpp/src/server/TSimpleServer.cc b/lib/cpp/src/server/TSimpleServer.cc index 03069ae0..7199ab9c 100644 --- a/lib/cpp/src/server/TSimpleServer.cc +++ b/lib/cpp/src/server/TSimpleServer.cc @@ -5,6 +5,8 @@ #include using namespace std; +namespace facebook { namespace thrift { namespace server { + /** * A simple single-threaded application server. Perfect for unit tests! * @@ -52,3 +54,5 @@ void TSimpleServer::run() { // TODO(mcslee): Could this be a timeout case? Or always the real thing? } + +}}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h index 9e0f79f9..a4a2d98a 100644 --- a/lib/cpp/src/server/TSimpleServer.h +++ b/lib/cpp/src/server/TSimpleServer.h @@ -4,6 +4,8 @@ #include "server/TServer.h" #include "transport/TServerTransport.h" +namespace facebook { namespace thrift { namespace server { + /** * This is the most basic simple server. It is single-threaded and runs a * continuous loop of accepting a single connection, processing requests on @@ -27,4 +29,6 @@ class TSimpleServer : public TServer { TServerTransport* serverTransport_; }; +}}} // facebook::thrift::server + #endif diff --git a/lib/cpp/src/transport/TBufferedTransport.cc b/lib/cpp/src/transport/TBufferedTransport.cc index d7ce56a5..cab02dff 100644 --- a/lib/cpp/src/transport/TBufferedTransport.cc +++ b/lib/cpp/src/transport/TBufferedTransport.cc @@ -1,6 +1,8 @@ #include "TBufferedTransport.h" using std::string; +namespace facebook { namespace thrift { namespace transport { + uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) { uint32_t need = len; @@ -58,3 +60,5 @@ void TBufferedTransport::flush() { // Flush the underlying transport transport_->flush(); } + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h index 43c31f1b..b8153fef 100644 --- a/lib/cpp/src/transport/TBufferedTransport.h +++ b/lib/cpp/src/transport/TBufferedTransport.h @@ -4,6 +4,8 @@ #include "transport/TTransport.h" #include +namespace facebook { namespace thrift { namespace transport { + /** * Buffered transport. For reads it will read more data than is requested * and will serve future data out of a local buffer. For writes, data is @@ -76,4 +78,6 @@ class TBufferedTransport : public TTransport { uint32_t wLen_; }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TChunkedTransport.cc b/lib/cpp/src/transport/TChunkedTransport.cc index f35d7472..bb42e380 100644 --- a/lib/cpp/src/transport/TChunkedTransport.cc +++ b/lib/cpp/src/transport/TChunkedTransport.cc @@ -1,6 +1,8 @@ #include "TChunkedTransport.h" using std::string; +namespace facebook { namespace thrift { namespace transport { + uint32_t TChunkedTransport::read(uint8_t* buf, uint32_t len) { uint32_t need = len; @@ -95,3 +97,5 @@ void TChunkedTransport::flush() { // Flush the underlying transport_->flush(); } + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h index 07bdbb5a..16f9e0e4 100644 --- a/lib/cpp/src/transport/TChunkedTransport.h +++ b/lib/cpp/src/transport/TChunkedTransport.h @@ -4,6 +4,8 @@ #include "transport/TTransport.h" #include +namespace facebook { namespace thrift { namespace transport { + /** * Chunked transport. All writes go into an in-memory buffer until flush is * called, at which point the transport writes the length of the entire @@ -73,4 +75,6 @@ class TChunkedTransport : public TTransport { void readChunk(); }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TNullTransport.h b/lib/cpp/src/transport/TNullTransport.h index 9562d9fc..25221481 100644 --- a/lib/cpp/src/transport/TNullTransport.h +++ b/lib/cpp/src/transport/TNullTransport.h @@ -3,6 +3,8 @@ #include "transport/TTransport.h" +namespace facebook { namespace thrift { namespace transport { + /** * The null transport is a dummy transport that doesn't actually do anything. * It's sort of an analogy to /dev/null, you can never read anything from it @@ -21,4 +23,6 @@ class TNullTransport : public TTransport { void write(const std::string& s) {} }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc index 1cf4a320..21230d99 100644 --- a/lib/cpp/src/transport/TServerSocket.cc +++ b/lib/cpp/src/transport/TServerSocket.cc @@ -5,6 +5,8 @@ #include "transport/TSocket.h" #include "transport/TServerSocket.h" +namespace facebook { namespace thrift { namespace transport { + TServerSocket::TServerSocket(int port) : port_(port), serverSocket_(0), acceptBacklog_(1024) {} @@ -88,3 +90,5 @@ void TServerSocket::close() { } serverSocket_ = 0; } + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h index ca30a034..c18a8d23 100644 --- a/lib/cpp/src/transport/TServerSocket.h +++ b/lib/cpp/src/transport/TServerSocket.h @@ -3,6 +3,8 @@ #include "transport/TServerTransport.h" +namespace facebook { namespace thrift { namespace transport { + class TSocket; /** @@ -29,4 +31,6 @@ class TServerSocket : public TServerTransport { int acceptBacklog_; }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h index 9d71539d..9bf74d1a 100644 --- a/lib/cpp/src/transport/TServerTransport.h +++ b/lib/cpp/src/transport/TServerTransport.h @@ -4,6 +4,8 @@ #include "transport/TTransport.h" #include "transport/TTransportException.h" +namespace facebook { namespace thrift { namespace transport { + /** * Server transport framework. A server needs to have some facility for * creating base transports to read/write from. @@ -58,4 +60,6 @@ class TServerTransport { }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TSocket.cc b/lib/cpp/src/transport/TSocket.cc index 2161697d..a1e0327a 100644 --- a/lib/cpp/src/transport/TSocket.cc +++ b/lib/cpp/src/transport/TSocket.cc @@ -10,6 +10,8 @@ #include "transport/TSocket.h" #include "transport/TTransportException.h" +namespace facebook { namespace thrift { namespace transport { + using namespace std; uint32_t g_socket_syscalls = 0; @@ -230,3 +232,4 @@ void TSocket::setNoDelay(bool noDelay) { perror("TSocket::setNoDelay()"); } } +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index 18abfa71..8a6fc8fc 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -6,6 +6,8 @@ #include "transport/TTransport.h" #include "transport/TServerSocket.h" +namespace facebook { namespace thrift { namespace transport { + /** * TCP Socket implementation of the TTransport interface. * @@ -97,4 +99,5 @@ class TSocket : public TTransport { int socket_; }; +}}} // facebook::thrift::transport #endif diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index fcaece77..e9366d3c 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -4,6 +4,8 @@ #include #include "transport/TTransportException.h" +namespace facebook { namespace thrift { namespace transport { + /** * Generic interface for a method of transporting data. A TTransport may be * capable of either reading or writing, but not necessarily both. @@ -94,4 +96,6 @@ class TTransport { TTransport() {} }; +}}} // facebook::thrift::transport + #endif diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h index 044e16d6..4b2be874 100644 --- a/lib/cpp/src/transport/TTransportException.h +++ b/lib/cpp/src/transport/TTransportException.h @@ -3,6 +3,8 @@ #include +namespace facebook { namespace thrift { namespace transport { + /** * Error codes for the various types of exceptions. */ @@ -60,4 +62,6 @@ class TTransportException { std::string message_; }; +}}} // facebook::thrift::transport + #endif -- 2.17.1