THRIFT-1336 thrift: added server and processor test code

move the tests from src to test:
lib/cpp/src/thrift/concurrency/test to lib/cpp/test/concurrency
lib/cpp/src/thrift/processor/test to lib/cpp/test/processor

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1337098 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am
old mode 100644
new mode 100755
index bf41935..f7b1c26
--- a/lib/cpp/test/Makefile.am
+++ b/lib/cpp/test/Makefile.am
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-noinst_LTLIBRARIES = libtestgencpp.la
+noinst_LTLIBRARIES = libtestgencpp.la libprocessortest.la
 nodist_libtestgencpp_la_SOURCES = \
 	gen-cpp/DebugProtoTest_types.cpp \
 	gen-cpp/OptionalRequiredTest_types.cpp \
@@ -29,6 +29,14 @@
 	ThriftTest_extras.cpp \
 	DebugProtoTest_extras.cpp
 
+nodist_libprocessortest_la_SOURCES = \
+	gen-cpp/ChildService.cpp \
+	gen-cpp/ChildService.h \
+	gen-cpp/ParentService.cpp \
+	gen-cpp/ParentService.h \
+	gen-cpp/proc_types.cpp \
+	gen-cpp/proc_types.h
+
 ThriftTest_extras.o: gen-cpp/ThriftTest_types.h
 DebugProtoTest_extras.o: gen-cpp/DebugProtoTest_types.h
 
@@ -52,7 +60,9 @@
 	TransportTest \
 	ZlibTest \
 	TFileTransportTest \
-	UnitTests
+	UnitTests \
+	concurrency_test \
+	processor_test
 
 TESTS_ENVIRONMENT= \
 	BOOST_TEST_LOG_SINK=tests.xml \
@@ -163,7 +173,29 @@
 
 SpecializationTest_LDADD = libtestgencpp.la
 
+concurrency_test_SOURCES = \
+	concurrency/Tests.cpp \
+	concurrency/ThreadFactoryTests.h \
+	concurrency/ThreadManagerTests.h \
+	concurrency/TimerManagerTests.h
 
+concurrency_test_LDADD = \
+  $(top_builddir)/lib/cpp/libthrift.la
+
+processor_test_SOURCES = \
+	processor/ProcessorTest.cpp \
+	processor/EventLog.cpp \
+	processor/ServerThread.cpp \
+	processor/EventLog.h \
+	processor/Handlers.h \
+	processor/ServerThread.h
+
+processor_test_LDADD = libprocessortest.la \
+                       $(top_builddir)/lib/cpp/libthrift.la \
+                       $(top_builddir)/lib/cpp/libthriftnb.la \
+                       $(BOOST_LDFLAGS) \
+                       -levent \
+                       $(BOOST_ROOT_PATH)/lib/libboost_unit_test_framework.a
 #
 # Common thrift code generation rules
 #
@@ -181,6 +213,9 @@
 gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/ThriftTest.cpp gen-cpp/ThriftTest_types.cpp gen-cpp/ThriftTest_types.h: $(top_srcdir)/test/ThriftTest.thrift
 	$(THRIFT) --gen cpp:dense $<
 
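+# A single $(THRIFT) invocation on proc.thrift also produces the ParentService
+# and proc_types sources listed above; ChildService.cpp serves as the
+# representative target for that rule.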
+gen-cpp/ChildService.cpp: processor/proc.thrift
+	$(THRIFT) --gen cpp:templates,cob_style $<
+
 INCLUDES = \
 	-I$(top_srcdir)/lib/cpp/src
 
diff --git a/lib/cpp/test/concurrency/Tests.cpp b/lib/cpp/test/concurrency/Tests.cpp
new file mode 100644
index 0000000..c80bb88
--- /dev/null
+++ b/lib/cpp/test/concurrency/Tests.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+#include <vector>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
+
+int main(int argc, char** argv) {
+
+  std::string arg;
+
+  std::vector<std::string>  args(argc - 1 > 1 ? argc - 1 : 1);
+
+  args[0] = "all";
+
+  for (int ix = 1; ix < argc; ix++) {
+    args[ix - 1] = std::string(argv[ix]);
+  }
+
+  bool runAll = args[0].compare("all") == 0;
+
+  if (runAll || args[0].compare("thread-factory") == 0) {
+
+    ThreadFactoryTests threadFactoryTests;
+
+    std::cout << "ThreadFactory tests..." << std::endl;
+
+    size_t count =  1000;
+    size_t floodLoops =  1;
+    size_t floodCount =  100000;
+
+    std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+
+    assert(threadFactoryTests.reapNThreads(count));
+
+    std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+    assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
+    std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
+
+    assert(threadFactoryTests.synchStartTest());
+
+    std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
+
+    assert(threadFactoryTests.monitorTimeoutTest());
+  }
+
+  if (runAll || args[0].compare("util") == 0) {
+
+    std::cout << "Util tests..." << std::endl;
+
+    std::cout << "\t\tUtil minimum time" << std::endl;
+
+    int64_t time00 = Util::currentTime();
+    int64_t time01 = Util::currentTime();
+
+    std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
+
+    time00 = Util::currentTime();
+    time01 = time00;
+    size_t count = 0;
+
+    while (time01 < time00 + 10) {
+      count++;
+      time01 = Util::currentTime();
+    }
+
+    std::cout << "\t\t\tcalls per ms: " << count / (time01 - time00) << std::endl;
+  }
+
+
+  if (runAll || args[0].compare("timer-manager") == 0) {
+
+    std::cout << "TimerManager tests..." << std::endl;
+
+    std::cout << "\t\tTimerManager test00" << std::endl;
+
+    TimerManagerTests timerManagerTests;
+
+    assert(timerManagerTests.test00());
+  }
+
+  if (runAll || args[0].compare("thread-manager") == 0) {
+
+    std::cout << "ThreadManager tests..." << std::endl;
+
+    {
+
+      size_t workerCount = 100;
+
+      size_t taskCount = 100000;
+
+      int64_t delay = 10LL;
+
+      std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+      ThreadManagerTests threadManagerTests;
+
+      assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+      std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+      assert(threadManagerTests.blockTest(delay, workerCount));
+
+    }
+  }
+
+  if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
+
+    std::cout << "ThreadManager benchmark tests..." << std::endl;
+
+    {
+
+      size_t minWorkerCount = 2;
+
+      size_t maxWorkerCount = 512;
+
+      size_t tasksPerWorker = 1000;
+
+      int64_t delay = 10LL;
+
+      for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
+
+        size_t taskCount = workerCount * tasksPerWorker;
+
+        std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+        ThreadManagerTests threadManagerTests;
+
+        threadManagerTests.loadTest(taskCount, delay, workerCount);
+      }
+    }
+  }
+}
diff --git a/lib/cpp/test/concurrency/ThreadFactoryTests.h b/lib/cpp/test/concurrency/ThreadFactoryTests.h
new file mode 100644
index 0000000..b7e873f
--- /dev/null
+++ b/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Util.h>
+
+#include <assert.h>
+#include <unistd.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadFactoryTests class
+ *
+ * @version $Id:$
+ */
+class ThreadFactoryTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task() {}
+
+    void run() {
+      std::cout << "\t\t\tHello World" << std::endl;
+    }
+  };
+
+  /**
+   * Hello world test
+   */
+  bool helloWorldTest() {
+
+    PlatformThreadFactory threadFactory = PlatformThreadFactory();
+
+    shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    thread->start();
+
+    thread->join();
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  /**
+   * Reap N threads
+   */
+  class ReapNTask: public Runnable {
+
+   public:
+
+    ReapNTask(Monitor& monitor, int& activeCount) :
+      _monitor(monitor),
+      _count(activeCount) {}
+
+    void run() {
+      Synchronized s(_monitor);
+
+      _count--;
+
+      //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+      if (_count == 0) {
+        _monitor.notify();
+      }
+    }
+
+    Monitor& _monitor;
+
+    int& _count;
+  };
+
+  bool reapNThreads(int loop=1, int count=10) {
+
+    PlatformThreadFactory threadFactory =  PlatformThreadFactory();
+
+    Monitor* monitor = new Monitor();
+
+    for(int lix = 0; lix < loop; lix++) {
+
+      int* activeCount  = new int(count);
+
+      std::set<shared_ptr<Thread> > threads;
+
+      int tix;
+
+      for (tix = 0; tix < count; tix++) {
+        try {
+          threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to create thread " << lix * count + tix << ": " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      tix = 0;
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
+
+        try {
+          (*thread)->start();
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to start thread " << lix * count + tix << ": " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      {
+        Synchronized s(*monitor);
+        while (*activeCount > 0) {
+          monitor->wait(1000);
+        }
+      }
+      delete activeCount;
+      std::cout << "\t\t\treaped " << (lix + 1) * count << " threads" << std::endl;
+    }
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  class SynchStartTask: public Runnable {
+
+   public:
+
+    enum STATE {
+      UNINITIALIZED,
+      STARTING,
+      STARTED,
+      STOPPING,
+      STOPPED
+    };
+
+    SynchStartTask(Monitor& monitor, volatile  STATE& state) :
+      _monitor(monitor),
+      _state(state) {}
+
+    void run() {
+      {
+        Synchronized s(_monitor);
+        if (_state == SynchStartTask::STARTING) {
+          _state = SynchStartTask::STARTED;
+          _monitor.notify();
+        }
+      }
+
+      {
+        Synchronized s(_monitor);
+        while (_state == SynchStartTask::STARTED) {
+          _monitor.wait();
+        }
+
+        if (_state == SynchStartTask::STOPPING) {
+          _state = SynchStartTask::STOPPED;
+          _monitor.notifyAll();
+        }
+      }
+    }
+
+   private:
+    Monitor& _monitor;
+    volatile  STATE& _state;
+  };
+
+  bool synchStartTest() {
+
+    Monitor monitor;
+
+    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+    shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
+
+    PlatformThreadFactory threadFactory =  PlatformThreadFactory();
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    if (state == SynchStartTask::UNINITIALIZED) {
+
+      state = SynchStartTask::STARTING;
+
+      thread->start();
+    }
+
+    {
+      Synchronized s(monitor);
+      while (state == SynchStartTask::STARTING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state != SynchStartTask::STARTING);
+
+    {
+      Synchronized s(monitor);
+
+      try {
+          monitor.wait(100);
+      } catch(TimedOutException& e) {
+      }
+
+      if (state == SynchStartTask::STARTED) {
+
+        state = SynchStartTask::STOPPING;
+
+        monitor.notify();
+      }
+
+      while (state == SynchStartTask::STOPPING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state == SynchStartTask::STOPPED);
+
+    bool success = true;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+
+    return true;
+  }
+
+  /** See how accurate monitor timeout is. */
+
+  bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
+
+    Monitor monitor;
+
+    int64_t startTime = Util::currentTime();
+
+    for (size_t ix = 0; ix < count; ix++) {
+      {
+        Synchronized s(monitor);
+        try {
+            monitor.wait(timeout);
+        } catch(TimedOutException& e) {
+        }
+      }
+    }
+
+    int64_t endTime = Util::currentTime();
+
+    double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
+
+    if (error < 0.0) {
+
+      error *= -1.0;
+    }
+
+    bool success = error < ThreadFactoryTests::ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+
+  class FloodTask : public Runnable {
+  public:
+
+    FloodTask(const size_t id) :_id(id) {}
+    ~FloodTask(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " done" << std::endl;
+      }
+    }
+
+    void run(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " started" << std::endl;
+      }
+
+      usleep(1);
+    }
+    const size_t _id;
+  };
+
+  void foo(PlatformThreadFactory *tf) {
+    (void) tf;
+  }
+
+  bool floodNTest(size_t loop=1, size_t count=100000) {
+
+    bool success = false;
+
+    for(size_t lix = 0; lix < loop; lix++) {
+
+      PlatformThreadFactory threadFactory = PlatformThreadFactory();
+      threadFactory.setDetached(true);
+
+        for(size_t tix = 0; tix < count; tix++) {
+
+          try {
+
+            shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+            shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+            thread->start();
+
+            usleep(1);
+
+          } catch (TException& e) {
+
+            std::cout << "\t\t\tfailed to start thread " << lix * count + tix << ": " << e.what() << std::endl;
+
+            return success;
+          }
+        }
+
+        std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+        success = true;
+    }
+
+    return success;
+  }
+};
+
+const double ThreadFactoryTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
diff --git a/lib/cpp/test/concurrency/ThreadManagerTests.h b/lib/cpp/test/concurrency/ThreadManagerTests.h
new file mode 100644
index 0000000..b734f7a
--- /dev/null
+++ b/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+#include <stdint.h>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadManagerTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task(Monitor& monitor, size_t& count, int64_t timeout) :
+      _monitor(monitor),
+      _count(count),
+      _timeout(timeout),
+      _done(false) {}
+
+    void run() {
+
+      _startTime = Util::currentTime();
+
+      {
+        Synchronized s(_sleep);
+
+        try {
+          _sleep.wait(_timeout);
+        } catch(TimedOutException& e) {
+          ;
+        }catch(...) {
+          assert(0);
+        }
+      }
+
+      _endTime = Util::currentTime();
+
+      _done = true;
+
+      {
+        Synchronized s(_monitor);
+
+        // std::cout << "Thread " << _count << " completed " << std::endl;
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    size_t& _count;
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    bool _done;
+    Monitor _sleep;
+  };
+
+  /**
+   * Dispatch count tasks, each of which blocks for timeout milliseconds then
+   * completes. Verify that all tasks completed and that the thread manager
+   * cleans up properly on delete.
+   */
+  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
+
+    Monitor monitor;
+
+    size_t activeCount = count;
+
+    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+    shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+
+#ifndef USE_BOOST_THREAD
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+#endif
+    threadManager->threadFactory(threadFactory);
+
+    threadManager->start();
+
+    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+    for (size_t ix = 0; ix < count; ix++) {
+
+      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+    }
+
+    int64_t time00 = Util::currentTime();
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+        threadManager->add(*ix);
+    }
+
+    {
+      Synchronized s(monitor);
+
+      while(activeCount > 0) {
+
+        monitor.wait();
+      }
+    }
+
+    int64_t time01 = Util::currentTime();
+
+    int64_t firstTime = 9223372036854775807LL;
+    int64_t lastTime = 0;
+
+    double averageTime = 0;
+    int64_t minTime = 9223372036854775807LL;
+    int64_t maxTime = 0;
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+      shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+      int64_t delta = task->_endTime - task->_startTime;
+
+      assert(delta > 0);
+
+      if (task->_startTime < firstTime) {
+        firstTime = task->_startTime;
+      }
+
+      if (task->_endTime > lastTime) {
+        lastTime = task->_endTime;
+      }
+
+      if (delta < minTime) {
+        minTime = delta;
+      }
+
+      if (delta > maxTime) {
+        maxTime = delta;
+      }
+
+      averageTime+= delta;
+    }
+
+    averageTime /= count;
+
+    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
+
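+    // Ideal elapsed time: with perfect scheduling the workers drain the tasks
+    // in ceil(count / workerCount) rounds of roughly timeout ms each.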
+    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+    double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+    if (error < 0) {
+      error*= -1.0;
+    }
+
+    bool success = error < ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+  class BlockTask: public Runnable {
+
+  public:
+
+    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+      _monitor(monitor),
+      _bmonitor(bmonitor),
+      _count(count) {}
+
+    void run() {
+      {
+        Synchronized s(_bmonitor);
+
+        _bmonitor.wait();
+
+      }
+
+      {
+        Synchronized s(_monitor);
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    Monitor& _bmonitor;
+    size_t& _count;
+  };
+
+  /**
+   * Block test.  Fill all of the worker threads and the pending-task queue,
+   * verify that adding one more task blocks or fails as expected, and that
+   * adding succeeds again once a task completes.
+   */
+
+  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
+    (void) timeout;
+    bool success = false;
+
+    try {
+
+      Monitor bmonitor;
+      Monitor monitor;
+
+      size_t pendingTaskMaxCount = workerCount;
+
+      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+      shared_ptr<PlatformThreadFactory> threadFactory = shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());
+
+#ifndef USE_BOOST_THREAD
+      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+#endif
+      threadManager->threadFactory(threadFactory);
+
+      threadManager->start();
+
+      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+      for (size_t ix = 0; ix < workerCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+      }
+
+      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+      }
+
+      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+        threadManager->add(*ix);
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+        throw TException("Unexpected pending task count");
+      }
+
+      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+      try {
+        threadManager->add(extraTask, 1);
+        throw TException("Unexpected success adding task in excess of pending task count");
+      } catch(TooManyPendingTasksException& e) {
+        throw TException("Should have timed out adding task in excess of pending task count");
+      } catch(TimedOutException& e) {
+        // Expected result
+      }
+
+      try {
+        threadManager->add(extraTask, -1);
+        throw TException("Unexpected success adding task in excess of pending task count");
+      } catch(TimedOutException& e) {
+        throw TException("Unexpected timeout adding task in excess of pending task count");
+      } catch(TooManyPendingTasksException& e) {
+        // Expected result
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[0] != 0) {
+          monitor.wait();
+        }
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+      try {
+        threadManager->add(extraTask, 1);
+      } catch(TimedOutException& e) {
+        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
+        throw TException("Unexpected timeout adding task");
+
+      } catch(TooManyPendingTasksException& e) {
+        std::cout << "\t\t\t" << "add encountered too many pending tasks" << std::endl;
+        throw TException("Unexpected TooManyPendingTasksException adding task");
+      }
+
+      // Wake up tasks that were pending before and wait for them to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[1] != 0) {
+          monitor.wait();
+        }
+      }
+
+      // Wake up the extra task and wait for it to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[2] != 0) {
+          monitor.wait();
+        }
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == 0))) {
+        throw TException("Unexpected pending task count");
+      }
+
+    } catch(TException& e) {
+      std::cout << "ERROR: " << e.what() << std::endl;
+    }
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+    return success;
+ }
+};
+
+const double ThreadManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
+using namespace apache::thrift::concurrency::test;
+
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
new file mode 100644
index 0000000..4fe9667
--- /dev/null
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/concurrency/TimerManager.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * TimerManagerTests class
+ *
+ * @version $Id:$
+ */
+class TimerManagerTests {
+
+ public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+   public:
+
+    Task(Monitor& monitor, int64_t timeout) :
+      _timeout(timeout),
+      _startTime(Util::currentTime()),
+      _monitor(monitor),
+      _success(false),
+      _done(false) {}
+
+    ~Task() { std::cerr << this << std::endl; }
+
+    void run() {
+
+      _endTime = Util::currentTime();
+
+      // Figure out error percentage
+
+      int64_t delta = _endTime - _startTime;
+
+
+      delta = delta > _timeout ? delta - _timeout : _timeout - delta;
+
+      double error = delta / (double)_timeout;
+
+      if(error < ERROR) {
+        _success = true;
+      }
+
+      _done = true;
+
+      std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
+      {Synchronized s(_monitor);
+        _monitor.notifyAll();
+      }
+    }
+
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    Monitor& _monitor;
+    bool _success;
+    bool _done;
+  };
+
+  /**
+   * This test creates two tasks and waits for the first to expire within 10%
+   * of the expected expiration time. It then verifies that the timer manager
+   * properly cleans up both itself and the remaining orphaned timeout task when the
+   * manager goes out of scope and its destructor is called.
+   */
+  bool test00(int64_t timeout=1000LL) {
+
+    shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
+
+    {
+
+      TimerManager timerManager;
+
+      timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
+
+      timerManager.start();
+
+      assert(timerManager.state() == TimerManager::STARTED);
+
+      // Don't create task yet, because its constructor sets the expected completion time, and we
+      // need to delay between inserting the two tasks into the run queue.
+      shared_ptr<TimerManagerTests::Task> task;
+
+      {
+        Synchronized s(_monitor);
+
+        timerManager.add(orphanTask, 10 * timeout);
+
+        try {
+          // Wait for 1 second in order to give timerManager a chance to start sleeping in response
+          // to adding orphanTask. We need to do this so we can verify that adding the second task
+          // kicks the dispatcher out of the current wait and starts the new 1 second wait.
+          _monitor.wait (1000);
+          assert (0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem.");
+        } catch (TimedOutException &ex) {
+        }
+
+        task.reset (new TimerManagerTests::Task(_monitor, timeout));
+
+        timerManager.add(task, timeout);
+
+        _monitor.wait();
+      }
+
+      assert(task->_done);
+
+
+      std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+    }
+
+    // timerManager.stop(); This is where it happens via destructor
+
+    assert(!orphanTask->_done);
+
+    return true;
+  }
+
+  friend class Task;
+
+  Monitor _monitor;
+};
+
+const double TimerManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
diff --git a/lib/cpp/test/processor/EventLog.cpp b/lib/cpp/test/processor/EventLog.cpp
new file mode 100755
index 0000000..0ac3028
--- /dev/null
+++ b/lib/cpp/test/processor/EventLog.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "EventLog.h"
+
+#include <stdarg.h>
+
+using namespace std;
+using namespace apache::thrift::concurrency;
+
+namespace {
+
+void debug(const char* fmt, ...) {
+  // Comment out this return to enable debug logs from the test code.
+  return;
+
+  va_list ap;
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+
+  fprintf(stderr, "\n");
+}
+
+}
+
+namespace apache { namespace thrift { namespace test {
+
+uint32_t EventLog::nextId_ = 0;
+
+#define EVENT_TYPE(value) EventType EventLog::value = #value
+EVENT_TYPE(ET_LOG_END);
+EVENT_TYPE(ET_CONN_CREATED);
+EVENT_TYPE(ET_CONN_DESTROYED);
+EVENT_TYPE(ET_CALL_STARTED);
+EVENT_TYPE(ET_CALL_FINISHED);
+EVENT_TYPE(ET_PROCESS);
+EVENT_TYPE(ET_PRE_READ);
+EVENT_TYPE(ET_POST_READ);
+EVENT_TYPE(ET_PRE_WRITE);
+EVENT_TYPE(ET_POST_WRITE);
+EVENT_TYPE(ET_ASYNC_COMPLETE);
+EVENT_TYPE(ET_HANDLER_ERROR);
+
+EVENT_TYPE(ET_CALL_INCREMENT_GENERATION);
+EVENT_TYPE(ET_CALL_GET_GENERATION);
+EVENT_TYPE(ET_CALL_ADD_STRING);
+EVENT_TYPE(ET_CALL_GET_STRINGS);
+EVENT_TYPE(ET_CALL_GET_DATA_WAIT);
+EVENT_TYPE(ET_CALL_ONEWAY_WAIT);
+EVENT_TYPE(ET_CALL_EXCEPTION_WAIT);
+EVENT_TYPE(ET_CALL_UNEXPECTED_EXCEPTION_WAIT);
+EVENT_TYPE(ET_CALL_SET_VALUE);
+EVENT_TYPE(ET_CALL_GET_VALUE);
+EVENT_TYPE(ET_WAIT_RETURN);
+
+EventLog::EventLog() {
+  id_ = nextId_++;
+  debug("New log: %u", id_);
+}
+
+void EventLog::append(EventType type, uint32_t connectionId, uint32_t callId,
+                      const string& message) {
+  Synchronized s(monitor_);
+  debug("%u <-- %u, %u, %s \"%s\"", id_, connectionId, callId, type,
+        message.c_str());
+
+  Event e(type, connectionId, callId, message);
+  events_.push_back(e);
+
+  monitor_.notify();
+}
+
+Event EventLog::waitForEvent(int64_t timeout) {
+  Synchronized s(monitor_);
+
+  try {
+    while (events_.empty()) {
+      monitor_.wait(timeout);
+    }
+  } catch (const TimedOutException& ex) {
+    return Event(ET_LOG_END, 0, 0, "");
+  }
+
+  Event event = events_.front();
+  events_.pop_front();
+  return event;
+}
+
+Event EventLog::waitForConnEvent(uint32_t connId, int64_t timeout) {
+  Synchronized s(monitor_);
+
+  while (true) {
+    // Scan the events already in the log for one with the desired connection
+    // ID, consuming it if found.  Events for other connections stay in the log.
+    for (EventList::iterator it = events_.begin(); it != events_.end(); ++it) {
+      if (it->connectionId == connId) {
+        Event event = *it;
+        events_.erase(it);
+        return event;
+      }
+    }
+
+    try {
+      // TODO: it would be nicer to honor timeout for the duration of this
+      // call, rather than restarting it for each call to wait().  It shouldn't
+      // be a big problem in practice, though.
+      monitor_.wait(timeout);
+    } catch (const TimedOutException& ex) {
+      return Event(ET_LOG_END, 0, 0, "");
+    }
+  }
+}
+
+}}} // apache::thrift::test
diff --git a/lib/cpp/test/processor/EventLog.h b/lib/cpp/test/processor/EventLog.h
new file mode 100755
index 0000000..d731cec
--- /dev/null
+++ b/lib/cpp/test/processor/EventLog.h
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef _THRIFT_TEST_EVENTLOG_H_
+#define _THRIFT_TEST_EVENTLOG_H_ 1
+
+#include <thrift/concurrency/Monitor.h>
+
+namespace apache { namespace thrift { namespace test {
+
+// Initially I made EventType an enum, but using char* results
+// in much more readable error messages when there is a mismatch.
+// It also lets users of EventLog easily define their own new types.
+// Comparing the literal pointer values should be safe, barring any strange
+// linking setup that results in duplicate symbols.
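+//
+// For example (the name below is purely illustrative), a test could define
+//   const EventType ET_MY_EVENT = "ET_MY_EVENT";
+// and compare it against Event::type with a plain ==.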
+typedef const char* EventType;
+
+struct Event {
+  Event(EventType type, uint32_t connectionId, uint32_t callId,
+        const std::string& message) :
+      type(type),
+      connectionId(connectionId),
+      callId(callId),
+      message(message) {}
+
+  EventType type;
+  uint32_t  connectionId;
+  uint32_t  callId;
+  std::string    message;
+};
+
+class EventLog {
+ public:
+  static EventType ET_LOG_END;
+  static EventType ET_CONN_CREATED;
+  static EventType ET_CONN_DESTROYED;
+  static EventType ET_CALL_STARTED;
+  static EventType ET_CALL_FINISHED;
+  static EventType ET_PROCESS;
+  static EventType ET_PRE_READ;
+  static EventType ET_POST_READ;
+  static EventType ET_PRE_WRITE;
+  static EventType ET_POST_WRITE;
+  static EventType ET_ASYNC_COMPLETE;
+  static EventType ET_HANDLER_ERROR;
+
+  static EventType ET_CALL_INCREMENT_GENERATION;
+  static EventType ET_CALL_GET_GENERATION;
+  static EventType ET_CALL_ADD_STRING;
+  static EventType ET_CALL_GET_STRINGS;
+  static EventType ET_CALL_GET_DATA_WAIT;
+  static EventType ET_CALL_ONEWAY_WAIT;
+  static EventType ET_CALL_UNEXPECTED_EXCEPTION_WAIT;
+  static EventType ET_CALL_EXCEPTION_WAIT;
+  static EventType ET_WAIT_RETURN;
+  static EventType ET_CALL_SET_VALUE;
+  static EventType ET_CALL_GET_VALUE;
+
+  EventLog();
+
+  void append(EventType type, uint32_t connectionId, uint32_t callId,
+              const std::string& message = "");
+
+  Event waitForEvent(int64_t timeout = 500);
+  Event waitForConnEvent(uint32_t connId, int64_t timeout = 500);
+
+ protected:
+  typedef std::list<Event> EventList;
+
+  concurrency::Monitor monitor_;
+  EventList events_;
+  uint32_t id_;
+
+  static uint32_t nextId_;
+};
+
+}}} // apache::thrift::test
+
+#endif // _THRIFT_TEST_EVENTLOG_H_
diff --git a/lib/cpp/test/processor/Handlers.h b/lib/cpp/test/processor/Handlers.h
new file mode 100755
index 0000000..2be262a
--- /dev/null
+++ b/lib/cpp/test/processor/Handlers.h
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef _THRIFT_PROCESSOR_TEST_HANDLERS_H_
+#define _THRIFT_PROCESSOR_TEST_HANDLERS_H_ 1
+
+#include "EventLog.h"
+#include "gen-cpp/ParentService.h"
+#include "gen-cpp/ChildService.h"
+
+namespace apache { namespace thrift { namespace test {
+
+class ParentHandler : virtual public ParentServiceIf {
+ public:
+  ParentHandler(const boost::shared_ptr<EventLog>& log) :
+      triggerMonitor(&mutex_),
+      generation_(0),
+      wait_(false),
+      log_(log) { }
+
+  int32_t incrementGeneration() {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_INCREMENT_GENERATION, 0, 0);
+    return ++generation_;
+  }
+
+  int32_t getGeneration() {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_GET_GENERATION, 0, 0);
+    return generation_;
+  }
+
+  void addString(const std::string& s) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_ADD_STRING, 0, 0);
+    strings_.push_back(s);
+  }
+
+  void getStrings(std::vector<std::string>& _return) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_GET_STRINGS, 0, 0);
+    _return = strings_;
+  }
+
+  void getDataWait(std::string& _return, int32_t length) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_GET_DATA_WAIT, 0, 0);
+
+    blockUntilTriggered();
+
+    _return.append(length, 'a');
+  }
+
+  void onewayWait() {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_ONEWAY_WAIT, 0, 0);
+
+    blockUntilTriggered();
+  }
+
+  void exceptionWait(const std::string& message) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_EXCEPTION_WAIT, 0, 0);
+
+    blockUntilTriggered();
+
+    MyError e;
+    e.message = message;
+    throw e;
+  }
+
+  void unexpectedExceptionWait(const std::string& message) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, 0, 0);
+
+    blockUntilTriggered();
+
+    MyError e;
+    e.message = message;
+    throw e;
+  }
+
+  /**
+   * After prepareTriggeredCall() is invoked, calls to any of the *Wait()
+   * functions won't return until triggerPendingCalls() is invoked.
+   *
+   * This has to be a separate function invoked by the main test thread
+   * in order to avoid race conditions.
+   */
+  void prepareTriggeredCall() {
+    concurrency::Guard g(mutex_);
+    wait_ = true;
+  }
+
+  /**
+   * Wake up all calls waiting in blockUntilTriggered()
+   */
+  void triggerPendingCalls() {
+    concurrency::Guard g(mutex_);
+    wait_ = false;
+    triggerMonitor.notifyAll();
+  }
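+
+  /*
+   * Illustrative use from a test thread (handler names a ParentHandler
+   * instance):
+   *
+   *   handler->prepareTriggeredCall();
+   *   // a client thread now issues one of the *Wait() calls and blocks
+   *   handler->triggerPendingCalls();  // the blocked call returns
+   */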
+
+ protected:
+  /**
+   * blockUntilTriggered() won't return until triggerPendingCalls() is invoked
+   * in another thread.
+   *
+   * This should only be called when already holding mutex_.
+   */
+  void blockUntilTriggered() {
+    while (wait_) {
+      triggerMonitor.waitForever();
+    }
+
+    // Log an event when we return
+    log_->append(EventLog::ET_WAIT_RETURN, 0, 0);
+  }
+
+  concurrency::Mutex mutex_;
+  concurrency::Monitor triggerMonitor;
+  int32_t generation_;
+  bool wait_;
+  std::vector<std::string> strings_;
+  boost::shared_ptr<EventLog> log_;
+};
+
+class ChildHandler : public ParentHandler, virtual public ChildServiceIf {
+ public:
+  ChildHandler(const boost::shared_ptr<EventLog>& log) :
+      ParentHandler(log),
+      value_(0) {}
+
+  int32_t setValue(int32_t value) {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_SET_VALUE, 0, 0);
+
+    int32_t oldValue = value_;
+    value_ = value;
+    return oldValue;
+  }
+
+  int32_t getValue() {
+    concurrency::Guard g(mutex_);
+    log_->append(EventLog::ET_CALL_GET_VALUE, 0, 0);
+
+    return value_;
+  }
+
+ protected:
+  int32_t value_;
+};
+
+struct ConnContext {
+ public:
+  ConnContext(boost::shared_ptr<protocol::TProtocol> in,
+              boost::shared_ptr<protocol::TProtocol> out,
+              uint32_t id) :
+      input(in),
+      output(out),
+      id(id) {}
+
+  boost::shared_ptr<protocol::TProtocol> input;
+  boost::shared_ptr<protocol::TProtocol> output;
+  uint32_t id;
+};
+
+struct CallContext {
+ public:
+  CallContext(ConnContext *context, uint32_t id, const std::string& name) :
+      connContext(context),
+      name(name),
+      id(id) {}
+
+  ConnContext *connContext;
+  std::string name;
+  uint32_t id;
+};
+
+class ServerEventHandler : public server::TServerEventHandler {
+ public:
+  ServerEventHandler(const boost::shared_ptr<EventLog>& log) :
+      nextId_(1),
+      log_(log) {}
+
+  virtual void preServe() {}
+
+  virtual void* createContext(boost::shared_ptr<protocol::TProtocol> input,
+                              boost::shared_ptr<protocol::TProtocol> output) {
+    ConnContext* context = new ConnContext(input, output, nextId_);
+    ++nextId_;
+    log_->append(EventLog::ET_CONN_CREATED, context->id, 0);
+    return context;
+  }
+
+  virtual void deleteContext(void* serverContext,
+                             boost::shared_ptr<protocol::TProtocol>input,
+                             boost::shared_ptr<protocol::TProtocol>output) {
+    ConnContext* context = reinterpret_cast<ConnContext*>(serverContext);
+
+    if (input != context->input) {
+      abort();
+    }
+    if (output != context->output) {
+      abort();
+    }
+
+    log_->append(EventLog::ET_CONN_DESTROYED, context->id, 0);
+
+    delete context;
+  }
+
+  virtual void processContext(
+      void* serverContext,
+      boost::shared_ptr<transport::TTransport> transport) {
+    // TODO: We currently don't test the behavior of the processContext()
+    // calls.  The various server implementations call processContext() at
+    // slightly different times, and it is too annoying to try and account for
+    // their various differences.
+    //
+    // TThreadedServer, TThreadPoolServer, and TSimpleServer usually wait until
+    // they see the first byte of a request before calling processContext().
+    // However, they don't wait for the first byte of the very first request,
+    // and instead immediately call processContext() before any data is
+    // received.
+    //
+    // TNonblockingServer always waits until receiving the full request before
+    // calling processContext().
+#if 0
+    ConnContext* context = reinterpret_cast<ConnContext*>(serverContext);
+    log_->append(EventLog::ET_PROCESS, context->id, 0);
+#endif
+  }
+
+ protected:
+  uint32_t nextId_;
+  boost::shared_ptr<EventLog> log_;
+};
+
+class ProcessorEventHandler : public TProcessorEventHandler {
+ public:
+  ProcessorEventHandler(const boost::shared_ptr<EventLog>& log) :
+      nextId_(1),
+      log_(log) {}
+
+  void* getContext(const char* fnName, void* serverContext) {
+    ConnContext* connContext = reinterpret_cast<ConnContext*>(serverContext);
+
+    CallContext* context = new CallContext(connContext, nextId_, fnName);
+    ++nextId_;
+
+    log_->append(EventLog::ET_CALL_STARTED, connContext->id, context->id,
+                 fnName);
+    return context;
+  }
+
+  void freeContext(void* ctx, const char* fnName) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_CALL_FINISHED, context->connContext->id,
+                 context->id, fnName);
+    delete context;
+  }
+
+  void preRead(void* ctx, const char* fnName) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_PRE_READ, context->connContext->id, context->id,
+                 fnName);
+  }
+
+  void postRead(void* ctx, const char* fnName, uint32_t bytes) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_POST_READ, context->connContext->id, context->id,
+                 fnName);
+  }
+
+  void preWrite(void* ctx, const char* fnName) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_PRE_WRITE, context->connContext->id, context->id,
+                 fnName);
+  }
+
+  void postWrite(void* ctx, const char* fnName, uint32_t bytes) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_POST_WRITE, context->connContext->id,
+                 context->id, fnName);
+  }
+
+  void asyncComplete(void* ctx, const char* fnName) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_ASYNC_COMPLETE, context->connContext->id,
+                 context->id, fnName);
+  }
+
+  void handlerError(void* ctx, const char* fnName) {
+    CallContext* context = reinterpret_cast<CallContext*>(ctx);
+    checkName(context, fnName);
+    log_->append(EventLog::ET_HANDLER_ERROR, context->connContext->id,
+                 context->id, fnName);
+  }
+
+ protected:
+  void checkName(const CallContext* context, const char* fnName) {
+    // Note: we can't use BOOST_CHECK_EQUAL here, since the handler runs in a
+    // different thread from the test functions.  Just abort if the names are
+    // different
+    if (context->name != fnName) {
+      fprintf(stderr, "call context name mismatch: \"%s\" != \"%s\"\n",
+              context->name.c_str(), fnName);
+      fflush(stderr);
+      abort();
+    }
+  }
+
+  uint32_t nextId_;
+  boost::shared_ptr<EventLog> log_;
+};
+
+}}} // apache::thrift::test
+
+#endif // _THRIFT_PROCESSOR_TEST_HANDLERS_H_
diff --git a/lib/cpp/test/processor/ProcessorTest.cpp b/lib/cpp/test/processor/ProcessorTest.cpp
new file mode 100755
index 0000000..58b82fb
--- /dev/null
+++ b/lib/cpp/test/processor/ProcessorTest.cpp
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * This file contains tests that ensure TProcessorEventHandler and
+ * TServerEventHandler are invoked properly by the various server
+ * implementations.
+ */
+
+#include <tr1/functional>
+#include <boost/test/unit_test.hpp>
+
+#include <thrift/concurrency/PosixThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/server/TThreadedServer.h>
+#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/server/TNonblockingServer.h>
+#include <thrift/server/TSimpleServer.h>
+#include <thrift/transport/TSocket.h>
+
+#include "EventLog.h"
+#include "ServerThread.h"
+#include "Handlers.h"
+#include "gen-cpp/ChildService.h"
+
+using namespace std;
+using namespace boost;
+using namespace apache::thrift;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::server;
+using namespace apache::thrift::transport;
+
+using namespace apache::thrift::test;
+
+/*
+ * Traits classes that encapsulate how to create various types of servers.
+ */
+
+class TSimpleServerTraits {
+ public:
+  typedef TSimpleServer ServerType;
+
+  shared_ptr<TSimpleServer> createServer(
+      const shared_ptr<TProcessor>& processor,
+      uint16_t port,
+      const shared_ptr<TTransportFactory>& transportFactory,
+      const shared_ptr<TProtocolFactory>& protocolFactory) {
+    shared_ptr<TServerSocket> socket(new TServerSocket(port));
+    return shared_ptr<TSimpleServer>(new TSimpleServer(
+          processor, socket, transportFactory, protocolFactory));
+  }
+};
+
+class TThreadedServerTraits {
+ public:
+  typedef TThreadedServer ServerType;
+
+  shared_ptr<TThreadedServer> createServer(
+      const shared_ptr<TProcessor>& processor,
+      uint16_t port,
+      const shared_ptr<TTransportFactory>& transportFactory,
+      const shared_ptr<TProtocolFactory>& protocolFactory) {
+    shared_ptr<TServerSocket> socket(new TServerSocket(port));
+    return shared_ptr<TThreadedServer>(new TThreadedServer(
+          processor, socket, transportFactory, protocolFactory));
+  }
+};
+
+class TThreadPoolServerTraits {
+ public:
+  typedef TThreadPoolServer ServerType;
+
+  shared_ptr<TThreadPoolServer> createServer(
+      const shared_ptr<TProcessor>& processor,
+      uint16_t port,
+      const shared_ptr<TTransportFactory>& transportFactory,
+      const shared_ptr<TProtocolFactory>& protocolFactory) {
+    shared_ptr<TServerSocket> socket(new TServerSocket(port));
+
+    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory);
+    shared_ptr<ThreadManager> threadManager =
+      ThreadManager::newSimpleThreadManager(8);
+    threadManager->threadFactory(threadFactory);
+    threadManager->start();
+
+    return shared_ptr<TThreadPoolServer>(new TThreadPoolServer(
+          processor, socket, transportFactory, protocolFactory,
+          threadManager));
+  }
+};
+
+class TNonblockingServerTraits {
+ public:
+  typedef TNonblockingServer ServerType;
+
+  shared_ptr<TNonblockingServer> createServer(
+      const shared_ptr<TProcessor>& processor,
+      uint16_t port,
+      const shared_ptr<TTransportFactory>& transportFactory,
+      const shared_ptr<TProtocolFactory>& protocolFactory) {
+    // TNonblockingServer automatically uses TFramedTransport.
+    // Raise an exception if the supplied transport factory is not a
+    // TFramedTransportFactory
+    TFramedTransportFactory* framedFactory =
+      dynamic_cast<TFramedTransportFactory*>(transportFactory.get());
+    if (framedFactory == NULL) {
+      throw TException("TNonblockingServer must use TFramedTransport");
+    }
+
+    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory);
+    shared_ptr<ThreadManager> threadManager =
+      ThreadManager::newSimpleThreadManager(8);
+    threadManager->threadFactory(threadFactory);
+    threadManager->start();
+
+    return shared_ptr<TNonblockingServer>(new TNonblockingServer(
+          processor, protocolFactory, port, threadManager));
+  }
+};
+
+class TNonblockingServerNoThreadsTraits {
+ public:
+  typedef TNonblockingServer ServerType;
+
+  shared_ptr<TNonblockingServer> createServer(
+      const shared_ptr<TProcessor>& processor,
+      uint16_t port,
+      const shared_ptr<TTransportFactory>& transportFactory,
+      const shared_ptr<TProtocolFactory>& protocolFactory) {
+    // TNonblockingServer automatically uses TFramedTransport.
+    // Raise an exception if the supplied transport factory is not a
+    // TFramedTransportFactory
+    TFramedTransportFactory* framedFactory =
+      dynamic_cast<TFramedTransportFactory*>(transportFactory.get());
+    if (framedFactory == NULL) {
+      throw TException("TNonblockingServer must use TFramedTransport");
+    }
+
+    // Use a NULL ThreadManager
+    shared_ptr<ThreadManager> threadManager;
+    return shared_ptr<TNonblockingServer>(new TNonblockingServer(
+          processor, protocolFactory, port, threadManager));
+  }
+};
+
+/*
+ * Traits classes for controlling if we instantiate templated or generic
+ * protocol factories, processors, clients, etc.
+ *
+ * The goal is to allow the outer test code to select which server type is
+ * being tested, and whether or not we are testing the templated classes, or
+ * the generic classes.
+ *
+ * Each specific test case can control whether we create a child or parent
+ * server, and whether we use TFramedTransport or TBufferedTransport.
+ */
+
+class UntemplatedTraits {
+ public:
+  typedef TBinaryProtocolFactory ProtocolFactory;
+  typedef TBinaryProtocol Protocol;
+
+  typedef ParentServiceProcessor ParentProcessor;
+  typedef ChildServiceProcessor ChildProcessor;
+  typedef ParentServiceClient ParentClient;
+  typedef ChildServiceClient ChildClient;
+};
+
+class TemplatedTraits {
+ public:
+  typedef TBinaryProtocolFactoryT<TBufferBase> ProtocolFactory;
+  typedef TBinaryProtocolT<TBufferBase> Protocol;
+
+  typedef ParentServiceProcessorT<Protocol> ParentProcessor;
+  typedef ChildServiceProcessorT<Protocol> ChildProcessor;
+  typedef ParentServiceClientT<Protocol> ParentClient;
+  typedef ChildServiceClientT<Protocol> ChildClient;
+};
+
+
+template<typename TemplateTraits_>
+class ParentServiceTraits {
+ public:
+  typedef typename TemplateTraits_::ParentProcessor Processor;
+  typedef typename TemplateTraits_::ParentClient Client;
+  typedef ParentHandler Handler;
+
+  typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory;
+  typedef typename TemplateTraits_::Protocol Protocol;
+};
+
+template<typename TemplateTraits_>
+class ChildServiceTraits {
+ public:
+  typedef typename TemplateTraits_::ChildProcessor Processor;
+  typedef typename TemplateTraits_::ChildClient Client;
+  typedef ChildHandler Handler;
+
+  typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory;
+  typedef typename TemplateTraits_::Protocol Protocol;
+};
+
+// TODO: It would be nicer if the TTransportFactory types defined a typedef,
+// to allow us to figure out the exact transport type without having to pass it
+// in as a separate template parameter here.
+//
+// It would also be nice if they used covariant return types.  Unfortunately,
+// since they return shared_ptr instead of raw pointers, covariant return types
+// won't work.
+template<typename ServerTraits_, typename ServiceTraits_,
+         typename TransportFactory_ = TFramedTransportFactory,
+         typename Transport_ = TFramedTransport>
+class ServiceState : public ServerState {
+ public:
+  typedef typename ServiceTraits_::Processor Processor;
+  typedef typename ServiceTraits_::Client Client;
+  typedef typename ServiceTraits_::Handler Handler;
+
+  ServiceState() :
+      port_(0),
+      log_(new EventLog),
+      handler_(new Handler(log_)),
+      processor_(new Processor(handler_)),
+      transportFactory_(new TransportFactory_),
+      protocolFactory_(new typename ServiceTraits_::ProtocolFactory),
+      serverEventHandler_(new ServerEventHandler(log_)),
+      processorEventHandler_(new ProcessorEventHandler(log_)) {
+    processor_->setEventHandler(processorEventHandler_);
+  }
+
+  shared_ptr<TServer> createServer(uint16_t port) {
+    ServerTraits_ serverTraits;
+    return serverTraits.createServer(processor_, port, transportFactory_,
+                                     protocolFactory_);
+  }
+
+  shared_ptr<TServerEventHandler> getServerEventHandler() {
+    return serverEventHandler_;
+  }
+
+  void bindSuccessful(uint16_t port) {
+    port_ = port;
+  }
+
+  uint16_t getPort() const {
+    return port_;
+  }
+
+  const shared_ptr<EventLog>& getLog() const {
+    return log_;
+  }
+
+  const shared_ptr<Handler>& getHandler() const {
+    return handler_;
+  }
+
+  shared_ptr<Client> createClient() {
+    typedef typename ServiceTraits_::Protocol Protocol;
+
+    shared_ptr<TSocket> socket(new TSocket("127.0.0.1", port_));
+    shared_ptr<Transport_> transport(new Transport_(socket));
+    shared_ptr<Protocol> protocol(new Protocol(transport));
+    transport->open();
+
+    shared_ptr<Client> client(new Client(protocol));
+    return client;
+  }
+
+ private:
+  uint16_t port_;
+  shared_ptr<EventLog> log_;
+  shared_ptr<Handler> handler_;
+  shared_ptr<Processor> processor_;
+  shared_ptr<TTransportFactory> transportFactory_;
+  shared_ptr<TProtocolFactory> protocolFactory_;
+  shared_ptr<TServerEventHandler> serverEventHandler_;
+  shared_ptr<TProcessorEventHandler> processorEventHandler_;
+};
+
+
+/**
+ * Check that there are no more events in the log
+ */
+void checkNoEvents(const shared_ptr<EventLog>& log) {
+  // Wait for an event with a very short timeout period.  We don't expect
+  // anything to be present, so we will normally wait for the full timeout.
+  // On the other hand, a non-zero timeout is nice since it does give a short
+  // window for events to arrive in case there is a problem.
+  Event event = log->waitForEvent(10);
+  BOOST_CHECK_EQUAL(EventLog::ET_LOG_END, event.type);
+}
+
+/**
+ * Check for the events that should be logged when a new connection is created.
+ *
+ * Returns the connection ID allocated by the server.
+ */
+uint32_t checkNewConnEvents(const shared_ptr<EventLog>& log) {
+  // Check for an ET_CONN_CREATED event
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type);
+
+  // Some servers call the processContext() hook immediately.
+  // Others (TNonblockingServer) only call it once a full request is received.
+  // We don't check for it yet, to allow either behavior.
+
+  return event.connectionId;
+}
+
+/**
+ * Check for the events that should be logged when a connection is closed.
+ */
+void checkCloseEvents(const shared_ptr<EventLog>& log, uint32_t connId) {
+  // Check for an ET_CONN_DESTROYED event
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+
+  // Make sure there are no more events
+  checkNoEvents(log);
+}
+
+/**
+ * Check for the events that should be logged when a call is received
+ * and the handler is invoked.
+ *
+ * It does not check for anything after the handler invocation.
+ *
+ * Returns the call ID allocated by the server.
+ */
+uint32_t checkCallHandlerEvents(const shared_ptr<EventLog>& log,
+                                uint32_t connId,
+                                EventType callType,
+                                const string& callName) {
+  // Call started
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+  uint32_t callId = event.callId;
+
+  // Pre-read
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // Post-read
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // Handler invocation
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(callType, event.type);
+  // The handler doesn't have any connection or call context,
+  // so the connectionId and callId in this event aren't valid
+
+  return callId;
+}
+
+/**
+ * Check for the events that should be logged after a handler returns.
+ */
+void checkCallPostHandlerEvents(const shared_ptr<EventLog>& log,
+                                uint32_t connId,
+                                uint32_t callId,
+                                const string& callName) {
+  // Pre-write
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // Post-write
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // Call finished
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // It is acceptable for servers to call processContext() again immediately
+  // to start waiting on the next request.  However, some servers wait until
+  // they have received either a partial request or the full request before
+  // calling processContext().  We don't check for the next processContext()
+  // call yet.
+}
+
+/**
+ * Check for the events that should be logged when a call is made.
+ *
+ * This just calls checkCallHandlerEvents() followed by
+ * checkCallPostHandlerEvents().
+ *
+ * Returns the call ID allocated by the server.
+ */
+uint32_t checkCallEvents(const shared_ptr<EventLog>& log,
+                         uint32_t connId,
+                         EventType callType,
+                         const string& callName) {
+  uint32_t callId = checkCallHandlerEvents(log, connId, callType, callName);
+  checkCallPostHandlerEvents(log, connId, callId, callName);
+
+  return callId;
+}
+
+/*
+ * Test functions
+ */
+
+template<typename State_>
+void testParentService(const shared_ptr<State_>& state) {
+  shared_ptr<typename State_::Client> client = state->createClient();
+
+  int32_t gen = client->getGeneration();
+  int32_t newGen = client->incrementGeneration();
+  BOOST_CHECK_EQUAL(gen + 1, newGen);
+  newGen = client->getGeneration();
+  BOOST_CHECK_EQUAL(gen + 1, newGen);
+
+  client->addString("foo");
+  client->addString("bar");
+  client->addString("asdf");
+
+  vector<string> strings;
+  client->getStrings(strings);
+  BOOST_REQUIRE_EQUAL(3, strings.size());
+  BOOST_REQUIRE_EQUAL("foo", strings[0]);
+  BOOST_REQUIRE_EQUAL("bar", strings[1]);
+  BOOST_REQUIRE_EQUAL("asdf", strings[2]);
+}
+
+template<typename State_>
+void testChildService(const shared_ptr<State_>& state) {
+  shared_ptr<typename State_::Client> client = state->createClient();
+
+  // Test calling some of the parent methods via a child client
+  int32_t gen = client->getGeneration();
+  int32_t newGen = client->incrementGeneration();
+  BOOST_CHECK_EQUAL(gen + 1, newGen);
+  newGen = client->getGeneration();
+  BOOST_CHECK_EQUAL(gen + 1, newGen);
+
+  // Test some of the child methods
+  client->setValue(10);
+  BOOST_CHECK_EQUAL(10, client->getValue());
+  BOOST_CHECK_EQUAL(10, client->setValue(99));
+  BOOST_CHECK_EQUAL(99, client->getValue());
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testBasicService() {
+  typedef ServiceState< ServerTraits, ParentServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  testParentService(state);
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testInheritedService() {
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  testParentService(state);
+  testChildService(state);
+}
+
+/**
+ * Test to make sure that the TServerEventHandler and TProcessorEventHandler
+ * methods are invoked in the correct order with the actual events.
+ */
+template<typename ServerTraits, typename TemplateTraits>
+void testEventSequencing() {
+  // We use TBufferedTransport for this test, instead of TFramedTransport.
+  // This way the server will start processing data as soon as it is received,
+  // instead of waiting for the full request.  This is necessary so we can
+  // separate the preRead() and postRead() events.
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits>,
+                        TBufferedTransportFactory, TBufferedTransport>
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  const shared_ptr<EventLog>& log = state->getLog();
+
+  // Make sure we're at the end of the log
+  checkNoEvents(log);
+
+  state->getHandler()->prepareTriggeredCall();
+
+  // Make sure createContext() is called after a connection has been
+  // established.  We open a plain socket instead of creating a client.
+  shared_ptr<TSocket> socket(new TSocket("127.0.0.1", state->getPort()));
+  socket->open();
+
+  // Make sure the proper events occurred after a new connection
+  uint32_t connId = checkNewConnEvents(log);
+
+  // Send a message header.  We manually construct the request so that we
+  // can test the timing for the preRead() call.
+  string requestName = "getDataWait";
+  string eventName = "ParentService.getDataWait";
+  int32_t seqid = time(NULL);
+  TBinaryProtocol protocol(socket);
+  protocol.writeMessageBegin(requestName, T_CALL, seqid);
+  socket->flush();
+
+  // Make sure we saw the call started and pre-read events
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  uint32_t callId = event.callId;
+
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+
+  // Make sure there are no new events
+  checkNoEvents(log);
+
+  // Send the rest of the request
+  protocol.writeStructBegin("ParentService_getDataWait_pargs");
+  protocol.writeFieldBegin("length", apache::thrift::protocol::T_I32, 1);
+  protocol.writeI32(8*1024*1024);
+  protocol.writeFieldEnd();
+  protocol.writeFieldStop();
+  protocol.writeStructEnd();
+  protocol.writeMessageEnd();
+  socket->writeEnd();
+  socket->flush();
+
+  // We should then see postRead()
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+
+  // Then the handler should be invoked
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_GET_DATA_WAIT, event.type);
+
+  // The handler won't respond until we notify it.
+  // Make sure there are no more events.
+  checkNoEvents(log);
+
+  // Notify the handler that it should return
+  // We just use a global lock for now, since it is easiest
+  state->getHandler()->triggerPendingCalls();
+
+  // The handler will log a separate event before it returns
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type);
+
+  // We should then see preWrite()
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+
+  // We requested more data than can be buffered, and we aren't reading it,
+  // so the server shouldn't be able to finish its write yet.
+  // Make sure there are no more events.
+  checkNoEvents(log);
+
+  // Read the response header
+  std::string responseName;
+  int32_t responseSeqid = 0;
+  apache::thrift::protocol::TMessageType responseType;
+  protocol.readMessageBegin(responseName, responseType, responseSeqid);
+  BOOST_CHECK_EQUAL(responseSeqid, seqid);
+  BOOST_CHECK_EQUAL(requestName, responseName);
+  BOOST_CHECK_EQUAL(responseType, T_REPLY);
+  // Read the body.  We just ignore it for now.
+  protocol.skip(T_STRUCT);
+
+  // Now that we have read, the server should have finished sending the data
+  // and called the postWrite() handler
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+
+  // Call finished should be last
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type);
+  BOOST_CHECK_EQUAL(eventName, event.message);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+
+  // There should be no more events
+  checkNoEvents(log);
+
+  // Close the connection, and make sure we get a connection destroyed event
+  socket->close();
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+
+  // There should be no more events
+  checkNoEvents(log);
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testSeparateConnections() {
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  const shared_ptr<EventLog>& log = state->getLog();
+
+  // Create a client
+  shared_ptr<typename State::Client> client1 = state->createClient();
+
+  // Make sure the expected events were logged
+  uint32_t client1Id = checkNewConnEvents(log);
+
+  // Create a second client
+  shared_ptr<typename State::Client> client2 = state->createClient();
+
+  // Make sure the expected events were logged
+  uint32_t client2Id = checkNewConnEvents(log);
+
+  // The two connections should have different IDs
+  BOOST_CHECK_NE(client1Id, client2Id);
+
+  // Make a call, and check for the proper events
+  int32_t value = 5;
+  client1->setValue(value);
+  uint32_t call1 = checkCallEvents(log, client1Id, EventLog::ET_CALL_SET_VALUE,
+                                     "ChildService.setValue");
+
+  // Make a call with client2
+  int32_t v = client2->getValue();
+  BOOST_CHECK_EQUAL(value, v);
+  checkCallEvents(log, client2Id, EventLog::ET_CALL_GET_VALUE,
+                  "ChildService.getValue");
+
+  // Make another call with client1
+  v = client1->getValue();
+  BOOST_CHECK_EQUAL(value, v);
+  uint32_t call2 = checkCallEvents(log, client1Id, EventLog::ET_CALL_GET_VALUE,
+                                     "ChildService.getValue");
+  BOOST_CHECK_NE(call1, call2);
+
+  // Close the second client, and check for the appropriate events
+  client2.reset();
+  checkCloseEvents(log, client2Id);
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testOnewayCall() {
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  const shared_ptr<EventLog>& log = state->getLog();
+
+  // Create a client
+  shared_ptr<typename State::Client> client = state->createClient();
+  uint32_t connId = checkNewConnEvents(log);
+
+  // Make a oneway call
+  // It should return immediately, even though the server's handler
+  // won't return right away
+  state->getHandler()->prepareTriggeredCall();
+  client->onewayWait();
+  string callName = "ParentService.onewayWait";
+  uint32_t callId = checkCallHandlerEvents(log, connId,
+                                           EventLog::ET_CALL_ONEWAY_WAIT,
+                                           callName);
+
+  // There shouldn't be any more events
+  checkNoEvents(log);
+
+  // Trigger the handler to return
+  state->getHandler()->triggerPendingCalls();
+
+  // The handler will log an ET_WAIT_RETURN event when it wakes up
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type);
+
+  // Now we should see the async complete event, then call finished
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_ASYNC_COMPLETE, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // Destroy the client, and check for connection closed events
+  client.reset();
+  checkCloseEvents(log, connId);
+
+  checkNoEvents(log);
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testExpectedError() {
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  const shared_ptr<EventLog>& log = state->getLog();
+
+  // Create a client
+  shared_ptr<typename State::Client> client = state->createClient();
+  uint32_t connId = checkNewConnEvents(log);
+
+  // Send the exceptionWait() call
+  state->getHandler()->prepareTriggeredCall();
+  string message = "test 1234 test";
+  client->send_exceptionWait(message);
+  string callName = "ParentService.exceptionWait";
+  uint32_t callId = checkCallHandlerEvents(log, connId,
+                                           EventLog::ET_CALL_EXCEPTION_WAIT,
+                                           callName);
+
+  // There shouldn't be any more events
+  checkNoEvents(log);
+
+  // Trigger the handler to return
+  state->getHandler()->triggerPendingCalls();
+
+  // The handler will log an ET_WAIT_RETURN event when it wakes up
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type);
+
+  // Now receive the response
+  try {
+    client->recv_exceptionWait();
+    BOOST_FAIL("expected MyError to be thrown");
+  } catch (const MyError& e) {
+    BOOST_CHECK_EQUAL(message, e.message);
+  }
+
+  // Now we should see the events for a normal call finish
+  checkCallPostHandlerEvents(log, connId, callId, callName);
+
+  // There shouldn't be any more events
+  checkNoEvents(log);
+
+  // Destroy the client, and check for connection closed events
+  client.reset();
+  checkCloseEvents(log, connId);
+
+  checkNoEvents(log);
+}
+
+template<typename ServerTraits, typename TemplateTraits>
+void testUnexpectedError() {
+  typedef ServiceState< ServerTraits, ChildServiceTraits<TemplateTraits> >
+    State;
+
+  // Start the server
+  shared_ptr<State> state(new State);
+  ServerThread serverThread(state, true);
+
+  const shared_ptr<EventLog>& log = state->getLog();
+
+  // Create a client
+  shared_ptr<typename State::Client> client = state->createClient();
+  uint32_t connId = checkNewConnEvents(log);
+
+  // Send the unexpectedExceptionWait() call
+  state->getHandler()->prepareTriggeredCall();
+  string message = "1234 test 5678";
+  client->send_unexpectedExceptionWait(message);
+  string callName = "ParentService.unexpectedExceptionWait";
+  uint32_t callId = checkCallHandlerEvents(
+      log, connId, EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, callName);
+
+  // There shouldn't be any more events
+  checkNoEvents(log);
+
+  // Trigger the handler to return
+  state->getHandler()->triggerPendingCalls();
+
+  // The handler will log an ET_WAIT_RETURN event when it wakes up
+  Event event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type);
+
+  // Now receive the response
+  try {
+    client->recv_unexpectedExceptionWait();
+    BOOST_FAIL("expected TApplicationError to be thrown");
+  } catch (const TApplicationException& e) {
+  }
+
+  // Now we should see a handler error event
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_HANDLER_ERROR, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // pre-write and post-write events aren't generated after a handler error
+  // (Even for non-oneway calls where a response is written.)
+  //
+  // A call finished event is logged when the call context is destroyed
+  event = log->waitForEvent();
+  BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type);
+  BOOST_CHECK_EQUAL(connId, event.connectionId);
+  BOOST_CHECK_EQUAL(callId, event.callId);
+  BOOST_CHECK_EQUAL(callName, event.message);
+
+  // There shouldn't be any more events
+  checkNoEvents(log);
+
+  // Destroy the client, and check for connection closed events
+  client.reset();
+  checkCloseEvents(log, connId);
+
+  checkNoEvents(log);
+}
+
+
+// Macro to define simple tests that can be used with all server types
+#define DEFINE_SIMPLE_TESTS(Server, Template) \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_basicService) { \
+    testBasicService<Server##Traits, Template##Traits>(); \
+  } \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_inheritedService) { \
+    testInheritedService<Server##Traits, Template##Traits>(); \
+  } \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_oneway) { \
+    testOnewayCall<Server##Traits, Template##Traits>(); \
+  } \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_exception) { \
+    testExpectedError<Server##Traits, Template##Traits>(); \
+  } \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_unexpectedException) { \
+    testUnexpectedError<Server##Traits, Template##Traits>(); \
+  }
+
+// Tests that require the server to process multiple connections concurrently
+// (i.e., not TSimpleServer)
+#define DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_separateConnections) { \
+    testSeparateConnections<Server##Traits, Template##Traits>(); \
+  }
+
+// The testEventSequencing() test manually generates a request for the server,
+// and doesn't work with TFramedTransport.  Therefore we can't test it with
+// TNonblockingServer.
+#define DEFINE_NOFRAME_TESTS(Server, Template) \
+  BOOST_AUTO_TEST_CASE(Server##_##Template##_eventSequencing) { \
+    testEventSequencing<Server##Traits, Template##Traits>(); \
+  }
+
+#define DEFINE_TNONBLOCKINGSERVER_TESTS(Server, Template) \
+  DEFINE_SIMPLE_TESTS(Server, Template) \
+  DEFINE_CONCURRENT_SERVER_TESTS(Server, Template)
+
+#define DEFINE_ALL_SERVER_TESTS(Server, Template) \
+  DEFINE_SIMPLE_TESTS(Server, Template) \
+  DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \
+  DEFINE_NOFRAME_TESTS(Server, Template)
+
+DEFINE_ALL_SERVER_TESTS(TThreadedServer, Templated)
+DEFINE_ALL_SERVER_TESTS(TThreadedServer, Untemplated)
+DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Templated)
+DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Untemplated)
+
+DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Templated)
+DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Untemplated)
+DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Templated)
+DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Untemplated)
+
+DEFINE_SIMPLE_TESTS(TSimpleServer, Templated)
+DEFINE_SIMPLE_TESTS(TSimpleServer, Untemplated)
+DEFINE_NOFRAME_TESTS(TSimpleServer, Templated)
+DEFINE_NOFRAME_TESTS(TSimpleServer, Untemplated)
+
+// TODO: We should test TEventServer in the future.
+// For now, it is known not to work correctly with TProcessorEventHandler.
+
+unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) {
+  unit_test::framework::master_test_suite().p_name.value =
+    "ProcessorTest";
+
+  return NULL;
+}
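
For readers tracing the macros above, here is a rough sketch of what a single
DEFINE_SIMPLE_TESTS expansion boils down to.  TThreadedServerTraits is one of
the server traits classes defined earlier in ProcessorTest.cpp; the wrapper
function name is illustrative only:

    // Approximately what BOOST_AUTO_TEST_CASE(TThreadedServer_Templated_basicService)
    // reduces to: pick a server traits class and a templated/untemplated traits
    // class, then run the shared test body.
    void sketch_TThreadedServer_Templated_basicService() {
      // testBasicService() builds a ServiceState (handler, processor, transport
      // and protocol factories), starts the server on a ServerThread, and then
      // exercises it through a generated ParentService client.
      testBasicService<TThreadedServerTraits, TemplatedTraits>();
    }
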
diff --git a/lib/cpp/test/processor/ServerThread.cpp b/lib/cpp/test/processor/ServerThread.cpp
new file mode 100755
index 0000000..9f2087c
--- /dev/null
+++ b/lib/cpp/test/processor/ServerThread.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef _THRIFT_TEST_SERVERTHREAD_TCC_
+#define _THRIFT_TEST_SERVERTHREAD_TCC_ 1
+
+#include "ServerThread.h"
+
+#include <thrift/concurrency/PosixThreadFactory.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/server/TThreadPoolServer.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TServerSocket.h>
+
+namespace apache { namespace thrift { namespace test {
+
+void ServerThread::start() {
+  assert(!running_);
+  running_ = true;
+
+  // Start the other thread
+  concurrency::PosixThreadFactory threadFactory;
+  threadFactory.setDetached(false);
+  thread_ = threadFactory.newThread(helper_);
+
+  thread_->start();
+
+  // Wait on the other thread to tell us that it has successfully
+  // bound to the port and started listening (or until an error occurs).
+  concurrency::Synchronized s(serverMonitor_);
+  while (!serving_ && !error_) {
+    serverMonitor_.waitForever();
+  }
+
+  if (error_) {
+    throw transport::TTransportException(
+        transport::TTransportException::NOT_OPEN,
+        "failed to bind on server socket");
+  }
+}
+
+void ServerThread::stop() {
+  if (!running_) {
+    return;
+  }
+
+  // Tell the server to stop
+  server_->stop();
+  running_ = false;
+
+  // Wait for the server thread to exit
+  //
+  // Note: this only works if all client connections have closed.  The servers
+  // generally wait for everything to be closed before exiting; there currently
+  // isn't a way to tell them to exit immediately and shut down existing
+  // connections.
+  thread_->join();
+}
+
+void ServerThread::run() {
+  /*
+   * Try binding to several ports, in case the one we want is already in use.
+   */
+  port_ = 12345;
+  unsigned int maxRetries = 10;
+  for (unsigned int n = 0; n < maxRetries; ++n) {
+    // Create the server
+    server_ = serverState_->createServer(port_);
+    // Install our helper as the server event handler, so that our
+    // preServe() method will be called once we've successfully bound to
+    // the port and are about to start listening.
+    server_->setServerEventHandler(helper_);
+
+    try {
+      // Try to serve requests
+      server_->serve();
+    } catch (const TException&) {
+      // TNonblockingServer throws a generic TException if it fails to bind.
+      // If we get a TException, we'll optimistically assume the bind failed.
+      ++port_;
+      continue;
+    }
+
+    // Unfortunately, serve() does not report failure: if it fails to start
+    // serving, it simply returns rather than throwing an exception.
+    //
+    // We have to use our preServe() hook to tell whether serve() successfully
+    // started serving and returned because stop() was called, or whether it
+    // failed to start serving in the first place.
+    concurrency::Synchronized s(serverMonitor_);
+    if (serving_) {
+      // Oh good, we started serving and are exiting because
+      // we're trying to stop.
+      serving_ = false;
+      return;
+    } else {
+      // We never started serving, probably because we failed to bind to the
+      // port.  Increment the port number and try again.
+      ++port_;
+      continue;
+    }
+  }
+
+  // We failed to bind on any port.
+  concurrency::Synchronized s(serverMonitor_);
+  error_ = true;
+  serverMonitor_.notify();
+}
+
+void ServerThread::preServe() {
+  // We bound to the port successfully, and are about to start serving requests
+  serverState_->bindSuccessful(port_);
+
+  // Set the real server event handler (replacing ourself)
+  boost::shared_ptr<server::TServerEventHandler> serverEventHandler =
+    serverState_->getServerEventHandler();
+  server_->setServerEventHandler(serverEventHandler);
+
+  // Notify the main thread that we have successfully started serving requests
+  concurrency::Synchronized s(serverMonitor_);
+  serving_ = true;
+  serverMonitor_.notify();
+
+  // Invoke preServe() on the real event handler, since we ate
+  // the original preServe() event.
+  if (serverEventHandler) {
+    serverEventHandler->preServe();
+  }
+}
+
+}}} // apache::thrift::test
+
+#endif // _THRIFT_TEST_SERVERTHREAD_TCC_
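
The start()/preServe() handshake above is thrift's Monitor used as a condition
variable: the main thread blocks until the server thread flips a flag and
notifies.  A minimal standalone sketch of that pattern, using the same
Monitor/Synchronized API as the code above (the Handshake type itself is
illustrative):

    #include <thrift/concurrency/Monitor.h>

    using apache::thrift::concurrency::Monitor;
    using apache::thrift::concurrency::Synchronized;

    // Illustrative condition-variable handshake, mirroring how ServerThread's
    // main thread waits in start() until preServe() runs on the server thread.
    struct Handshake {
      Monitor monitor;
      bool ready;

      Handshake() : ready(false) {}

      // Waiting side (as in ServerThread::start()): block until signalled.
      void wait() {
        Synchronized s(monitor);
        while (!ready) {
          monitor.waitForever();
        }
      }

      // Signalling side (as in ServerThread::preServe()): set the flag, notify.
      void signal() {
        Synchronized s(monitor);
        ready = true;
        monitor.notify();
      }
    };
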
diff --git a/lib/cpp/test/processor/ServerThread.h b/lib/cpp/test/processor/ServerThread.h
new file mode 100755
index 0000000..0dd5127
--- /dev/null
+++ b/lib/cpp/test/processor/ServerThread.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef _THRIFT_TEST_SERVERTHREAD_H_
+#define _THRIFT_TEST_SERVERTHREAD_H_ 1
+
+#include <thrift/TProcessor.h>
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/TTransport.h>
+
+#include "EventLog.h"
+
+namespace apache { namespace thrift { namespace test {
+
+/**
+ * A helper class to tell ServerThread how to create the server
+ */
+class ServerState {
+ public:
+  virtual ~ServerState() {}
+
+  /**
+   * Create a server to listen on the specified port.
+   *
+   * If the returned server fails to bind to the specified port when serve() is
+   * called on it, createServer() may be called again with a different port.
+   */
+  virtual boost::shared_ptr<server::TServer> createServer(uint16_t port) = 0;
+
+  /**
+   * Get the TServerEventHandler to set on the server.
+   *
+   * This is only called after the server successfully binds and is about to
+   * start serving traffic.  It is invoked from the server thread, rather than
+   * the main thread.
+   */
+  virtual boost::shared_ptr<server::TServerEventHandler>
+      getServerEventHandler() {
+    return boost::shared_ptr<server::TServerEventHandler>();
+  }
+
+  /**
+   * This method is called in the server thread after server binding succeeds.
+   *
+   * Subclasses may override this method if they wish to record the final
+   * port that was used for the server.
+   */
+  virtual void bindSuccessful(uint16_t port) {
+  }
+};
+
+/**
+ * ServerThread starts a thrift server running in a separate thread.
+ */
+class ServerThread {
+ public:
+  ServerThread(const boost::shared_ptr<ServerState>& state, bool autoStart) :
+      helper_(new Helper(this)),
+      port_(0),
+      running_(false),
+      serving_(false),
+      error_(false),
+      serverState_(state) {
+    if (autoStart) {
+      start();
+    }
+  }
+
+  void start();
+  void stop();
+
+  uint16_t getPort() const {
+    return port_;
+  }
+
+  ~ServerThread() {
+    if (running_) {
+      try {
+        stop();
+      } catch (...) {
+        GlobalOutput.printf("error shutting down server");
+      }
+    }
+  }
+
+ protected:
+  // Thrift requires shared_ptr ownership, so we have to use a helper class
+  // that we can allocate on the heap and hand to thrift.  It would be simpler
+  // if we could just make Runnable and TServerEventHandler private base
+  // classes of ServerThread.
+  class Helper : public concurrency::Runnable,
+                 public server::TServerEventHandler {
+   public:
+    Helper(ServerThread* serverThread)
+      : serverThread_(serverThread) {}
+
+    void run() {
+      serverThread_->run();
+    }
+
+    void preServe() {
+      serverThread_->preServe();
+    }
+
+   private:
+    ServerThread* serverThread_;
+  };
+
+  void run();
+  void preServe();
+
+  boost::shared_ptr<Helper> helper_;
+
+  uint16_t port_;
+  bool running_;
+  bool serving_;
+  bool error_;
+  concurrency::Monitor serverMonitor_;
+
+  boost::shared_ptr<ServerState> serverState_;
+  boost::shared_ptr<server::TServer> server_;
+  boost::shared_ptr<concurrency::Thread> thread_;
+};
+
+}}} // apache::thrift::test
+
+#endif // _THRIFT_TEST_SERVERTHREAD_H_
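
To tie ServerState and ServerThread together, here is a hedged sketch of a
minimal ServerState implementation and how a test might start it.
SimpleServerState is illustrative only; it assumes ParentHandler takes the
EventLog in its constructor (as in ProcessorTest.cpp's ServiceState) and wires
a TSimpleServer with the standard Thrift C++ API:

    #include <boost/shared_ptr.hpp>
    #include <thrift/TProcessor.h>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <thrift/server/TSimpleServer.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/transport/TServerSocket.h>

    #include "EventLog.h"
    #include "Handlers.h"
    #include "ServerThread.h"
    #include "gen-cpp/ParentService.h"

    using namespace apache::thrift;
    using namespace apache::thrift::test;

    // Illustrative ServerState: always builds a TSimpleServer over a buffered
    // binary transport, reusing the test's ParentHandler and EventLog.
    class SimpleServerState : public ServerState {
     public:
      SimpleServerState()
        : log_(new EventLog),
          handler_(new ParentHandler(log_)),
          processor_(new ParentServiceProcessor(handler_)) {}

      boost::shared_ptr<server::TServer> createServer(uint16_t port) {
        boost::shared_ptr<transport::TServerSocket> serverSocket(
            new transport::TServerSocket(port));
        boost::shared_ptr<transport::TTransportFactory> transportFactory(
            new transport::TBufferedTransportFactory);
        boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(
            new protocol::TBinaryProtocolFactory);
        return boost::shared_ptr<server::TServer>(new server::TSimpleServer(
            processor_, serverSocket, transportFactory, protocolFactory));
      }

     private:
      boost::shared_ptr<EventLog> log_;
      boost::shared_ptr<ParentHandler> handler_;
      boost::shared_ptr<TProcessor> processor_;
    };

    // Typical usage: autoStart=true blocks until the server is actually
    // serving (start() throws if it never manages to bind).
    //   boost::shared_ptr<ServerState> state(new SimpleServerState);
    //   ServerThread serverThread(state, true);
    //   uint16_t port = serverThread.getPort();
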
diff --git a/lib/cpp/test/processor/proc.thrift b/lib/cpp/test/processor/proc.thrift
new file mode 100644
index 0000000..ac3c5f9
--- /dev/null
+++ b/lib/cpp/test/processor/proc.thrift
@@ -0,0 +1,22 @@
+namespace cpp apache.thrift.test
+
+exception MyError {
+  1: string message
+}
+
+service ParentService {
+  i32 incrementGeneration()
+  i32 getGeneration()
+  void addString(1: string s)
+  list<string> getStrings()
+
+  binary getDataWait(1: i32 length)
+  oneway void onewayWait()
+  void exceptionWait(1: string message) throws (2: MyError error)
+  void unexpectedExceptionWait(1: string message)
+}
+
+service ChildService extends ParentService {
+  i32 setValue(1: i32 value)
+  i32 getValue()
+}
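
For completeness, a hedged sketch of how a client generated from this IDL can
be driven from C++.  The host, port, and framed transport are assumptions:
match the transport to whatever server variant is actually running.

    #include <stdint.h>
    #include <boost/shared_ptr.hpp>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/transport/TSocket.h>

    #include "gen-cpp/ChildService.h"

    using namespace apache::thrift;
    using apache::thrift::test::ChildServiceClient;

    int main() {
      // Connect with a framed binary transport (the framed and non-blocking
      // server variants used in the tests expect framing).
      boost::shared_ptr<transport::TSocket> socket(
          new transport::TSocket("127.0.0.1", 12345));
      boost::shared_ptr<transport::TTransport> transport(
          new transport::TFramedTransport(socket));
      boost::shared_ptr<protocol::TProtocol> protocol(
          new protocol::TBinaryProtocol(transport));
      ChildServiceClient client(protocol);

      transport->open();
      client.setValue(42);                  // returns the previous value
      int32_t value = client.getValue();    // expected: 42
      transport->close();
      return value == 42 ? 0 : 1;
    }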