Another checkpoint of the initial cut of the thread pool manager for Thrift and its related concurrency classes.

Added TimerManager - I can't live without one after all.

Added Util - a handy place for common time operations, etc.

Added initial test code.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664722 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index c4ca2b1..b5d02e6 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,6 +1,9 @@
 #include "ThreadManager.h"
+#include "Monitor.h"
 
 #include <assert.h>
+#include <queue>
+#include <set>
 
 namespace facebook { namespace thrift { namespace concurrency { 
 
@@ -15,7 +18,108 @@
     policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
 
     @author marc
-    @version $Id */
+    @version $Id:$ */
+
+class ThreadManager::Impl : public ThreadManager  {
+
+ public:
+
+  Impl(size_t highWatermark, size_t lowWatermark) : 
+    _hiwat(highWatermark), 
+    _lowat(lowWatermark) {
+  }
+
+  ~Impl() {}
+
+  size_t highWatermark() const {return _hiwat;}
+
+  void highWatermark(size_t value) {_hiwat = value;}
+
+  size_t lowWatermark() const {return _lowat;}
+
+  void lowWatermark(size_t value) {_lowat = value;}
+
+  const PoolPolicy* poolPolicy() const {
+
+    Synchronized s(_monitor); 
+  
+    return _poolPolicy;
+  }
+  
+  void poolPolicy(const PoolPolicy*  value) {
+
+    Synchronized s(_monitor); 
+
+    _poolPolicy = value;
+  }
+
+  const ThreadFactory* threadFactory() const {
+
+    Synchronized s(_monitor); 
+
+    return _threadFactory;
+  }
+      
+  void threadFactory(const ThreadFactory*  value) {
+    
+    Synchronized s(_monitor); 
+    
+    _threadFactory = value;
+  }
+
+  void addThread(size_t value);
+  
+  void removeThread(size_t value);
+  
+  size_t idleWorkerCount() const {return _idleCount;}
+
+  size_t workerCount() const {
+
+    Synchronized s(_monitor); 
+
+    return _workers.size();
+  }
+  
+  size_t pendingTaskCount() const {
+
+    Synchronized s(_monitor); 
+
+    return _tasks.size();
+  }
+
+  size_t totalTaskCount() const {
+
+    Synchronized s(_monitor); 
+    
+    return _tasks.size() + _workers.size() - _idleCount;
+  }
+  
+  void add(Runnable* value);
+
+  void remove(Runnable* task);
+
+private:
+
+  size_t _hiwat;
+
+  size_t _lowat;
+
+  size_t _idleCount;
+
+  const PoolPolicy* _poolPolicy;;
+
+  const ThreadFactory* _threadFactory;
+
+  friend class ThreadManager::Task;
+
+  std::queue<Task*> _tasks;
+
+  Monitor _monitor;
+
+  friend class ThreadManager::Worker;
+
+  std::set<Thread*> _workers;
+};
 
 class ThreadManager::Task : public Runnable {
 
@@ -49,7 +153,6 @@
 };
 
 class ThreadManager::Worker: public Runnable {
-
   enum STATE {
     UNINITIALIZED,
     STARTING,
@@ -59,7 +162,7 @@
   };
 
  public:
-  Worker(ThreadManager* manager) : 
+  Worker(ThreadManager::Impl* manager) : 
     _manager(manager),
     _state(UNINITIALIZED),
     _idle(false)
@@ -134,170 +237,138 @@
 
  private:
 
-  ThreadManager* _manager;
+  ThreadManager::Impl* _manager;
 
-  friend class ThreadManager;
+  friend class ThreadManager::Impl;
 
   STATE _state;
 
   bool _idle;
 };
 
-ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) : 
-  _hiwat(highWatermark), 
-  _lowat(lowWatermark) {
-}
-
-ThreadManager::~ThreadManager() {}
-
-size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
-
-void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
-
-size_t ThreadManager::lowWatermark() const {return _lowat;}
-
-void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
-
-const PoolPolicy* ThreadManager::poolPolicy() const {
-
-  Synchronized s(_monitor); 
-  
-  return _poolPolicy;
-}
-  
-void ThreadManager::poolPolicy(const PoolPolicy*  value) {
-
-  Synchronized s(_monitor); 
-
-  _poolPolicy = value;
-}
-
-const ThreadFactory* ThreadManager::threadFactory() const {
-
-  Synchronized s(_monitor); 
-
-  return _threadFactory;
-}
-      
-void ThreadManager::threadFactory(const ThreadFactory*  value) {
+void ThreadManager::Impl::addThread(size_t value) {
     
-  Synchronized s(_monitor); 
+    std::set<Thread*> newThreads;
   
-  _threadFactory = value;
-}
+    for(size_t ix = 0; ix < value; ix++) {
 
-void ThreadManager::addThread(size_t value) {
+      class ThreadManager::Worker;
+      
+      ThreadManager::Worker* worker = new ThreadManager::Worker(this);
 
-  std::set<Thread*> newThreads;
+      newThreads.insert(_threadFactory->newThread(worker));
+    }
   
-  for(size_t ix = 0; ix < value; ix++) {
+    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
 
-    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+      (*ix)->start();
+    }
+    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
 
-    newThreads.insert(_threadFactory->newThread(worker));
+      (*ix)->start();
+    }
+    
+    {Synchronized s(_monitor); 
+      
+      _workers.insert(newThreads.begin(), newThreads.end());
+    }
   }
   
-  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-    (*ix)->start();
-  }
-  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-    (*ix)->start();
-  }
-
-  {Synchronized s(_monitor); 
-
-    _workers.insert(newThreads.begin(), newThreads.end());
-  }
-}
-
-void ThreadManager::removeThread(size_t value) {
+void ThreadManager::Impl::removeThread(size_t value) {
 
   std::set<Thread*> removedThreads;
 
-  {Synchronized s(_monitor); 
+    {Synchronized s(_monitor); 
   
-    /* Overly clever loop
+      /* Overly clever loop
+	 
+	 First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
+	 and remove a sufficient number of busy threads. */
 
-       First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
-       and remove a sufficient number of busy threads. */
-
-    for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+      for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
       
-      for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+	for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
 
-	Worker* worker = (Worker*)(*workerThread)->runnable();
+	  Worker* worker = (Worker*)(*workerThread)->runnable();
 
-	if(worker->_idle || !idleOnly) {
+	  if(worker->_idle || !idleOnly) {
 	
-	  removedThreads.insert(*workerThread);
+	    removedThreads.insert(*workerThread);
 	
-	  _workers.erase(workerThread);
+	    _workers.erase(workerThread);
+	  }
 	}
       }
+      
+      _monitor.notifyAll();
     }
+
     
-    _monitor.notifyAll();
+    // Join removed threads and free worker 
+
+    for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+      Worker* worker = (Worker*)(*workerThread)->runnable();
+
+      (*workerThread)->join();
+      
+      delete worker;
+    }
+  }
+  
+void ThreadManager::Impl::add(Runnable* value) {
+
+    Synchronized s(_monitor); 
+
+    _tasks.push(new ThreadManager::Task(value));
+
+    /* If the queue was empty before this push, notify a thread; otherwise all worker threads are running and
+       will get around to this task in time. */
+
+    if(_tasks.size() == 1) {
+
+      assert(_idleCount == _workers.size());
+
+      _monitor.notify();
+    }
   }
 
