Bring up of thread manager

facebook::thrift::concurrency::test.ThreadManagerTest::test00
	Launch N tasks that block for time T, verify they all complete and that the thread manager cleans up properly
	when it goes out of scope
	


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664725 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index d13ce7b..f8a5c22 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,4 +1,5 @@
 #include "ThreadManager.h"
+#include "Exception.h"
 #include "Monitor.h"
 
 #include <assert.h>
@@ -10,12 +11,7 @@
 /** ThreadManager class
     
     This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
-    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
-    PoolPolicy object bound to instances of this manager of interesting transitions.  It is then up the PoolPolicy object to decide if the thread pool
-    size needs to be adjusted and call this object addThread and removeThread methods to make changes.
-
-    This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
-    policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
 
     @author marc
     @version $Id:$ */
@@ -24,34 +20,18 @@
 
  public:
 
-  Impl(size_t highWatermark, size_t lowWatermark) : 
-    _hiwat(highWatermark), 
-    _lowat(lowWatermark) {
+  Impl() : _stopped(false) {}
+
+
+
+  ~Impl() {
+
+    if(!_stopped) {
+      stop();
+    }
   }
 
-  ~Impl() {}
-
-  size_t highWatermark() const {return _hiwat;}
-
-  void highWatermark(size_t value) {_hiwat = value;}
-
-  size_t lowWatermark() const {return _lowat;}
-
-  void lowWatermark(size_t value) {_lowat = value;}
-
-  const PoolPolicy* poolPolicy() const {
-
-    Synchronized s(_monitor); 
-  
-    return _poolPolicy;
-  }
-  
-  void poolPolicy(const PoolPolicy*  value) {
-
-    Synchronized s(_monitor); 
-
-    _poolPolicy = value;
-  }
+  void stop();
 
   const ThreadFactory* threadFactory() const {
 
@@ -67,9 +47,9 @@
     _threadFactory = value;
   }
 
-  void addThread(size_t value);
+  void addWorker(size_t value);
   
-  void removeThread(size_t value);
+  void removeWorker(size_t value);
   
   size_t idleWorkerCount() const {return _idleCount;}
 
@@ -77,7 +57,7 @@
 
     Synchronized s(_monitor); 
 
-    return _workers.size();
+    return _workerCount;
   }
   
   size_t pendingTaskCount() const {
@@ -91,7 +71,7 @@
 
     Synchronized s(_monitor); 
     
-    return _tasks.size() + _workers.size() - _idleCount;
+    return _tasks.size() + _workerCount - _idleCount;
   }
   
   void add(Runnable* value);
@@ -100,13 +80,11 @@
 
 private:
 
-  size_t _hiwat;
-
-  size_t _lowat;
+  size_t _workerCount;
 
   size_t _idleCount;
 
-  const PoolPolicy* _poolPolicy;;
+  bool _stopped;
 
   const ThreadFactory* _threadFactory;
 
@@ -148,6 +126,8 @@
  private:
 
   Runnable* _runnable;
+  
+  friend class ThreadManager::Worker;
 
   STATE _state;
 };
@@ -181,6 +161,10 @@
       if(_state == STARTING) {
 	_state = STARTED;
       }
+
+      _manager->_workerCount++;
+
+      _manager->_monitor.notify();
     }
 
     do {
@@ -207,22 +191,43 @@
 	}
 	
 	if(_state == STARTED) {
+
+	  if(!_manager->_tasks.empty()) {
 	  
-	  task = _manager->_tasks.front();
+	    task = _manager->_tasks.front();
+
+	    _manager->_tasks.pop();
+
+	    if(task->_state == ThreadManager::Task::WAITING) {
+
+	      task->_state = ThreadManager::Task::EXECUTING;
+	    }
+	  }
 	}
       }
 
       if(task != NULL) {
 
-	task->run();
+	if(task->_state == ThreadManager::Task::EXECUTING) {
+	  try {
+	    
+	    task->run();
 
-	delete task;
+	  } catch(...) {
+	    
+	    // XXX need to log this
+	  }
+	  
+	  delete task;
+	}
       }
