Converted concurrency classes to use boost::shared_ptr and boost::weak_ptr:

Wrapped all thrift code in facebook::thrift:: namespace


	


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664735 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index ca2bbb5..a5b8f05 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -2,12 +2,20 @@
 #include "Exception.h"
 #include "Monitor.h"
 
+#include <boost/shared_ptr.hpp>
+
 #include <assert.h>
 #include <queue>
 #include <set>
 
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
 namespace facebook { namespace thrift { namespace concurrency { 
 
+using namespace boost;
+
 
 /** ThreadManager class
     
@@ -21,9 +29,16 @@
 
  public:
 
-  Impl() : _state(ThreadManager::UNINITIALIZED) {}
+  Impl() : 
+    _workerCount(0),
+    _workerMaxCount(0),
+    _idleCount(0),
+    _state(ThreadManager::UNINITIALIZED)
+  {}
 
-  ~Impl() {stop();}
+  ~Impl() {
+    stop();
+  }
 
   void start();
 
@@ -33,14 +48,14 @@
     return _state;
   };
 
-  const ThreadFactory* threadFactory() const {
+  shared_ptr<ThreadFactory> threadFactory() const {
 
     Synchronized s(_monitor); 
 
     return _threadFactory;
   }
       
-  void threadFactory(const ThreadFactory*  value) {
+  void threadFactory(shared_ptr<ThreadFactory> value) {
     
     Synchronized s(_monitor); 
     
@@ -74,9 +89,9 @@
     return _tasks.size() + _workerCount - _idleCount;
   }
   
-  void add(Runnable* value);
+  void add(shared_ptr<Runnable> value);
 
-  void remove(Runnable* task);
+  void remove(shared_ptr<Runnable> task);
 
 private:
 
@@ -88,11 +103,11 @@
 
   ThreadManager::STATE _state;
 
-  const ThreadFactory* _threadFactory;
+  shared_ptr<ThreadFactory> _threadFactory;
 
   friend class ThreadManager::Task;
 
-  std::queue<Task*> _tasks;
+  std::queue<shared_ptr<Task> > _tasks;
 
   Monitor _monitor;
 
@@ -100,9 +115,9 @@
 
   friend class ThreadManager::Worker;
 
-  std::set<Thread*> _workers;
+  std::set<shared_ptr<Thread> > _workers;
   
-  std::set<Thread*> _deadWorkers;
+  std::set<shared_ptr<Thread> > _deadWorkers;
 };
 
 class ThreadManager::Task : public Runnable {
@@ -115,7 +130,7 @@
     COMPLETE
   };
 
-  Task(Runnable* runnable) :
+  Task(shared_ptr<Runnable> runnable) :
     _runnable(runnable),
     _state(WAITING) 
     {}
@@ -131,7 +146,7 @@
 
  private:
 
-  Runnable* _runnable;
+  shared_ptr<Runnable> _runnable;
   
   friend class ThreadManager::Worker;
 
@@ -199,7 +214,7 @@
 
     while(active) {
 
-      ThreadManager::Task* task = NULL;
+      shared_ptr<ThreadManager::Task> task;
 
       /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
 
@@ -260,10 +275,6 @@
 	    
 	    // XXX need to log this
 	  }
-	  
-	  delete task;
-	  
-	  task = NULL;
 	}
       }
     }
@@ -294,13 +305,13 @@
 
 void ThreadManager::Impl::addWorker(size_t value) {
     
-  std::set<Thread*> newThreads;
+  std::set<shared_ptr<Thread> > newThreads;
   
   for(size_t ix = 0; ix < value; ix++) {
 
     class ThreadManager::Worker;
       
-    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+    shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
 
     newThreads.insert(_threadFactory->newThread(worker));
   }
@@ -312,9 +323,9 @@
     _workers.insert(newThreads.begin(), newThreads.end());
   }
   
-  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+  for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
 
-    ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+    shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
 
     worker->_state = ThreadManager::Worker::STARTING;
 
@@ -378,13 +389,14 @@
     _state = ThreadManager::STOPPING;
   }
 
-  // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+  // XXX 
+  // should be able to block here for transition to STOPPED since we're now using shared_ptrs
      
 }
   
 void ThreadManager::Impl::removeWorker(size_t value) {
 
-  std::set<Thread*> removedThreads;
+  std::set<shared_ptr<Thread> > removedThreads;
 
   {Synchronized s(_monitor); 
 
@@ -413,20 +425,17 @@
       _workerMonitor.wait();
     }
 
-    for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+    for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
 
 	_workers.erase(*ix);
 	
-	delete (*ix)->runnable();
-
-	delete (*ix);
     }
 
     _deadWorkers.clear();
   }
 }
   
-void ThreadManager::Impl::add(Runnable* value) {
+void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
 
     Synchronized s(_monitor); 
 
@@ -435,7 +444,7 @@
       throw IllegalStateException();
     }
 
-    _tasks.push(new ThreadManager::Task(value));
+    _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
 
     /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
        task in time. */
@@ -446,7 +455,7 @@
     }
   }
 
-void ThreadManager::Impl::remove(Runnable* task) {
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
 
     Synchronized s(_monitor); 
 
@@ -479,12 +488,12 @@
 };
 
 
-ThreadManager* ThreadManager::newThreadManager() {
-   return new ThreadManager::Impl();
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+  return shared_ptr<ThreadManager>(new ThreadManager::Impl());
 }
 
-ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) {
-   return new SimpleThreadManager(count);
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
+  return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
 }
 
 }}} // facebook::thrift::concurrency