+void ThreadManager::Impl::remove(Runnable* task) {
 
-  // Join removed threads and free worker 
-
-  for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
-    Worker* worker = (Worker*)(*workerThread)->runnable();
-
-    (*workerThread)->join();
-
-    delete worker;
+    Synchronized s(_monitor); 
   }
+
+ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
+  return new ThreadManager::Impl(lowWatermark, highWatermark);
 }
 
-size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+/**  Basic Pool Policy Implementation */
 
-size_t ThreadManager::workerCount() const {
+class BasicPoolPolicy::Impl : public PoolPolicy {
 
-  Synchronized s(_monitor); 
+ public:
 
-  return _workers.size();
-}
+  Impl() {}
 
-size_t ThreadManager::pendingTaskCount() const {
+  ~Impl() {}
 
-  Synchronized s(_monitor); 
+  void onEmpty(ThreadManager* source) const  {}
 
-  return _tasks.size();
-}
+  void onLowWatermark(ThreadManager* source) const  {}
 
-size_t ThreadManager::totalTaskCount() const {
+  void onHighWatermark(ThreadManager* source) const {}
+};
 
-  Synchronized s(_monitor); 
+BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl())  {}
 
-  return _tasks.size() + _workers.size() - _idleCount;
-}
+BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
 
-void ThreadManager::add(Runnable* value) {
+void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
 
-  Synchronized s(_monitor); 
+void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
 
-  _tasks.push(new ThreadManager::Task(value));
+void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
 
-  /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
-     task in time. */
-
-  if(_tasks.size() == 1) {
-
-    assert(_idleCount == _workers.size());
-
-    _monitor.notify();
-  }
-}
-
-void ThreadManager::remove(Runnable* task) {
-
-  Synchronized s(_monitor); 
-}
 
 }}} // facebook::thrift::concurrency