Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
new file mode 100644
index 0000000..c80bb88
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+#include <vector>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
+
+int main(int argc, char** argv) {
+
+  std::string arg;
+
+  std::vector<std::string>  args(argc - 1 > 1 ? argc - 1 : 1);
+
+  args[0] = "all";
+
+  for (int ix = 1; ix < argc; ix++) {
+    args[ix - 1] = std::string(argv[ix]);
+  }
+
+  bool runAll = args[0].compare("all") == 0;
+
+  if (runAll || args[0].compare("thread-factory") == 0) {
+
+    ThreadFactoryTests threadFactoryTests;
+
+    std::cout << "ThreadFactory tests..." << std::endl;
+
+    size_t count =  1000;
+    size_t floodLoops =  1;
+    size_t floodCount =  100000;
+
+    std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+
+    assert(threadFactoryTests.reapNThreads(count));
+
+    std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+    assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
+    std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
+
+    assert(threadFactoryTests.synchStartTest());
+
+    std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
+
+    assert(threadFactoryTests.monitorTimeoutTest());
+  }
+
+  if (runAll || args[0].compare("util") == 0) {
+
+    std::cout << "Util tests..." << std::endl;
+
+    std::cout << "\t\tUtil minimum time" << std::endl;
+
+    int64_t time00 = Util::currentTime();
+    int64_t time01 = Util::currentTime();
+
+    std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
+
+    time00 = Util::currentTime();
+    time01 = time00;
+    size_t count = 0;
+
+    while (time01 < time00 + 10) {
+      count++;
+      time01 = Util::currentTime();
+    }
+
+    std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
+  }
+
+
+  if (runAll || args[0].compare("timer-manager") == 0) {
+
+    std::cout << "TimerManager tests..." << std::endl;
+
+    std::cout << "\t\tTimerManager test00" << std::endl;
+
+    TimerManagerTests timerManagerTests;
+
+    assert(timerManagerTests.test00());
+  }
+
+  if (runAll || args[0].compare("thread-manager") == 0) {
+
+    std::cout << "ThreadManager tests..." << std::endl;
+
+    {
+
+      size_t workerCount = 100;
+
+      size_t taskCount = 100000;
+
+      int64_t delay = 10LL;
+
+      std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+      ThreadManagerTests threadManagerTests;
+
+      assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+      std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+      assert(threadManagerTests.blockTest(delay, workerCount));
+
+    }
+  }
+
+  if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
+
+    std::cout << "ThreadManager benchmark tests..." << std::endl;
+
+    {
+
+      size_t minWorkerCount = 2;
+
+      size_t maxWorkerCount = 512;
+
+      size_t tasksPerWorker = 1000;
+
+      int64_t delay = 10LL;
+
+      for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
+
+        size_t taskCount = workerCount * tasksPerWorker;
+
+        std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+        ThreadManagerTests threadManagerTests;
+
+        threadManagerTests.loadTest(taskCount, delay, workerCount);
+      }
+    }
+  }
+}
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
new file mode 100644
index 0000000..859fbaf
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadFactoryTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task() {}
+
+    void run() {
+      std::cout << "\t\t\tHello World" << std::endl;
+    }
+  };
+
+  /**
+   * Hello world test
+   */
+  bool helloWorldTest() {
+
+    PosixThreadFactory threadFactory = PosixThreadFactory();
+
+    shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    thread->start();
+
+    thread->join();
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  /**
+   * Reap N threads
+   */
+  class ReapNTask: public Runnable {
+
+   public:
+
+    ReapNTask(Monitor& monitor, int& activeCount) :
+      _monitor(monitor),
+      _count(activeCount) {}
+
+    void run() {
+      Synchronized s(_monitor);
+
+      _count--;
+
+      //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+      if (_count == 0) {
+        _monitor.notify();
+      }
+    }
+
+    Monitor& _monitor;
+
+    int& _count;
+  };
+
+  bool reapNThreads(int loop=1, int count=10) {
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    Monitor* monitor = new Monitor();
+
+    for(int lix = 0; lix < loop; lix++) {
+
+      int* activeCount  = new int(count);
+
+      std::set<shared_ptr<Thread> > threads;
+
+      int tix;
+
+      for (tix = 0; tix < count; tix++) {
+        try {
+          threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      tix = 0;
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
+
+        try {
+          (*thread)->start();
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      {
+        Synchronized s(*monitor);
+        while (*activeCount > 0) {
+          monitor->wait(1000);
+        }
+      }
+
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+        threads.erase(*thread);
+      }
+
+      std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
+    }
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  class SynchStartTask: public Runnable {
+
+   public:
+
+    enum STATE {
+      UNINITIALIZED,
+      STARTING,
+      STARTED,
+      STOPPING,
+      STOPPED
+    };
+
+    SynchStartTask(Monitor& monitor, volatile  STATE& state) :
+      _monitor(monitor),
+      _state(state) {}
+
+    void run() {
+      {
+        Synchronized s(_monitor);
+        if (_state == SynchStartTask::STARTING) {
+          _state = SynchStartTask::STARTED;
+          _monitor.notify();
+        }
+      }
+
+      {
+        Synchronized s(_monitor);
+        while (_state == SynchStartTask::STARTED) {
+          _monitor.wait();
+        }
+
+        if (_state == SynchStartTask::STOPPING) {
+          _state = SynchStartTask::STOPPED;
+          _monitor.notifyAll();
+        }
+      }
+    }
+
+   private:
+    Monitor& _monitor;
+    volatile  STATE& _state;
+  };
+
+  bool synchStartTest() {
+
+    Monitor monitor;
+
+    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+    shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    if (state == SynchStartTask::UNINITIALIZED) {
+
+      state = SynchStartTask::STARTING;
+
+      thread->start();
+    }
+
+    {
+      Synchronized s(monitor);
+      while (state == SynchStartTask::STARTING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state != SynchStartTask::STARTING);
+
+    {
+      Synchronized s(monitor);
+
+      try {
+          monitor.wait(100);
+      } catch(TimedOutException& e) {
+      }
+
+      if (state == SynchStartTask::STARTED) {
+
+        state = SynchStartTask::STOPPING;
+
+        monitor.notify();
+      }
+
+      while (state == SynchStartTask::STOPPING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state == SynchStartTask::STOPPED);
+
+    bool success = true;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+
+    return true;
+  }
+
+  /** See how accurate monitor timeout is. */
+
+  bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
+
+    Monitor monitor;
+
+    int64_t startTime = Util::currentTime();
+
+    for (size_t ix = 0; ix < count; ix++) {
+      {
+        Synchronized s(monitor);
+        try {
+            monitor.wait(timeout);
+        } catch(TimedOutException& e) {
+        }
+      }
+    }
+
+    int64_t endTime = Util::currentTime();
+
+    double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
+
+    if (error < 0.0)  {
+
+      error *= 1.0;
+    }
+
+    bool success = error < ThreadFactoryTests::ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+
+  class FloodTask : public Runnable {
+  public:
+
+    FloodTask(const size_t id) :_id(id) {}
+    ~FloodTask(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " done" << std::endl;
+      }
+    }
+
+    void run(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " started" << std::endl;
+      }
+
+      usleep(1);
+    }
+    const size_t _id;
+  };
+
+  void foo(PosixThreadFactory *tf) {
+  }
+
+  bool floodNTest(size_t loop=1, size_t count=100000) {
+
+    bool success = false;
+
+    for(size_t lix = 0; lix < loop; lix++) {
+
+      PosixThreadFactory threadFactory = PosixThreadFactory();
+      threadFactory.setDetached(true);
+
+        for(size_t tix = 0; tix < count; tix++) {
+
+          try {
+
+            shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+            shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+            thread->start();
+
+            usleep(1);
+
+          } catch (TException& e) {
+
+            std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+
+            return success;
+          }
+        }
+
+        std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+        success = true;
+    }
+
+    return success;
+  }
+};
+
+const double ThreadFactoryTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
new file mode 100644
index 0000000..e7b5174
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <set>
+#include <iostream>
+#include <set>
+#include <stdint.h>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadManagerTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task(Monitor& monitor, size_t& count, int64_t timeout) :
+      _monitor(monitor),
+      _count(count),
+      _timeout(timeout),
+      _done(false) {}
+
+    void run() {
+
+      _startTime = Util::currentTime();
+
+      {
+        Synchronized s(_sleep);
+
+        try {
+          _sleep.wait(_timeout);
+        } catch(TimedOutException& e) {
+          ;
+        }catch(...) {
+          assert(0);
+        }
+      }
+
+      _endTime = Util::currentTime();
+
+      _done = true;
+
+      {
+        Synchronized s(_monitor);
+
+        // std::cout << "Thread " << _count << " completed " << std::endl;
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    size_t& _count;
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    bool _done;
+    Monitor _sleep;
+  };
+
+  /**
+   * Dispatch count tasks, each of which blocks for timeout milliseconds then
+   * completes. Verify that all tasks completed and that thread manager cleans
+   * up properly on delete.
+   */
+  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
+
+    Monitor monitor;
+
+    size_t activeCount = count;
+
+    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+    threadManager->threadFactory(threadFactory);
+
+    threadManager->start();
+
+    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+    for (size_t ix = 0; ix < count; ix++) {
+
+      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+    }
+
+    int64_t time00 = Util::currentTime();
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+        threadManager->add(*ix);
+    }
+
+    {
+      Synchronized s(monitor);
+
+      while(activeCount > 0) {
+
+        monitor.wait();
+      }
+    }
+
+    int64_t time01 = Util::currentTime();
+
+    int64_t firstTime = 9223372036854775807LL;
+    int64_t lastTime = 0;
+
+    double averageTime = 0;
+    int64_t minTime = 9223372036854775807LL;
+    int64_t maxTime = 0;
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+      shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+      int64_t delta = task->_endTime - task->_startTime;
+
+      assert(delta > 0);
+
+      if (task->_startTime < firstTime) {
+        firstTime = task->_startTime;
+      }
+
+      if (task->_endTime > lastTime) {
+        lastTime = task->_endTime;
+      }
+
+      if (delta < minTime) {
+        minTime = delta;
+      }
+
+      if (delta > maxTime) {
+        maxTime = delta;
+      }
+
+      averageTime+= delta;
+    }
+
+    averageTime /= count;
+
+    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
+
+    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+    double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+    if (error < 0) {
+      error*= -1.0;
+    }
+
+    bool success = error < ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+  class BlockTask: public Runnable {
+
+  public:
+
+    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+      _monitor(monitor),
+      _bmonitor(bmonitor),
+      _count(count) {}
+
+    void run() {
+      {
+        Synchronized s(_bmonitor);
+
+        _bmonitor.wait();
+
+      }
+
+      {
+        Synchronized s(_monitor);
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    Monitor& _bmonitor;
+    size_t& _count;
+  };
+
+  /**
+   * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
+   * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */
+
+  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
+
+    bool success = false;
+
+    try {
+
+      Monitor bmonitor;
+      Monitor monitor;
+
+      size_t pendingTaskMaxCount = workerCount;
+
+      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+      threadManager->threadFactory(threadFactory);
+
+      threadManager->start();
+
+      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+      for (size_t ix = 0; ix < workerCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+      }
+
+      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+      }
+
+      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+        threadManager->add(*ix);
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+        throw TException("Unexpected pending task count");
+      }
+
+      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+      try {
+        threadManager->add(extraTask, 1);
+        throw TException("Unexpected success adding task in excess of pending task count");
+      } catch(TimedOutException& e) {
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[0] != 0) {
+          monitor.wait();
+        }
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+      try {
+        threadManager->add(extraTask, 1);
+      } catch(TimedOutException& e) {
+        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
+        throw TException("Unexpected timeout adding task");
+
+      } catch(TooManyPendingTasksException& e) {
+        std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+        throw TException("Unexpected timeout adding task");
+      }
+
+      // Wake up tasks that were pending before and wait for them to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[1] != 0) {
+          monitor.wait();
+        }
+      }
+
+      // Wake up the extra task and wait for it to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[2] != 0) {
+          monitor.wait();
+        }
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == 0))) {
+        throw TException("Unexpected pending task count");
+      }
+
+    } catch(TException& e) {
+    }
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+    return success;
+ }
+};
+
+const double ThreadManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+
+using namespace apache::thrift::concurrency::test;
+
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
new file mode 100644
index 0000000..e6fe6ce
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <concurrency/TimerManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class TimerManagerTests {
+
+ public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+   public:
+
+    Task(Monitor& monitor, int64_t timeout) :
+      _timeout(timeout),
+      _startTime(Util::currentTime()),
+      _monitor(monitor),
+      _success(false),
+      _done(false) {}
+
+    ~Task() { std::cerr << this << std::endl; }
+
+    void run() {
+
+      _endTime = Util::currentTime();
+
+      // Figure out error percentage
+
+      int64_t delta = _endTime - _startTime;
+
+
+      delta = delta > _timeout ?  delta - _timeout : _timeout - delta;
+
+      float error = delta / _timeout;
+
+      if(error < ERROR) {
+        _success = true;
+      }
+
+      _done = true;
+
+      std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
+      {Synchronized s(_monitor);
+        _monitor.notifyAll();
+      }
+    }
+
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    Monitor& _monitor;
+    bool _success;
+    bool _done;
+  };
+
+  /**
+   * This test creates two tasks and waits for the first to expire within 10%
+   * of the expected expiration time. It then verifies that the timer manager
+   * properly clean up itself and the remaining orphaned timeout task when the
+   * manager goes out of scope and its destructor is called.
+   */
+  bool test00(int64_t timeout=1000LL) {
+
+    shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
+
+    {
+
+      TimerManager timerManager;
+
+      timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
+
+      timerManager.start();
+
+      assert(timerManager.state() == TimerManager::STARTED);
+
+      shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+
+      {
+        Synchronized s(_monitor);
+
+        timerManager.add(orphanTask, 10 * timeout);
+
+        timerManager.add(task, timeout);
+
+        _monitor.wait();
+      }
+
+      assert(task->_done);
+
+
+      std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+    }
+
+    // timerManager.stop(); This is where it happens via destructor
+
+    assert(!orphanTask->_done);
+
+    return true;
+  }
+
+  friend class TestTask;
+
+  Monitor _monitor;
+};
+
+const double TimerManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+