Added thread factory test - problems in thread

Fixed stupid typo in TimerManager::start


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index bd68264..015ffba 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -1,8 +1,9 @@
 #include "TimerManager.h"
+#include "Exception.h"
 #include "Util.h"
 
 #include <assert.h>
-
+#include <iostream>
 #include <set>
 
 namespace facebook { namespace thrift { namespace concurrency { 
@@ -43,24 +44,19 @@
 
   Runnable* _runnable;
 
+  class TimerManager::Dispatcher;
+
+  friend class TimerManager::Dispatcher;
+
   STATE _state;
 };
 
 class TimerManager::Dispatcher: public Runnable {
 
-  enum STATE {
-    UNINITIALIZED,
-    STARTING,
-    STARTED,
-    STOPPING,
-    STOPPED
-  };
-  
 public:
   Dispatcher(TimerManager* manager) : 
-    _manager(manager),
-    _state(UNINITIALIZED)
-  {}
+    _manager(manager) {
+}
   
   ~Dispatcher() {}
   
@@ -70,10 +66,13 @@
 
   void run() {
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
-      if(_state == STARTING) {
-	_state = STARTED;
+      if(_manager->_state == TimerManager::STARTING) {
+
+	_manager->_state = TimerManager::STARTED;
+
+	_manager->_monitor.notifyAll();
       }
     }
 
@@ -81,26 +80,38 @@
 
       std::set<TimerManager::Task*> expiredTasks;
 
-      {Synchronized(_manager->_monitor);
-	
-	long long now = Util::currentTime();
+      {Synchronized s(_manager->_monitor);
+
+	/* Update next timeout if necessary */
 
 	task_iterator expiredTaskEnd;
-	
-	while(_state == STARTED && 
-	      (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
-	  
-	  _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+	while(_manager->_state == TimerManager::STARTED && 
+	      (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+
+	  /* 0 is what wait() receives when no task is pending. NOTE(review): presumably "no timeout" - confirm in Monitor. */
+	  long long timeout = 0LL;
+	  if(!_manager->_taskMap.empty()) {
+	    /* Map keys are compared against Util::currentTime(), i.e. absolute times: remaining wait = earliest key - now. */
+	    timeout = _manager->_taskMap.begin()->first - Util::currentTime();
+	    if(timeout <= 0) {timeout = 1LL;} /* expired since the loop test: use a minimal positive wait, then re-check */
+	  }
+ 	  _manager->_monitor.wait(timeout);
 	  
 	}
 	
-	if(_state == STARTED) {
+	if(_manager->_state == TimerManager::STARTED) {
 	  
 	  for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
 
 	    TimerManager::Task* task = ix->second;
 	    
 	    expiredTasks.insert(task);
+
+	    if(task->_state == TimerManager::Task::WAITING) {
+
+	      task->_state = TimerManager::Task::EXECUTING;
+	    }
 	    
 	    _manager->_taskCount--;
 	  }
@@ -116,13 +127,13 @@
 	delete *ix;
       }
       
-    } while(_state == STARTED);
+    } while(_manager->_state == TimerManager::STARTED);
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
-      if(_state == STOPPING) {
+      if(_manager->_state == TimerManager::STOPPING) {
 
-	_state = STOPPED; 
+	_manager->_state = TimerManager::STOPPED; 
 
 	_manager->_monitor.notify();
 
@@ -137,13 +148,73 @@
   TimerManager* _manager;
 
   friend class TimerManager;
-
-  STATE _state;
 };
 
-TimerManager::TimerManager() {}
+TimerManager::TimerManager() : /* Idle manager: no tasks, no thread factory, dispatcher allocated but not yet running. */
+  _taskCount(0), _threadFactory(NULL), /* zeroed here: add()/taskCount() read the counter, start() tests the factory for NULL */
+  _state(TimerManager::UNINITIALIZED), _dispatcher(new Dispatcher(this)) {
+}
 
-TimerManager::~TimerManager() {}
+
+TimerManager::~TimerManager() {
+  delete _dispatcher; /* Frees the Dispatcher allocated in the constructor. NOTE(review): nothing here stops the dispatcher thread - confirm callers invoke stop() first. */
+}
+
+void TimerManager::start() {
+  /* Starts the dispatcher thread on the first call, then blocks until the manager has left STARTING. */
+  bool doStart = false;
+
+  {Synchronized s(_monitor);
+    /* A thread factory is mandatory; InvalidArgumentException is thrown when none has been set. */
+    if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+    if(_state == TimerManager::UNINITIALIZED) {
+      /* Only the caller that observes UNINITIALIZED moves to STARTING and goes on to create the thread. */
+      _state = TimerManager::STARTING;
+
+      doStart = true;
+    }
+  }
+
+  if(doStart) {
+    /* The thread is created and started after the synchronized block above has been left. */
+    _dispatcherThread = _threadFactory->newThread(_dispatcher);
+
+    _dispatcherThread->start();
+  }
+
+  {Synchronized s(_monitor);
+    /* Dispatcher::run() sets STARTED and calls notifyAll() on this monitor; stop() may also change the state. */
+    while(_state == TimerManager::STARTING) {
+
+      _monitor.wait();
+    }
+    
+    assert(_state != TimerManager::STARTING);
+  }
+}
+
+void TimerManager::stop() {
+  /* Requests shutdown and blocks until the manager reaches STOPPED. */
+  {Synchronized s(_monitor);
+
+    if(_state == TimerManager::UNINITIALIZED) {
+      /* Never started: there is no dispatcher thread to wait for, so go straight to STOPPED. */
+      _state = TimerManager::STOPPED;
+
+    } else if(_state != STOPPING &&  _state != STOPPED) {
+      /* First stop request: flag STOPPING and wake every thread waiting on the monitor. */
+      _state = STOPPING;
+
+      _monitor.notifyAll();
+    }
+
+    while(_state != STOPPED) {
+      /* Dispatcher::run() sets STOPPED and notifies this monitor when it sees STOPPING on exit. */
+      _monitor.wait();
+    }
+  }
+}
 
 const ThreadFactory* TimerManager::threadFactory() const {
 
@@ -159,6 +230,11 @@
   _threadFactory = value;
 }
 
+size_t TimerManager::taskCount() const {
+  /* Returns the current value of _taskCount; the monitor is not taken for this read. */
+  return _taskCount;
+}
+      
 void TimerManager::add(Runnable* task, long long timeout) {
 
   long long now = Util::currentTime();
@@ -167,6 +243,10 @@
 
   {Synchronized s(_monitor); 
 
+    if(_state != TimerManager::STARTED) {
+      throw IllegalStateException();
+    }
+
     _taskCount++;
 
     _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
@@ -174,15 +254,10 @@
     /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
        kick the dispatcher so it can update its timeout */
 
-    if(_taskCount == 1 || timeout < _nextTimeout) {
+    if(_taskCount == 1 || timeout == _taskMap.begin()->first) { /* new key is now the earliest; '<' could never hold after the insert above */
 
       _monitor.notify();
     }
-    
-    if(timeout < _nextTimeout) {
-
-      _nextTimeout = timeout;
-    }
   }
 }
 
@@ -192,20 +267,26 @@
 
   Util::toMilliseconds(expiration, value);
 
-  /* XXX
-     Need to convert this to an explicit exception */
-
   long long now = Util::currentTime();
 
-  assert(expiration < now);
+  if(expiration < now) {
+    throw  InvalidArgumentException();
+  }
 
   add(task, expiration - now);
 }
 
 
 void TimerManager::remove(Runnable* task) {
+  {Synchronized s(_monitor); /* NOTE(review): only the state check below is present; no task is actually removed here. */
 
+    if(_state != TimerManager::STARTED) { /* IllegalStateException unless the manager is STARTED */
+      throw IllegalStateException();
+    }
+  }
 }
 
+const TimerManager::STATE TimerManager::state() const { return _state;} /* Returns the current lifecycle state; read without taking the monitor. */
+
 }}} // facebook::thrift::concurrency