Converted concurrency classes to use boost::shared_ptr and boost::weak_ptr.
Wrapped all thrift code in the facebook::thrift namespace.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664735 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index ca2bbb5..a5b8f05 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -2,12 +2,20 @@
#include "Exception.h"
#include "Monitor.h"
+#include <boost/shared_ptr.hpp>
+
#include <assert.h>
#include <queue>
#include <set>
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** ThreadManager class
@@ -21,9 +29,16 @@
public:
- Impl() : _state(ThreadManager::UNINITIALIZED) {}
+ Impl() :
+ _workerCount(0),
+ _workerMaxCount(0),
+ _idleCount(0),
+ _state(ThreadManager::UNINITIALIZED)
+ {}
- ~Impl() {stop();}
+ ~Impl() {
+ stop();
+ }
void start();
@@ -33,14 +48,14 @@
return _state;
};
- const ThreadFactory* threadFactory() const {
+ shared_ptr<ThreadFactory> threadFactory() const {
Synchronized s(_monitor);
return _threadFactory;
}
- void threadFactory(const ThreadFactory* value) {
+ void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(_monitor);
@@ -74,9 +89,9 @@
return _tasks.size() + _workerCount - _idleCount;
}
- void add(Runnable* value);
+ void add(shared_ptr<Runnable> value);
- void remove(Runnable* task);
+ void remove(shared_ptr<Runnable> task);
private:
@@ -88,11 +103,11 @@
ThreadManager::STATE _state;
- const ThreadFactory* _threadFactory;
+ shared_ptr<ThreadFactory> _threadFactory;
friend class ThreadManager::Task;
- std::queue<Task*> _tasks;
+ std::queue<shared_ptr<Task> > _tasks;
Monitor _monitor;
@@ -100,9 +115,9 @@
friend class ThreadManager::Worker;
- std::set<Thread*> _workers;
+ std::set<shared_ptr<Thread> > _workers;
- std::set<Thread*> _deadWorkers;
+ std::set<shared_ptr<Thread> > _deadWorkers;
};
class ThreadManager::Task : public Runnable {
@@ -115,7 +130,7 @@
COMPLETE
};
- Task(Runnable* runnable) :
+ Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
_state(WAITING)
{}
@@ -131,7 +146,7 @@
private:
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
friend class ThreadManager::Worker;
@@ -199,7 +214,7 @@
while(active) {
- ThreadManager::Task* task = NULL;
+ shared_ptr<ThreadManager::Task> task;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
@@ -260,10 +275,6 @@
// XXX need to log this
}
-
- delete task;
-
- task = NULL;
}
}
}
@@ -294,13 +305,13 @@
void ThreadManager::Impl::addWorker(size_t value) {
- std::set<Thread*> newThreads;
+ std::set<shared_ptr<Thread> > newThreads;
for(size_t ix = 0; ix < value; ix++) {
class ThreadManager::Worker;
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+ shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(_threadFactory->newThread(worker));
}
@@ -312,9 +323,9 @@
_workers.insert(newThreads.begin(), newThreads.end());
}
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+ shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->_state = ThreadManager::Worker::STARTING;
@@ -378,13 +389,14 @@
_state = ThreadManager::STOPPING;
}
- // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+ // XXX: We should be able to block here for the STOPPING -> STOPPED transition
+ // now that we're using shared_ptrs.
}
void ThreadManager::Impl::removeWorker(size_t value) {
- std::set<Thread*> removedThreads;
+ std::set<shared_ptr<Thread> > removedThreads;
{Synchronized s(_monitor);
@@ -413,20 +425,17 @@
_workerMonitor.wait();
}
- for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
_workers.erase(*ix);
- delete (*ix)->runnable();
-
- delete (*ix);
}
_deadWorkers.clear();
}
}
-void ThreadManager::Impl::add(Runnable* value) {
+void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
Synchronized s(_monitor);
@@ -435,7 +444,7 @@
throw IllegalStateException();
}
- _tasks.push(new ThreadManager::Task(value));
+ _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
/* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
task in time. */
@@ -446,7 +455,7 @@
}
}
-void ThreadManager::Impl::remove(Runnable* task) {
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Synchronized s(_monitor);
@@ -479,12 +488,12 @@
};
-ThreadManager* ThreadManager::newThreadManager() {
- return new ThreadManager::Impl();
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+ return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) {
- return new SimpleThreadManager(count);
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
}
}}} // facebook::thrift::concurrency