-lib_LTLIBRARIES = libconcurrency.la \
- libthrift.la
+#lib_LTLIBRARIES = libthrift.la
+
+lib_LIBRARIES = libthrift.a
+
+common_cxxflags = -Isrc $(BOOST_CPPFLAGS)
+common_ldflags = $(BOOST_LDFLAGS)
# Define the source file for the module
-libconcurrency_la_SOURCES = src/concurrency/Monitor.cc \
- src/concurrency/PosixThreadFactory.cc \
- src/concurrency/ThreadManager.cc \
- src/concurrency/TimerManager.cc
+libthrift_sources = src/concurrency/Monitor.cc \
+ src/concurrency/PosixThreadFactory.cc \
+ src/concurrency/ThreadManager.cc \
+ src/concurrency/TimerManager.cc \
+ src/protocol/TBinaryProtocol.cc \
+ src/transport/TBufferedTransport.cc \
+ src/transport/TChunkedTransport.cc \
+ src/transport/TSocket.cc \
+ src/transport/TServerSocket.cc \
+ src/server/TSimpleServer.cc \
+ src/server/TThreadPoolServer.cc
-libconcurrency_inst_headers = src/concurrency/Exception.h \
- src/concurrency/Monitor.h \
- src/concurrency/PosixThreadFactory.h \
- src/concurrency/Thread.h \
- src/concurrency/ThreadManager.h \
- src/concurrency/TimerManager.h
+libthrift_a_SOURCES = $(libthrift_sources)
+#libthrift_la_SOURCES = $(libthrift_sources)
-libthrift_la_SOURCES = src/protocol/TBinaryProtocol.cc \
- src/transport/TBufferedTransport.cc \
- src/transport/TChunkedTransport.cc \
- src/transport/TSocket.cc \
- src/transport/TServerSocket.cc \
- src/server/TSimpleServer.cc
+libthrift_cxxflags = $(common_cxxflags)
+libthrift_ldflags = $(common_ldflags)
-libthrift_la_CXXFLAGS = -Isrc
+libthrift_la_CXXFLAGS = $(libthrift_cxxflags)
+libthrift_a_CXXFLAGS = $(libthrift_cxxflags)
-libthrift_inst_headers = src/protocol/TBinaryProtocol.h \
+libthrift_inst_headers = src/concurrency/Exception.h \
+ src/concurrency/Monitor.h \
+ src/concurrency/PosixThreadFactory.h \
+ src/concurrency/Thread.h \
+ src/concurrency/ThreadManager.h \
+ src/concurrency/TimerManager.h \
+ src/protocol/TBinaryProtocol.h \
src/protocol/TProtocol.h \
src/transport/TBufferedTransport.h \
src/transport/TChunkedTransport.h \
src/transport/TSocket.h \
src/transport/TTransport.h \
src/transport/TTransport.h \
- src/transport/TTransportException.h
+ src/transport/TTransportException.h \
+ src/server/TSimpleServer.h \
+ src/server/TThreadPoolServer.h
bin_PROGRAMS = concurrency_test
src/concurrency/test/ThreadManagerTests.h \
src/concurrency/test/TimerManagerTests.h
-concurrency_test_LDADD = libconcurrency.la
+concurrency_test_LDADD = libthrift.a
-concurrency_test_CXXFLAGS = -Isrc/concurrency
+concurrency_test_CXXFLAGS = $(common_cxxflags)
+concurrency_test_LDFLAGS = $(common_ldflags)
--- /dev/null
+dnl @synopsis AX_BOOST([MINIMUM-VERSION])
+dnl
+dnl Test for the Boost C++ libraries of a particular version (or newer)
+dnl
+dnl If no path to the installed boost library is given the macro
+dnl searchs under /usr, /usr/local, and /opt, and evaluates the
+dnl $BOOST_ROOT environment variable. Further documentation is
+dnl available at <http://randspringer.de/boost/index.html>.
+dnl
+dnl This macro calls:
+dnl
+dnl AC_SUBST(BOOST_CPPFLAGS) / AC_SUBST(BOOST_LDFLAGS)
+dnl
+dnl And sets:
+dnl
+dnl HAVE_BOOST
+dnl
+dnl @category InstalledPackages
+dnl @category Cxx
+dnl @author Thomas Porschberg <thomas@randspringer.de>
+dnl @version 2006-06-15
+dnl @license AllPermissive
+
+AC_DEFUN([AX_BOOST_BASE],
+[
+AC_ARG_WITH([boost],
+ AS_HELP_STRING([--with-boost@<:@=DIR@:>@], [use boost (default is No) - it is possible to specify the root directory for boost (optional)]),
+ [
+ if test "$withval" = "no"; then
+ want_boost="no"
+ elif test "$withval" = "yes"; then
+ want_boost="yes"
+ ac_boost_path=""
+ else
+ want_boost="yes"
+ ac_boost_path="$withval"
+ fi
+ ],
+ [want_boost="no"])
+
+if test "x$want_boost" = "xyes"; then
+ boost_lib_version_req=ifelse([$1], ,1.20.0,$1)
+ boost_lib_version_req_shorten=`expr $boost_lib_version_req : '\([[0-9]]*\.[[0-9]]*\)'`
+ boost_lib_version_req_major=`expr $boost_lib_version_req : '\([[0-9]]*\)'`
+ boost_lib_version_req_minor=`expr $boost_lib_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
+ boost_lib_version_req_sub_minor=`expr $boost_lib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
+ if test "x$boost_lib_version_req_sub_minor" = "x" ; then
+ boost_lib_version_req_sub_minor="0"
+ fi
+ WANT_BOOST_VERSION=`expr $boost_lib_version_req_major \* 100000 \+ $boost_lib_version_req_minor \* 100 \+ $boost_lib_version_req_sub_minor`
+ AC_MSG_CHECKING(for boostlib >= $boost_lib_version_req)
+ succeeded=no
+
+ dnl first we check the system location for boost libraries
+ dnl this location ist chosen if boost libraries are installed with the --layout=system option
+ dnl or if you install boost with RPM
+ if test "$ac_boost_path" != ""; then
+ BOOST_LDFLAGS="-L$ac_boost_path/lib"
+ BOOST_CPPFLAGS="-I$ac_boost_path/include"
+ else
+ for ac_boost_path_tmp in /usr /usr/local /opt ; do
+ if test -d "$ac_boost_path_tmp/include/boost" && test -r "$ac_boost_path_tmp/include/boost"; then
+ BOOST_LDFLAGS="-L$ac_boost_path_tmp/lib"
+ BOOST_CPPFLAGS="-I$ac_boost_path_tmp/include"
+ break;
+ fi
+ done
+ fi
+
+ CPPFLAGS_SAVED="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+ export CPPFLAGS
+
+ LDFLAGS_SAVED="$LDFLAGS"
+ LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+ export LDFLAGS
+
+ AC_LANG_PUSH(C++)
+ AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+ @%:@include <boost/version.hpp>
+ ]], [[
+ #if BOOST_VERSION >= $WANT_BOOST_VERSION
+ // Everything is okay
+ #else
+ # error Boost version is too old
+ #endif
+ ]])],[
+ AC_MSG_RESULT(yes)
+ succeeded=yes
+ found_system=yes
+ ],[
+ ])
+ AC_LANG_POP([C++])
+
+
+
+ dnl if we found no boost with system layout we search for boost libraries
+ dnl built and installed without the --layout=system option or for a staged(not installed) version
+ if test "x$succeeded" != "xyes"; then
+ _version=0
+ if test "$ac_boost_path" != ""; then
+ BOOST_LDFLAGS="-L$ac_boost_path/lib"
+ if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
+ for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
+ _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
+ V_CHECK=`expr $_version_tmp \> $_version`
+ if test "$V_CHECK" = "1" ; then
+ _version=$_version_tmp
+ fi
+ VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
+ BOOST_CPPFLAGS="-I$ac_boost_path/include/boost-$VERSION_UNDERSCORE"
+ done
+ fi
+ else
+ for ac_boost_path in /usr /usr/local /opt ; do
+ if test -d "$ac_boost_path" && test -r "$ac_boost_path"; then
+ for i in `ls -d $ac_boost_path/include/boost-* 2>/dev/null`; do
+ _version_tmp=`echo $i | sed "s#$ac_boost_path##" | sed 's/\/include\/boost-//' | sed 's/_/./'`
+ V_CHECK=`expr $_version_tmp \> $_version`
+ if test "$V_CHECK" = "1" ; then
+ _version=$_version_tmp
+ best_path=$ac_boost_path
+ fi
+ done
+ fi
+ done
+
+ VERSION_UNDERSCORE=`echo $_version | sed 's/\./_/'`
+ BOOST_CPPFLAGS="-I$best_path/include/boost-$VERSION_UNDERSCORE"
+ BOOST_LDFLAGS="-L$best_path/lib"
+
+ if test "x$BOOST_ROOT" != "x"; then
+ if test -d "$BOOST_ROOT" && test -r "$BOOST_ROOT" && test -d "$BOOST_ROOT/stage/lib" && test -r "$BOOST_ROOT/stage/lib"; then
+ version_dir=`expr //$BOOST_ROOT : '.*/\(.*\)'`
+ stage_version=`echo $version_dir | sed 's/boost_//' | sed 's/_/./g'`
+ stage_version_shorten=`expr $stage_version : '\([[0-9]]*\.[[0-9]]*\)'`
+ V_CHECK=`expr $stage_version_shorten \>\= $_version`
+ if test "$V_CHECK" = "1" ; then
+ AC_MSG_NOTICE(We will use a staged boost library from $BOOST_ROOT)
+ BOOST_CPPFLAGS="-I$BOOST_ROOT"
+ BOOST_LDFLAGS="-L$BOOST_ROOT/stage/lib"
+ fi
+ fi
+ fi
+ fi
+
+ CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+ export CPPFLAGS
+ LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+ export LDFLAGS
+
+ AC_LANG_PUSH(C++)
+ AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
+ @%:@include <boost/version.hpp>
+ ]], [[
+ #if BOOST_VERSION >= $WANT_BOOST_VERSION
+ // Everything is okay
+ #else
+ # error Boost version is too old
+ #endif
+ ]])],[
+ AC_MSG_RESULT(yes)
+ succeeded=yes
+ found_system=yes
+ ],[
+ ])
+ AC_LANG_POP([C++])
+ fi
+
+ if test "$succeeded" != "yes" ; then
+ if test "$_version" = "0" ; then
+ AC_MSG_ERROR([[We could not detect the boost libraries (version $boost_lib_version_req_shorten or higher). If you have a staged boost library (still not installed) please specify \$BOOST_ROOT in your environment and do not give a PATH to --with-boost option. If you are sure you have boost installed, then check your version number looking in <boost/version.hpp>. See http://randspringer.de/boost for more documentation.]])
+ else
+ AC_MSG_NOTICE([Your boost libraries seems to old (version $_version).])
+ fi
+ else
+ AC_SUBST(BOOST_CPPFLAGS)
+ AC_SUBST(BOOST_LDFLAGS)
+ AC_DEFINE(HAVE_BOOST,,[define if the Boost library is available])
+ fi
+
+ CPPFLAGS="$CPPFLAGS_SAVED"
+ LDFLAGS="$LDFLAGS_SAVED"
+fi
+
+])
autoscan
autoheader
-aclocal
+aclocal -I ./aclocal
libtoolize --automake
touch NEWS README AUTHORS ChangeLog
autoconf
AC_CHECK_HEADERS([unistd.h])
+AX_BOOST_BASE([1.33.1])
+
AC_CHECK_LIB(pthread, pthread_create)
AC_CHECK_LIB(rt, sched_get_priority_min)
#include <string>
#include "transport/TTransport.h"
+namespace facebook { namespace thrift {
+
+using namespace facebook::thrift::transport;
+
/**
* A processor is a generic object that acts upon two streams of data, one
* an input and the other an output. The definition of this object is loose,
TProcessor() {}
};
+}} // facebook::thrift
+
#endif
public:
Impl() :
- mutexInitialized(false) {
+ mutexInitialized(false),
+ condInitialized(false) {
- /* XXX
- Need to fix this to handle failures without leaking. */
+ try {
- assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+ assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
- mutexInitialized = true;
+ mutexInitialized = true;
- assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
- }
-
- ~Impl() {
-
- if(mutexInitialized) {
+ assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
- mutexInitialized = false;
+ condInitialized = true;
- assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
- }
-
- if(condInitialized) {
-
- condInitialized = false;
-
- assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ } catch(...) {
+ cleanup();
}
}
+ ~Impl() {cleanup();}
+
void lock() const {pthread_mutex_lock(&_pthread_mutex);}
void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
private:
+ void cleanup() {
+
+ if(mutexInitialized) {
+
+ mutexInitialized = false;
+
+ assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ }
+
+ if(condInitialized) {
+
+ condInitialized = false;
+
+ assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ }
+ }
+
mutable pthread_mutex_t _pthread_mutex;
mutable bool mutexInitialized;
#include <assert.h>
#include <pthread.h>
+#include <iostream>
+
+#include <boost/weak_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** The POSIX thread class.
@author marc
int _stackSize;
+ weak_ptr<PthreadThread> _self;
+
public:
- PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
+ PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
_pthread(0),
_state(uninitialized),
_policy(policy),
_priority(priority),
- _stackSize(stackSize) {
+ _stackSize(stackSize) {
this->Thread::runnable(runnable);
}
+ ~PthreadThread() {
+ }
+
void start() {
if(_state != uninitialized) {
// Set thread priority
- // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+ assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+
+ shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
+
+ *selfRef = _self.lock();
- assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0);
+ assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
}
void join() {
}
}
- Runnable* runnable() const {return Thread::runnable();}
+ shared_ptr<Runnable> runnable() const {return Thread::runnable();}
- void runnable(Runnable* value) {Thread::runnable(value);}
+ void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);}
+ void weakRef(shared_ptr<PthreadThread> self) {
+ assert(self.get() == this);
+ _self = weak_ptr<PthreadThread>(self);
+ }
};
void* PthreadThread::threadMain(void* arg) {
// XXX need a lock here when testing thread state
- PthreadThread* thread = (PthreadThread*)arg;
-
+ shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
+
+ delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
+
+ if(thread == NULL) {
+ return (void*)0;
+ }
+
if(thread->_state != starting) {
return (void*)0;
}
@param runnable A runnable object */
- Thread* newThread(Runnable* runnable) const {
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
}
int stackSize() const { return _stackSize;}
/** Sets priority.
XXX
- Need to handle incremental priorities properl. */
+ Need to handle incremental priorities properly. */
void priority(PRIORITY value) { _priority = value;}
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
-Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);}
int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
#include "Thread.h"
+#include <boost/shared_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** A thread factory to create posix threads
@author marc
// From ThreadFactory;
- Thread* newThread(Runnable* runnable) const;
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const;
/** Sets stack size for created threads
class Impl;
- Impl* _impl;
+ shared_ptr<Impl> _impl;
};
}}} // facebook::thrift::concurrency
#if !defined(_concurrency_Thread_h_)
#define _concurrency_Thread_h_ 1
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
class Thread;
/** Minimal runnable class. More or less analogous to java.lang.Runnable.
virtual void run() = 0;
- virtual Thread* thread() {return _thread;}
+ /** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on thet thread object */
- private:
+ virtual shared_ptr<Thread> thread() {return _thread.lock();}
/** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */
- friend class Thread;
+ virtual void thread(shared_ptr<Thread> value) {_thread = value;}
- virtual void thread(Thread* value) {_thread = value;}
+ private:
- Thread* _thread;
+ weak_ptr<Thread> _thread;
};
/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread
/** Gets the runnable object this thread is hosting */
- virtual Runnable* runnable() const {return _runnable;}
+ virtual shared_ptr<Runnable> runnable() const {return _runnable;}
protected:
- virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);}
+ virtual void runnable(shared_ptr<Runnable> value) {_runnable = value;}
private:
-
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
};
/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
virtual ~ThreadFactory() {}
- virtual Thread* newThread(Runnable* runnable) const = 0;
+ virtual shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const = 0;
};
}}} // facebook::thrift::concurrency
#include "Exception.h"
#include "Monitor.h"
+#include <boost/shared_ptr.hpp>
+
#include <assert.h>
#include <queue>
#include <set>
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** ThreadManager class
public:
- Impl() : _state(ThreadManager::UNINITIALIZED) {}
+ Impl() :
+ _workerCount(0),
+ _workerMaxCount(0),
+ _idleCount(0),
+ _state(ThreadManager::UNINITIALIZED)
+ {}
- ~Impl() {stop();}
+ ~Impl() {
+ stop();
+ }
void start();
return _state;
};
- const ThreadFactory* threadFactory() const {
+ shared_ptr<ThreadFactory> threadFactory() const {
Synchronized s(_monitor);
return _threadFactory;
}
- void threadFactory(const ThreadFactory* value) {
+ void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(_monitor);
return _tasks.size() + _workerCount - _idleCount;
}
- void add(Runnable* value);
+ void add(shared_ptr<Runnable> value);
- void remove(Runnable* task);
+ void remove(shared_ptr<Runnable> task);
private:
ThreadManager::STATE _state;
- const ThreadFactory* _threadFactory;
+ shared_ptr<ThreadFactory> _threadFactory;
friend class ThreadManager::Task;
- std::queue<Task*> _tasks;
+ std::queue<shared_ptr<Task> > _tasks;
Monitor _monitor;
friend class ThreadManager::Worker;
- std::set<Thread*> _workers;
+ std::set<shared_ptr<Thread> > _workers;
- std::set<Thread*> _deadWorkers;
+ std::set<shared_ptr<Thread> > _deadWorkers;
};
class ThreadManager::Task : public Runnable {
COMPLETE
};
- Task(Runnable* runnable) :
+ Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
_state(WAITING)
{}
private:
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
friend class ThreadManager::Worker;
while(active) {
- ThreadManager::Task* task = NULL;
+ shared_ptr<ThreadManager::Task> task;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
// XXX need to log this
}
-
- delete task;
-
- task = NULL;
}
}
}
void ThreadManager::Impl::addWorker(size_t value) {
- std::set<Thread*> newThreads;
+ std::set<shared_ptr<Thread> > newThreads;
for(size_t ix = 0; ix < value; ix++) {
class ThreadManager::Worker;
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+ shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(_threadFactory->newThread(worker));
}
_workers.insert(newThreads.begin(), newThreads.end());
}
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+ shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->_state = ThreadManager::Worker::STARTING;
_state = ThreadManager::STOPPING;
}
- // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+ // XXX
+ // should be able to block here for transition to STOPPED since we're now using shared_ptrs
}
void ThreadManager::Impl::removeWorker(size_t value) {
- std::set<Thread*> removedThreads;
+ std::set<shared_ptr<Thread> > removedThreads;
{Synchronized s(_monitor);
_workerMonitor.wait();
}
- for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
_workers.erase(*ix);
- delete (*ix)->runnable();
-
- delete (*ix);
}
_deadWorkers.clear();
}
}
-void ThreadManager::Impl::add(Runnable* value) {
+void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
Synchronized s(_monitor);
throw IllegalStateException();
}
- _tasks.push(new ThreadManager::Task(value));
+ _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
/* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
task in time. */
}
}
-void ThreadManager::Impl::remove(Runnable* task) {
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Synchronized s(_monitor);
};
-ThreadManager* ThreadManager::newThreadManager() {
- return new ThreadManager::Impl();
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+ return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) {
- return new SimpleThreadManager(count);
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
}
}}} // facebook::thrift::concurrency
#if !defined(_concurrency_ThreadManager_h_)
#define _concurrency_ThreadManager_h_ 1
+#include <boost/shared_ptr.hpp>
+
#include <sys/types.h>
#include "Thread.h"
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** Thread Pool Manager and related classes
@author marc
virtual const STATE state() const = 0;
- virtual const ThreadFactory* threadFactory() const = 0;
+ virtual shared_ptr<ThreadFactory> threadFactory() const = 0;
- virtual void threadFactory(const ThreadFactory* value) = 0;
+ virtual void threadFactory(shared_ptr<ThreadFactory> value) = 0;
virtual void addWorker(size_t value=1) = 0;
virtual size_t totalTaskCount() const = 0;
- /** Adds a task to be execued at some time in the future by a worker thread. */
+ /** Adds a task to be execued at some time in the future by a worker thread.
+
+ @param value The task to run */
+
- virtual void add(Runnable* value) = 0;
+ virtual void add(shared_ptr<Runnable>value) = 0;
/** Removes a pending task */
- virtual void remove(Runnable* task) = 0;
+ virtual void remove(shared_ptr<Runnable> task) = 0;
- static ThreadManager* newThreadManager();
+ static shared_ptr<ThreadManager> newThreadManager();
/** Creates a simple thread manager the uses count number of worker threads */
- static ThreadManager* newSimpleThreadManager(size_t count=4);
+ static shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
class Task;
@author marc
@version $Id:$ */
-typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
+typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
class TimerManager::Task : public Runnable {
COMPLETE
};
- Task(Runnable* runnable) :
+ Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
_state(WAITING)
{}
- ~Task() {};
+ ~Task() {
+ std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug
+};
void run() {
if(_state == EXECUTING) {
private:
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
class TimerManager::Dispatcher;
_manager(manager) {
}
- ~Dispatcher() {}
+ ~Dispatcher() {
+ std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug
+ }
/** Dispatcher entry point
do {
- std::set<TimerManager::Task*> expiredTasks;
+ std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{Synchronized s(_manager->_monitor);
for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
- TimerManager::Task* task = ix->second;
+ shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task);
}
}
- for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
(*ix)->run();
-
- delete *ix;
}
} while(_manager->_state == TimerManager::STARTED);
TimerManager::TimerManager() :
_taskCount(0),
_state(TimerManager::UNINITIALIZED),
- _dispatcher(new Dispatcher(this)) {
+ _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this))) {
}
/* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
stop already takes care of reentrancy. */
+
+ std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
if(_state != STOPPED) {
stop();
} catch(...) {
+ std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
+ throw;
// uhoh
for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
- delete ix->second;
-
_taskMap.erase(ix);
}
- delete _dispatcher;
+ // Remove dispatcher's reference to us.
+
+ _dispatcher->_manager = NULL;
}
}
-const ThreadFactory* TimerManager::threadFactory() const {
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Synchronized s(_monitor);
return _threadFactory;
}
-void TimerManager::threadFactory(const ThreadFactory* value) {
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Synchronized s(_monitor);
return _taskCount;
}
-void TimerManager::add(Runnable* task, long long timeout) {
+void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
long long now = Util::currentTime();
_taskCount++;
- _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
+ _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
/* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
kick the dispatcher so it can update its timeout */
}
}
-void TimerManager::add(Runnable* task, const struct timespec& value) {
+void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
long long expiration;
}
-void TimerManager::remove(Runnable* task) {
+void TimerManager::remove(shared_ptr<Runnable> task) {
{Synchronized s(_monitor);
if(_state != TimerManager::STARTED) {
#include "Monitor.h"
#include "Thread.h"
+#include <boost/shared_ptr.hpp>
+
#include <map>
#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** Timer Manager
This class dispatches timer tasks when they fall due.
virtual ~TimerManager();
- virtual const ThreadFactory* threadFactory() const;
+ virtual shared_ptr<const ThreadFactory> threadFactory() const;
- virtual void threadFactory(const ThreadFactory* value);
+ virtual void threadFactory(shared_ptr<const ThreadFactory> value);
/** Starts the timer manager service
@param task The task to execute
@param timeout Time in milliseconds to delay before executing task */
- virtual void add(Runnable* task, long long timeout);
+ virtual void add(shared_ptr<Runnable> task, long long timeout);
/** Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute
@param timeout Absolute time in the future to execute task. */
- virtual void add(Runnable* task, const struct timespec& timeout);
+ virtual void add(shared_ptr<Runnable> task, const struct timespec& timeout);
/** Removes a pending task
@throws UncancellableTaskException Specified task is already being executed or has completed execution. */
- virtual void remove(Runnable* task);
+ virtual void remove(shared_ptr<Runnable> task);
enum STATE {
UNINITIALIZED,
private:
- const ThreadFactory* _threadFactory;
+ shared_ptr<const ThreadFactory> _threadFactory;
class Task;
friend class Task;
- std::multimap<long long, Task*> _taskMap;
+ std::multimap<long long, shared_ptr<Task> > _taskMap;
size_t _taskCount;
friend class Dispatcher;
- Dispatcher* _dispatcher;
+ shared_ptr<Dispatcher> _dispatcher;
- Thread* _dispatcherThread;
+ shared_ptr<Thread> _dispatcherThread;
};
#include <assert.h>
#include <stddef.h>
-#include <sys/time.h>
+#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
struct timespec now;
- assert(clock_gettime(&now, NULL) == 0);
+ assert(clock_gettime(CLOCK_REALTIME, &now) == 0);
- return = (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
+ return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
#elif defined(HAVE_GETTIMEOFDAY)
assert(gettimeofday(&now, NULL) == 0);
return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
+
#endif // defined(HAVE_GETTIMEDAY)
}
};
-
}}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Util_h_)
-#include <Thread.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <iostream>
class ThreadFactoryTests {
- class Task: public Runnable {
+public:
+
+ static const double ERROR;
+
+ class Task: public Runnable {
public:
+
Task() {}
void run() {
}
};
-public:
-
/** Hello world test */
bool helloWorldTest() {
PosixThreadFactory threadFactory = PosixThreadFactory();
- Task* task = new ThreadFactoryTests::Task();
+ shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
- Thread* thread = threadFactory.newThread(task);
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
thread->start();
thread->join();
- delete thread;
-
- delete task;
-
std::cout << "\t\t\tSuccess!" << std::endl;
return true;
PosixThreadFactory threadFactory = PosixThreadFactory();
- std::set<Thread*> threads;
+ std::set<shared_ptr<Thread> > threads;
for(int ix = 0; ix < count; ix++) {
- threads.insert(threadFactory.newThread(new ReapNTask(*monitor, *activeCount)));
+ threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
}
- for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
(*thread)->start();
}
}
}
- for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
- delete (*thread)->runnable();
-
- delete *thread;
+ threads.erase(*thread);
}
std::cout << "\t\t\tSuccess!" << std::endl;
SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
- SynchStartTask* task = new SynchStartTask(monitor, state);
+ shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
PosixThreadFactory threadFactory = PosixThreadFactory();
- Thread* thread = threadFactory.newThread(task);
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
if(state == SynchStartTask::UNINITIALIZED) {
error *= 1.0;
}
- bool success = error < .10;
+ bool success = error < ThreadFactoryTests::ERROR;
std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
return success;
}
};
-
-}}}} // facebook::thrift::concurrency
+const double ThreadFactoryTests::ERROR = .20;
-using namespace facebook::thrift::concurrency::test;
+}}}} // facebook::thrift::concurrency::test
#include <config.h>
-#include <ThreadManager.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <set>
public:
+ static const double ERROR;
+
class Task: public Runnable {
public:
size_t activeCount = count;
- ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
- PosixThreadFactory* threadFactory = new PosixThreadFactory();
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadFactory->priority(PosixThreadFactory::HIGHEST);
threadManager->start();
- std::set<ThreadManagerTests::Task*> tasks;
+ std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for(size_t ix = 0; ix < count; ix++) {
- tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout));
+ tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
}
long long time00 = Util::currentTime();
- for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
threadManager->add(*ix);
}
long long minTime = 9223372036854775807LL;
long long maxTime = 0;
- for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
- ThreadManagerTests::Task* task = *ix;
+ shared_ptr<ThreadManagerTests::Task> task = *ix;
long long delta = task->_endTime - task->_startTime;
}
averageTime+= delta;
-
- delete *ix;
}
averageTime /= count;
error*= -1.0;
}
- bool success = error < .10;
-
- delete threadManager;
-
- delete threadFactory;
+ bool success = error < ERROR;
std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
return success;
}
};
-
+
+const double ThreadManagerTests::ERROR = .20;
+
}}}} // facebook::thrift::concurrency
using namespace facebook::thrift::concurrency::test;
-#include <TimerManager.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/TimerManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <iostream>
class TimerManagerTests {
+ public:
+
+ static const double ERROR;
+
class Task: public Runnable {
public:
- Task(Monitor& monitor, long long timeout) :
- _timeout(timeout),
- _startTime(Util::currentTime()),
- _monitor(monitor),
- _success(false),
+ Task(Monitor& monitor, long long timeout) :
+ _timeout(timeout),
+ _startTime(Util::currentTime()),
+ _monitor(monitor),
+ _success(false),
_done(false) {}
+ ~Task() {std::cerr << this << std::endl;}
+
void run() {
_endTime = Util::currentTime();
float error = delta / _timeout;
- if(error < .10) {
+ if(error < ERROR) {
_success = true;
}
- std::cout << "\t\t\tHello World" << std::endl;
-
_done = true;
-
+
+ std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
{Synchronized s(_monitor);
_monitor.notifyAll();
}
}
+
long long _timeout;
long long _startTime;
bool _done;
};
-public:
-
/** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that
the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its
destructor is called. */
bool test00(long long timeout=1000LL) {
- TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout);
+ shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
{
TimerManager timerManager;
- timerManager.threadFactory(new PosixThreadFactory());
+ timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
- TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout);
+ shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
{Synchronized s(_monitor);
std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
-
- delete task;
}
// timerManager.stop(); This is where it happens via destructor
assert(!orphanTask->_done);
- delete orphanTask;
-
return true;
}
Monitor _monitor;
};
-
+const double TimerManagerTests::ERROR = .20;
+
}}}} // facebook::thrift::concurrency
#include "protocol/TBinaryProtocol.h"
using std::string;
+namespace facebook { namespace thrift { namespace protocol {
+
uint32_t TBinaryProtocol::writeStructBegin(TTransport* out,
const string& name) const {
return 0;
return result + (uint32_t)size;
}
+}}} // facebook::thrift::protocol
#include "protocol/TProtocol.h"
+namespace facebook { namespace thrift { namespace protocol {
+
/**
* The default binary protocol for thrift. Writes all data in a very basic
* binary format, essentially just spitting out the raw bytes.
};
+}}} // facebook::thrift::protocol
+
#endif
+
#include "transport/TTransport.h"
+namespace facebook { namespace thrift { namespace protocol {
+
+using namespace facebook::thrift::transport;
+
#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32))))
#define htonll(x) ntohll(x)
TProtocol() {}
};
+}}} // facebook::thrift::protocol
+
#endif
+
#include "TProcessor.h"
+namespace facebook { namespace thrift { namespace server {
+
+using namespace facebook::thrift;
+
class TServerOptions;
/**
// TODO(mcslee): Fill data members in here
};
+}}} // facebook::thrift::server
+
#endif
#include <iostream>
using namespace std;
+namespace facebook { namespace thrift { namespace server {
+
/**
* A simple single-threaded application server. Perfect for unit tests!
*
// TODO(mcslee): Could this be a timeout case? Or always the real thing?
}
+
+}}} // facebook::thrift::server
#include "server/TServer.h"
#include "transport/TServerTransport.h"
+namespace facebook { namespace thrift { namespace server {
+
/**
* This is the most basic simple server. It is single-threaded and runs a
* continuous loop of accepting a single connection, processing requests on
TServerTransport* serverTransport_;
};
+}}} // facebook::thrift::server
+
#endif
#include "TBufferedTransport.h"
using std::string;
+namespace facebook { namespace thrift { namespace transport {
+
uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
uint32_t need = len;
// Flush the underlying transport
transport_->flush();
}
+
+}}} // facebook::thrift::transport
#include "transport/TTransport.h"
#include <string>
+namespace facebook { namespace thrift { namespace transport {
+
/**
* Buffered transport. For reads it will read more data than is requested
* and will serve future data out of a local buffer. For writes, data is
uint32_t wLen_;
};
+}}} // facebook::thrift::transport
+
#endif
#include "TChunkedTransport.h"
using std::string;
+namespace facebook { namespace thrift { namespace transport {
+
uint32_t TChunkedTransport::read(uint8_t* buf, uint32_t len) {
uint32_t need = len;
// Flush the underlying
transport_->flush();
}
+
+}}} // facebook::thrift::transport
#include "transport/TTransport.h"
#include <string>
+namespace facebook { namespace thrift { namespace transport {
+
/**
* Chunked transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
void readChunk();
};
+}}} // facebook::thrift::transport
+
#endif
#include "transport/TTransport.h"
+namespace facebook { namespace thrift { namespace transport {
+
/**
* The null transport is a dummy transport that doesn't actually do anything.
* It's sort of an analogy to /dev/null, you can never read anything from it
void write(const std::string& s) {}
};
+}}} // facebook::thrift::transport
+
#endif
#include "transport/TSocket.h"
#include "transport/TServerSocket.h"
+namespace facebook { namespace thrift { namespace transport {
+
TServerSocket::TServerSocket(int port) :
port_(port), serverSocket_(0), acceptBacklog_(1024) {}
}
serverSocket_ = 0;
}
+
+}}} // facebook::thrift::transport
#include "transport/TServerTransport.h"
+namespace facebook { namespace thrift { namespace transport {
+
class TSocket;
/**
int acceptBacklog_;
};
+}}} // facebook::thrift::transport
+
#endif
#include "transport/TTransport.h"
#include "transport/TTransportException.h"
+namespace facebook { namespace thrift { namespace transport {
+
/**
* Server transport framework. A server needs to have some facility for
* creating base transports to read/write from.
};
+}}} // facebook::thrift::transport
+
#endif
#include "transport/TSocket.h"
#include "transport/TTransportException.h"
+namespace facebook { namespace thrift { namespace transport {
+
using namespace std;
uint32_t g_socket_syscalls = 0;
perror("TSocket::setNoDelay()");
}
}
+}}} // facebook::thrift::transport
#include "transport/TTransport.h"
#include "transport/TServerSocket.h"
+namespace facebook { namespace thrift { namespace transport {
+
/**
* TCP Socket implementation of the TTransport interface.
*
int socket_;
};
+}}} // facebook::thrift::transport
#endif
#include <string>
#include "transport/TTransportException.h"
+namespace facebook { namespace thrift { namespace transport {
+
/**
* Generic interface for a method of transporting data. A TTransport may be
* capable of either reading or writing, but not necessarily both.
TTransport() {}
};
+}}} // facebook::thrift::transport
+
#endif
#include <string>
+namespace facebook { namespace thrift { namespace transport {
+
/**
* Error codes for the various types of exceptions.
*/
std::string message_;
};
+}}} // facebook::thrift::transport
+
#endif