-
+      
     } while(_state == STARTED);
 
     {Synchronized s(_manager->_monitor);
 
+      _manager->_workerCount--;
+
       if(_state == STOPPING) {
 
 	_state = STOPPED; 
@@ -246,35 +251,56 @@
   bool _idle;
 };
 
-void ThreadManager::Impl::addThread(size_t value) {
+void ThreadManager::Impl::addWorker(size_t value) {
     
-    std::set<Thread*> newThreads;
+  std::set<Thread*> newThreads;
   
-    for(size_t ix = 0; ix < value; ix++) {
+  for(size_t ix = 0; ix < value; ix++) {
 
-      class ThreadManager::Worker;
+    class ThreadManager::Worker;
       
-      ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
 
-      newThreads.insert(_threadFactory->newThread(worker));
-    }
-  
-    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-      (*ix)->start();
-    }
-    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-      (*ix)->start();
-    }
-    
-    {Synchronized s(_monitor); 
-      
-      _workers.insert(newThreads.begin(), newThreads.end());
-    }
+    newThreads.insert(_threadFactory->newThread(worker));
   }
   
-void ThreadManager::Impl::removeThread(size_t value) {
+  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+    ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+
+    worker->_state = ThreadManager::Worker::STARTING;
+
+    (*ix)->start();
+  }
+
+  {Synchronized s(_monitor); 
+      
+    _workers.insert(newThreads.begin(), newThreads.end());
+
+    while(_workerCount != _workers.size()) {
+      _monitor.wait();
+    }
+  }
+}
+
+void ThreadManager::Impl::stop() {
+
+  bool doStop = false;
+
+  {Synchronized s(_monitor); 
+
+    if(!_stopped) {
+      doStop = true;
+      _stopped = true;
+    }
+  }
+
+  if(doStop) {
+    removeWorker(_workerCount);
+  }
+}
+  
+void ThreadManager::Impl::removeWorker(size_t value) {
 
   std::set<Thread*> removedThreads;
 
@@ -285,14 +311,19 @@
 	 First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
 	 and remove a sufficient number of busy threads. */
 
-      for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+      for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
       
 	for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
 
 	  Worker* worker = (Worker*)(*workerThread)->runnable();
 
 	  if(worker->_idle || !idleOnly) {
-	
+
+	    if(worker->_state == ThreadManager::Worker::STARTED) {
+
+	      worker->_state = ThreadManager::Worker::STOPPING;
+	    }
+
 	    removedThreads.insert(*workerThread);
 	
 	    _workers.erase(workerThread);
@@ -320,15 +351,17 @@
 
     Synchronized s(_monitor); 
 
+    bool isEmpty = _tasks.empty();
+
     _tasks.push(new ThreadManager::Task(value));
 
-    /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+    /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
        task in time. */
 
-    if(_tasks.size() == 1) {
+    if(isEmpty) {
 
-      assert(_idleCount == _workers.size());
-
+      assert(_idleCount == _workerCount);
+      
       _monitor.notify();
     }
   }
@@ -336,39 +369,54 @@
 void ThreadManager::Impl::remove(Runnable* task) {
 
     Synchronized s(_monitor); 
-  }
-
-ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
-  return new ThreadManager::Impl(lowWatermark, highWatermark);
 }
 
-/**  Basic Pool Policy Implementation */
+class SimpleThreadManager : public ThreadManager::Impl {
 
-class BasicPoolPolicy::Impl : public PoolPolicy {
+public:
 
- public:
+  SimpleThreadManager(size_t workerCount=4) : 
+    _workerCount(workerCount),
+    _firstTime(true) {
+  }
 
-  Impl() {}
+  void add(Runnable* task) {
 
-  ~Impl() {}
+    bool addWorkers = false;
 
-  void onEmpty(ThreadManager* source) const  {}
+    {Synchronized s(_monitor);
 
-  void onLowWatermark(ThreadManager* source) const  {}
+      if(_firstTime) {
 
-  void onHighWatermark(ThreadManager* source) const {}
+	_firstTime = false;
+
+	addWorkers = true;
+      }
+    }
+
+    if(addWorkers) {
+
+      addWorker(_workerCount);
+    }
+
+    Impl::add(task);
+  }
+
+private:
+
+  const size_t _workerCount;
+  bool _firstTime;
+  Monitor _monitor;
 };
 
-BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl())  {}
 
-BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
+ThreadManager* ThreadManager::newThreadManager() {
+   return new ThreadManager::Impl();
+}
 
-void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
-
-void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
-
-void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
-
+ThreadManager* ThreadManager::newSimpleThreadManager() {
+   return new SimpleThreadManager();
+}
 
 }}} // facebook::thrift::concurrency