Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
new file mode 100644
index 0000000..e7b5174
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <set>
+#include <iostream>
+#include <set>
+#include <stdint.h>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadManagerTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task(Monitor& monitor, size_t& count, int64_t timeout) :
+      _monitor(monitor),
+      _count(count),
+      _timeout(timeout),
+      _done(false) {}
+
+    void run() {
+
+      _startTime = Util::currentTime();
+
+      {
+        Synchronized s(_sleep);
+
+        try {
+          _sleep.wait(_timeout);
+        } catch(TimedOutException& e) {
+          ;
+        }catch(...) {
+          assert(0);
+        }
+      }
+
+      _endTime = Util::currentTime();
+
+      _done = true;
+
+      {
+        Synchronized s(_monitor);
+
+        // std::cout << "Thread " << _count << " completed " << std::endl;
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    size_t& _count;
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    bool _done;
+    Monitor _sleep;
+  };
+
+  /**
+   * Dispatch count tasks, each of which blocks for timeout milliseconds then
+   * completes. Verify that all tasks completed and that thread manager cleans
+   * up properly on delete.
+   */
+  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
+
+    Monitor monitor;
+
+    size_t activeCount = count;
+
+    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+    threadManager->threadFactory(threadFactory);
+
+    threadManager->start();
+
+    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+    for (size_t ix = 0; ix < count; ix++) {
+
+      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+    }
+
+    int64_t time00 = Util::currentTime();
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+        threadManager->add(*ix);
+    }
+
+    {
+      Synchronized s(monitor);
+
+      while(activeCount > 0) {
+
+        monitor.wait();
+      }
+    }
+
+    int64_t time01 = Util::currentTime();
+
+    int64_t firstTime = 9223372036854775807LL;
+    int64_t lastTime = 0;
+
+    double averageTime = 0;
+    int64_t minTime = 9223372036854775807LL;
+    int64_t maxTime = 0;
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+      shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+      int64_t delta = task->_endTime - task->_startTime;
+
+      assert(delta > 0);
+
+      if (task->_startTime < firstTime) {
+        firstTime = task->_startTime;
+      }
+
+      if (task->_endTime > lastTime) {
+        lastTime = task->_endTime;
+      }
+
+      if (delta < minTime) {
+        minTime = delta;
+      }
+
+      if (delta > maxTime) {
+        maxTime = delta;
+      }
+
+      averageTime+= delta;
+    }
+
+    averageTime /= count;
+
+    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
+
+    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+    double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+    if (error < 0) {
+      error*= -1.0;
+    }
+
+    bool success = error < ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+  class BlockTask: public Runnable {
+
+  public:
+
+    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+      _monitor(monitor),
+      _bmonitor(bmonitor),
+      _count(count) {}
+
+    void run() {
+      {
+        Synchronized s(_bmonitor);
+
+        _bmonitor.wait();
+
+      }
+
+      {
+        Synchronized s(_monitor);
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    Monitor& _bmonitor;
+    size_t& _count;
+  };
+
+  /**
+   * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
+   * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */
+
+  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
+
+    bool success = false;
+
+    try {
+
+      Monitor bmonitor;
+      Monitor monitor;
+
+      size_t pendingTaskMaxCount = workerCount;
+
+      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+      threadManager->threadFactory(threadFactory);
+
+      threadManager->start();
+
+      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+      for (size_t ix = 0; ix < workerCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+      }
+
+      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+      }
+
+      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+        threadManager->add(*ix);
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+        throw TException("Unexpected pending task count");
+      }
+
+      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+      try {
+        threadManager->add(extraTask, 1);
+        throw TException("Unexpected success adding task in excess of pending task count");
+      } catch(TimedOutException& e) {
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[0] != 0) {
+          monitor.wait();
+        }
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+      try {
+        threadManager->add(extraTask, 1);
+      } catch(TimedOutException& e) {
+        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
+        throw TException("Unexpected timeout adding task");
+
+      } catch(TooManyPendingTasksException& e) {
+        std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+        throw TException("Unexpected timeout adding task");
+      }
+
+      // Wake up tasks that were pending before and wait for them to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[1] != 0) {
+          monitor.wait();
+        }
+      }
+
+      // Wake up the extra task and wait for it to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[2] != 0) {
+          monitor.wait();
+        }
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == 0))) {
+        throw TException("Unexpected pending task count");
+      }
+
+    } catch(TException& e) {
+    }
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+    return success;
+ }
+};
+
+const double ThreadManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+
+using namespace apache::thrift::concurrency::test;
+