From 2be7f2410294a809f6803b8e6e3067e51f997679 Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Thu, 10 May 2012 09:01:45 +0000 Subject: [PATCH] THRIFT-1336 thrift: added server and processor test code Patch: Dave Watson - rework made by roger git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1336544 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 38 +- .../src/thrift/processor/test/EventLog.cpp | 129 +++ lib/cpp/src/thrift/processor/test/EventLog.h | 94 ++ lib/cpp/src/thrift/processor/test/Handlers.h | 341 +++++++ .../thrift/processor/test/ProcessorTest.cpp | 941 ++++++++++++++++++ .../thrift/processor/test/ServerThread.cpp | 148 +++ .../src/thrift/processor/test/ServerThread.h | 143 +++ lib/cpp/src/thrift/processor/test/proc.thrift | 22 + 8 files changed, 1855 insertions(+), 1 deletion(-) mode change 100644 => 100755 lib/cpp/Makefile.am create mode 100644 lib/cpp/src/thrift/processor/test/EventLog.cpp create mode 100644 lib/cpp/src/thrift/processor/test/EventLog.h create mode 100644 lib/cpp/src/thrift/processor/test/Handlers.h create mode 100755 lib/cpp/src/thrift/processor/test/ProcessorTest.cpp create mode 100644 lib/cpp/src/thrift/processor/test/ServerThread.cpp create mode 100644 lib/cpp/src/thrift/processor/test/ServerThread.h create mode 100644 lib/cpp/src/thrift/processor/test/proc.thrift diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am old mode 100644 new mode 100755 index 21480aba..21f70dd9 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -208,7 +208,8 @@ include_qt_HEADERS = \ src/thrift/qt/TQTcpServer.h -noinst_PROGRAMS = concurrency_test +noinst_PROGRAMS = concurrency_test \ + processor_test concurrency_test_SOURCES = \ src/thrift/concurrency/test/Tests.cpp \ @@ -218,6 +219,41 @@ concurrency_test_SOURCES = \ concurrency_test_LDADD = libthrift.la +processor_test_SOURCES = src/thrift/processor/test/ProcessorTest.cpp \ + src/thrift/processor/test/EventLog.cpp \ + src/thrift/processor/test/ServerThread.cpp \ + src/thrift/processor/test/EventLog.h \ + src/thrift/processor/test/Handlers.h \ + src/thrift/processor/test/ServerThread.h \ + src/thrift/processor/test/gen-cpp/ChildService.h + +processor_test_LDADD = libprocessortest.la \ + libthrift.la \ + libthriftnb.la \ + $(BOOST_LDFLAGS) \ + -levent \ + $(BOOST_ROOT_PATH)/lib/libboost_unit_test_framework.a + +check_PROGRAMS = \ + concurrency_test \ + processor_test + +TESTS = \ + $(check_PROGRAMS) + +noinst_LTLIBRARIES = libprocessortest.la + +THRIFT = $(top_builddir)/compiler/cpp/thrift + +gen-cpp/ChildService.cpp: $(top_srcdir)/lib/cpp/src/thrift/processor/test/proc.thrift + $(THRIFT) --gen cpp:templates,cob_style $< + +nodist_libprocessortest_la_SOURCES = \ + gen-cpp/ChildService.h \ + gen-cpp/ChildService.cpp \ + gen-cpp/ParentService.h \ + gen-cpp/ParentService.cpp + WINDOWS_DIST = \ README_WINDOWS \ src/windows \ diff --git a/lib/cpp/src/thrift/processor/test/EventLog.cpp b/lib/cpp/src/thrift/processor/test/EventLog.cpp new file mode 100644 index 00000000..7d27569a --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/EventLog.cpp @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "processor/test/EventLog.h" + +#include + +using namespace std; +using namespace apache::thrift::concurrency; + +namespace { + +void debug(const char* fmt, ...) { + // Comment out this return to enable debug logs from the test code. + return; + + va_list ap; + va_start(ap, fmt); + vfprintf(stderr, fmt, ap); + va_end(ap); + + fprintf(stderr, "\n"); +} + +} + +namespace apache { namespace thrift { namespace test { + +uint32_t EventLog::nextId_ = 0; + +#define EVENT_TYPE(value) EventType EventLog::value = #value +EVENT_TYPE(ET_LOG_END); +EVENT_TYPE(ET_CONN_CREATED); +EVENT_TYPE(ET_CONN_DESTROYED); +EVENT_TYPE(ET_CALL_STARTED); +EVENT_TYPE(ET_CALL_FINISHED); +EVENT_TYPE(ET_PROCESS); +EVENT_TYPE(ET_PRE_READ); +EVENT_TYPE(ET_POST_READ); +EVENT_TYPE(ET_PRE_WRITE); +EVENT_TYPE(ET_POST_WRITE); +EVENT_TYPE(ET_ASYNC_COMPLETE); +EVENT_TYPE(ET_HANDLER_ERROR); + +EVENT_TYPE(ET_CALL_INCREMENT_GENERATION); +EVENT_TYPE(ET_CALL_GET_GENERATION); +EVENT_TYPE(ET_CALL_ADD_STRING); +EVENT_TYPE(ET_CALL_GET_STRINGS); +EVENT_TYPE(ET_CALL_GET_DATA_WAIT); +EVENT_TYPE(ET_CALL_ONEWAY_WAIT); +EVENT_TYPE(ET_CALL_EXCEPTION_WAIT); +EVENT_TYPE(ET_CALL_UNEXPECTED_EXCEPTION_WAIT); +EVENT_TYPE(ET_CALL_SET_VALUE); +EVENT_TYPE(ET_CALL_GET_VALUE); +EVENT_TYPE(ET_WAIT_RETURN); + +EventLog::EventLog() { + id_ = nextId_++; + debug("New log: %d", id_); +} + +void EventLog::append(EventType type, uint32_t connectionId, uint32_t callId, + const string& message) { + Synchronized s(monitor_); + debug("%d <-- %u, %u, %s \"%s\"", id_, connectionId, callId, type, + message.c_str()); + + Event e(type, connectionId, callId, message); + events_.push_back(e); + + monitor_.notify(); +} + +Event EventLog::waitForEvent(int64_t timeout) { + Synchronized s(monitor_); + + try { + while (events_.empty()) { + monitor_.wait(timeout); + } + } catch (TimedOutException ex) { + return Event(ET_LOG_END, 0, 0, ""); + } + + Event event = events_.front(); + events_.pop_front(); + return event; +} + +Event EventLog::waitForConnEvent(uint32_t connId, int64_t timeout) { + Synchronized s(monitor_); + + EventList::iterator it = events_.begin(); + while (true) { + try { + // TODO: it would be nicer to honor timeout for the duration of this + // call, rather than restarting it for each call to wait(). It shouldn't + // be a big problem in practice, though. + while (it == events_.end()) { + monitor_.wait(timeout); + } + } catch (TimedOutException ex) { + return Event(ET_LOG_END, 0, 0, ""); + } + + if (it->connectionId == connId) { + Event event = *it; + events_.erase(it); + return event; + } + } +} + +}}} // apache::thrift::test diff --git a/lib/cpp/src/thrift/processor/test/EventLog.h b/lib/cpp/src/thrift/processor/test/EventLog.h new file mode 100644 index 00000000..007b6660 --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/EventLog.h @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_EVENTLOG_H_ +#define _THRIFT_TEST_EVENTLOG_H_ 1 + +#include "concurrency/Monitor.h" + +namespace apache { namespace thrift { namespace test { + +// Initially I made EventType an enum, but using char* results +// in much more readable error messages when there is a mismatch. +// It also lets users of EventLog easily define their own new types. +// Comparing the literal pointer values should be safe, barring any strange +// linking setup that results in duplicate symbols. +typedef const char* EventType; + +struct Event { + Event(EventType type, uint32_t connectionId, uint32_t callId, + const std::string& message) : + type(type), + connectionId(connectionId), + callId(callId), + message(message) {} + + EventType type; + uint32_t connectionId; + uint32_t callId; + std::string message; +}; + +class EventLog { + public: + static EventType ET_LOG_END; + static EventType ET_CONN_CREATED; + static EventType ET_CONN_DESTROYED; + static EventType ET_CALL_STARTED; + static EventType ET_CALL_FINISHED; + static EventType ET_PROCESS; + static EventType ET_PRE_READ; + static EventType ET_POST_READ; + static EventType ET_PRE_WRITE; + static EventType ET_POST_WRITE; + static EventType ET_ASYNC_COMPLETE; + static EventType ET_HANDLER_ERROR; + + static EventType ET_CALL_INCREMENT_GENERATION; + static EventType ET_CALL_GET_GENERATION; + static EventType ET_CALL_ADD_STRING; + static EventType ET_CALL_GET_STRINGS; + static EventType ET_CALL_GET_DATA_WAIT; + static EventType ET_CALL_ONEWAY_WAIT; + static EventType ET_CALL_UNEXPECTED_EXCEPTION_WAIT; + static EventType ET_CALL_EXCEPTION_WAIT; + static EventType ET_WAIT_RETURN; + static EventType ET_CALL_SET_VALUE; + static EventType ET_CALL_GET_VALUE; + + EventLog(); + + void append(EventType type, uint32_t connectionId, uint32_t callId, + const std::string& message = ""); + + Event waitForEvent(int64_t timeout = 500); + Event waitForConnEvent(uint32_t connId, int64_t timeout = 500); + + protected: + typedef std::list EventList; + + concurrency::Monitor monitor_; + EventList events_; + uint32_t id_; + + static uint32_t nextId_; +}; + +}}} // apache::thrift::test + +#endif // _THRIFT_TEST_EVENTLOG_H_ diff --git a/lib/cpp/src/thrift/processor/test/Handlers.h b/lib/cpp/src/thrift/processor/test/Handlers.h new file mode 100644 index 00000000..a4f5070e --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/Handlers.h @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_PROCESSOR_TEST_HANDLERS_H_ +#define _THRIFT_PROCESSOR_TEST_HANDLERS_H_ 1 + +#include "processor/test/EventLog.h" +#include "gen-cpp/ParentService.h" +#include "gen-cpp/ChildService.h" + +namespace apache { namespace thrift { namespace test { + +class ParentHandler : virtual public ParentServiceIf { + public: + ParentHandler(const boost::shared_ptr& log) : + triggerMonitor(&mutex_), + generation_(0), + wait_(false), + log_(log) { } + + int32_t incrementGeneration() { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_INCREMENT_GENERATION, 0, 0); + return ++generation_; + } + + int32_t getGeneration() { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_GENERATION, 0, 0); + return generation_; + } + + void addString(const std::string& s) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_ADD_STRING, 0, 0); + strings_.push_back(s); + } + + void getStrings(std::vector& _return) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_STRINGS, 0, 0); + _return = strings_; + } + + void getDataWait(std::string& _return, int32_t length) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_DATA_WAIT, 0, 0); + + blockUntilTriggered(); + + _return.append(length, 'a'); + } + + void onewayWait() { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_ONEWAY_WAIT, 0, 0); + + blockUntilTriggered(); + } + + void exceptionWait(const std::string& message) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_EXCEPTION_WAIT, 0, 0); + + blockUntilTriggered(); + + MyError e; + e.message = message; + throw e; + } + + void unexpectedExceptionWait(const std::string& message) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, 0, 0); + + blockUntilTriggered(); + + MyError e; + e.message = message; + throw e; + } + + /** + * After prepareTriggeredCall() is invoked, calls to any of the *Wait() + * functions won't return until triggerPendingCalls() is invoked + * + * This has to be a separate function invoked by the main test thread + * in order to to avoid race conditions. + */ + void prepareTriggeredCall() { + concurrency::Guard g(mutex_); + wait_ = true; + } + + /** + * Wake up all calls waiting in blockUntilTriggered() + */ + void triggerPendingCalls() { + concurrency::Guard g(mutex_); + wait_ = false; + triggerMonitor.notifyAll(); + } + + protected: + /** + * blockUntilTriggered() won't return until triggerPendingCalls() is invoked + * in another thread. + * + * This should only be called when already holding mutex_. + */ + void blockUntilTriggered() { + while (wait_) { + triggerMonitor.waitForever(); + } + + // Log an event when we return + log_->append(EventLog::ET_WAIT_RETURN, 0, 0); + } + + concurrency::Mutex mutex_; + concurrency::Monitor triggerMonitor; + int32_t generation_; + bool wait_; + std::vector strings_; + boost::shared_ptr log_; +}; + +class ChildHandler : public ParentHandler, virtual public ChildServiceIf { + public: + ChildHandler(const boost::shared_ptr& log) : + ParentHandler(log), + value_(0) {} + + int32_t setValue(int32_t value) { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_SET_VALUE, 0, 0); + + int32_t oldValue = value_; + value_ = value; + return oldValue; + } + + int32_t getValue() { + concurrency::Guard g(mutex_); + log_->append(EventLog::ET_CALL_GET_VALUE, 0, 0); + + return value_; + } + + protected: + int32_t value_; +}; + +struct ConnContext { + public: + ConnContext(boost::shared_ptr in, + boost::shared_ptr out, + uint32_t id) : + input(in), + output(out), + id(id) {} + + boost::shared_ptr input; + boost::shared_ptr output; + uint32_t id; +}; + +struct CallContext { + public: + CallContext(ConnContext *context, uint32_t id, const std::string& name) : + connContext(context), + name(name), + id(id) {} + + ConnContext *connContext; + std::string name; + uint32_t id; +}; + +class ServerEventHandler : public server::TServerEventHandler { + public: + ServerEventHandler(const boost::shared_ptr& log) : + nextId_(1), + log_(log) {} + + virtual void preServe() {} + + virtual void* createContext(boost::shared_ptr input, + boost::shared_ptr output) { + ConnContext* context = new ConnContext(input, output, nextId_); + ++nextId_; + log_->append(EventLog::ET_CONN_CREATED, context->id, 0); + return context; + } + + virtual void deleteContext(void* serverContext, + boost::shared_ptrinput, + boost::shared_ptroutput) { + ConnContext* context = reinterpret_cast(serverContext); + + if (input != context->input) { + abort(); + } + if (output != context->output) { + abort(); + } + + log_->append(EventLog::ET_CONN_DESTROYED, context->id, 0); + + delete context; + } + + virtual void processContext( + void* serverContext, + boost::shared_ptr transport) { + // TODO: We currently don't test the behavior of the processContext() + // calls. The various server implementations call processContext() at + // slightly different times, and it is too annoying to try and account for + // their various differences. + // + // TThreadedServer, TThreadPoolServer, and TSimpleServer usually wait until + // they see the first byte of a request before calling processContext(). + // However, they don't wait for the first byte of the very first request, + // and instead immediately call processContext() before any data is + // received. + // + // TNonblockingServer always waits until receiving the full request before + // calling processContext(). +#if 0 + ConnContext* context = reinterpret_cast(serverContext); + log_->append(EventLog::ET_PROCESS, context->id, 0); +#endif + } + + protected: + uint32_t nextId_; + boost::shared_ptr log_; +}; + +class ProcessorEventHandler : public TProcessorEventHandler { + public: + ProcessorEventHandler(const boost::shared_ptr& log) : + nextId_(1), + log_(log) {} + + void* getContext(const char* fnName, void* serverContext) { + ConnContext* connContext = reinterpret_cast(serverContext); + + CallContext* context = new CallContext(connContext, nextId_, fnName); + ++nextId_; + + log_->append(EventLog::ET_CALL_STARTED, connContext->id, context->id, + fnName); + return context; + } + + void freeContext(void* ctx, const char* fnName) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_CALL_FINISHED, context->connContext->id, + context->id, fnName); + delete context; + } + + void preRead(void* ctx, const char* fnName) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_PRE_READ, context->connContext->id, context->id, + fnName); + } + + void postRead(void* ctx, const char* fnName, uint32_t bytes) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_POST_READ, context->connContext->id, context->id, + fnName); + } + + void preWrite(void* ctx, const char* fnName) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_PRE_WRITE, context->connContext->id, context->id, + fnName); + } + + void postWrite(void* ctx, const char* fnName, uint32_t bytes) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_POST_WRITE, context->connContext->id, + context->id, fnName); + } + + void asyncComplete(void* ctx, const char* fnName) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_ASYNC_COMPLETE, context->connContext->id, + context->id, fnName); + } + + void handlerError(void* ctx, const char* fnName) { + CallContext* context = reinterpret_cast(ctx); + checkName(context, fnName); + log_->append(EventLog::ET_HANDLER_ERROR, context->connContext->id, + context->id, fnName); + } + + protected: + void checkName(const CallContext* context, const char* fnName) { + // Note: we can't use BOOST_CHECK_EQUAL here, since the handler runs in a + // different thread from the test functions. Just abort if the names are + // different + if (context->name != fnName) { + fprintf(stderr, "call context name mismatch: \"%s\" != \"%s\"\n", + context->name.c_str(), fnName); + fflush(stderr); + abort(); + } + } + + uint32_t nextId_; + boost::shared_ptr log_; +}; + +}}} // apache::thrift::test + +#endif // _THRIFT_PROCESSOR_TEST_HANDLERS_H_ diff --git a/lib/cpp/src/thrift/processor/test/ProcessorTest.cpp b/lib/cpp/src/thrift/processor/test/ProcessorTest.cpp new file mode 100755 index 00000000..fd43d14f --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/ProcessorTest.cpp @@ -0,0 +1,941 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * This file contains tests that ensure TProcessorEventHandler and + * TServerEventHandler are invoked properly by the various server + * implementations. + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include "gen-cpp/ChildService.h" + +using namespace std; +using namespace boost; +using namespace apache::thrift; +using namespace apache::thrift::concurrency; +using namespace apache::thrift::protocol; +using namespace apache::thrift::server; +using namespace apache::thrift::transport; + +using namespace apache::thrift::test; + +/* + * Traits classes that encapsulate how to create various types of servers. + */ + +class TSimpleServerTraits { + public: + typedef TSimpleServer ServerType; + + shared_ptr createServer( + const shared_ptr& processor, + uint16_t port, + const shared_ptr& transportFactory, + const shared_ptr& protocolFactory) { + shared_ptr socket(new TServerSocket(port)); + return shared_ptr(new TSimpleServer( + processor, socket, transportFactory, protocolFactory)); + } +}; + +class TThreadedServerTraits { + public: + typedef TThreadedServer ServerType; + + shared_ptr createServer( + const shared_ptr& processor, + uint16_t port, + const shared_ptr& transportFactory, + const shared_ptr& protocolFactory) { + shared_ptr socket(new TServerSocket(port)); + return shared_ptr(new TThreadedServer( + processor, socket, transportFactory, protocolFactory)); + } +}; + +class TThreadPoolServerTraits { + public: + typedef TThreadPoolServer ServerType; + + shared_ptr createServer( + const shared_ptr& processor, + uint16_t port, + const shared_ptr& transportFactory, + const shared_ptr& protocolFactory) { + shared_ptr socket(new TServerSocket(port)); + + shared_ptr threadFactory(new PosixThreadFactory); + shared_ptr threadManager = + ThreadManager::newSimpleThreadManager(8); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + return shared_ptr(new TThreadPoolServer( + processor, socket, transportFactory, protocolFactory, + threadManager)); + } +}; + +class TNonblockingServerTraits { + public: + typedef TNonblockingServer ServerType; + + shared_ptr createServer( + const shared_ptr& processor, + uint16_t port, + const shared_ptr& transportFactory, + const shared_ptr& protocolFactory) { + // TNonblockingServer automatically uses TFramedTransport. + // Raise an exception if the supplied transport factory is not a + // TFramedTransportFactory + TFramedTransportFactory* framedFactory = + dynamic_cast(transportFactory.get()); + if (framedFactory == NULL) { + throw TException("TNonblockingServer must use TFramedTransport"); + } + + shared_ptr threadFactory(new PosixThreadFactory); + shared_ptr threadManager = + ThreadManager::newSimpleThreadManager(8); + threadManager->threadFactory(threadFactory); + threadManager->start(); + + return shared_ptr(new TNonblockingServer( + processor, protocolFactory, port, threadManager)); + } +}; + +class TNonblockingServerNoThreadsTraits { + public: + typedef TNonblockingServer ServerType; + + shared_ptr createServer( + const shared_ptr& processor, + uint16_t port, + const shared_ptr& transportFactory, + const shared_ptr& protocolFactory) { + // TNonblockingServer automatically uses TFramedTransport. + // Raise an exception if the supplied transport factory is not a + // TFramedTransportFactory + TFramedTransportFactory* framedFactory = + dynamic_cast(transportFactory.get()); + if (framedFactory == NULL) { + throw TException("TNonblockingServer must use TFramedTransport"); + } + + // Use a NULL ThreadManager + shared_ptr threadManager; + return shared_ptr(new TNonblockingServer( + processor, protocolFactory, port, threadManager)); + } +}; + +/* + * Traits classes for controlling if we instantiate templated or generic + * protocol factories, processors, clients, etc. + * + * The goal is to allow the outer test code to select which server type is + * being tested, and whether or not we are testing the templated classes, or + * the generic classes. + * + * Each specific test case can control whether we create a child or parent + * server, and whether we use TFramedTransport or TBufferedTransport. + */ + +class UntemplatedTraits { + public: + typedef TBinaryProtocolFactory ProtocolFactory; + typedef TBinaryProtocol Protocol; + + typedef ParentServiceProcessor ParentProcessor; + typedef ChildServiceProcessor ChildProcessor; + typedef ParentServiceClient ParentClient; + typedef ChildServiceClient ChildClient; +}; + +class TemplatedTraits { + public: + typedef TBinaryProtocolFactoryT ProtocolFactory; + typedef TBinaryProtocolT Protocol; + + typedef ParentServiceProcessorT ParentProcessor; + typedef ChildServiceProcessorT ChildProcessor; + typedef ParentServiceClientT ParentClient; + typedef ChildServiceClientT ChildClient; +}; + + +template +class ParentServiceTraits { + public: + typedef typename TemplateTraits_::ParentProcessor Processor; + typedef typename TemplateTraits_::ParentClient Client; + typedef ParentHandler Handler; + + typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory; + typedef typename TemplateTraits_::Protocol Protocol; +}; + +template +class ChildServiceTraits { + public: + typedef typename TemplateTraits_::ChildProcessor Processor; + typedef typename TemplateTraits_::ChildClient Client; + typedef ChildHandler Handler; + + typedef typename TemplateTraits_::ProtocolFactory ProtocolFactory; + typedef typename TemplateTraits_::Protocol Protocol; +}; + +// TODO: It would be nicer if the TTransportFactory types defined a typedef, +// to allow us to figure out the exact transport type without having to pass it +// in as a separate template parameter here. +// +// It would also be niec if they used covariant return types. Unfortunately, +// since they return shared_ptr instead of raw pointers, covariant return types +// won't work. +template +class ServiceState : public ServerState { + public: + typedef typename ServiceTraits_::Processor Processor; + typedef typename ServiceTraits_::Client Client; + typedef typename ServiceTraits_::Handler Handler; + + ServiceState() : + port_(0), + log_(new EventLog), + handler_(new Handler(log_)), + processor_(new Processor(handler_)), + transportFactory_(new TransportFactory_), + protocolFactory_(new typename ServiceTraits_::ProtocolFactory), + serverEventHandler_(new ServerEventHandler(log_)), + processorEventHandler_(new ProcessorEventHandler(log_)) { + processor_->setEventHandler(processorEventHandler_); + } + + shared_ptr createServer(uint16_t port) { + ServerTraits_ serverTraits; + return serverTraits.createServer(processor_, port, transportFactory_, + protocolFactory_); + } + + shared_ptr getServerEventHandler() { + return serverEventHandler_; + } + + void bindSuccessful(uint16_t port) { + port_ = port; + } + + uint16_t getPort() const { + return port_; + } + + const shared_ptr& getLog() const { + return log_; + } + + const shared_ptr& getHandler() const { + return handler_; + } + + shared_ptr createClient() { + typedef typename ServiceTraits_::Protocol Protocol; + + shared_ptr socket(new TSocket("127.0.0.1", port_)); + shared_ptr transport(new Transport_(socket)); + shared_ptr protocol(new Protocol(transport)); + transport->open(); + + shared_ptr client(new Client(protocol)); + return client; + } + + private: + uint16_t port_; + shared_ptr log_; + shared_ptr handler_; + shared_ptr processor_; + shared_ptr transportFactory_; + shared_ptr protocolFactory_; + shared_ptr serverEventHandler_; + shared_ptr processorEventHandler_; +}; + + +/** + * Check that there are no more events in the log + */ +void checkNoEvents(const shared_ptr& log) { + // Wait for an event with a very short timeout period. We don't expect + // anything to be present, so we will normally wait for the full timeout. + // On the other hand, a non-zero timeout is nice since it does give a short + // window for events to arrive in case there is a problem. + Event event = log->waitForEvent(10); + BOOST_CHECK_EQUAL(EventLog::ET_LOG_END, event.type); +} + +/** + * Check for the events that should be logged when a new connection is created. + * + * Returns the connection ID allocated by the server. + */ +uint32_t checkNewConnEvents(const shared_ptr& log) { + // Check for an ET_CONN_CREATED event + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_CREATED, event.type); + + // Some servers call the processContext() hook immediately. + // Others (TNonblockingServer) only call it once a full request is received. + // We don't check for it yet, to allow either behavior. + + return event.connectionId; +} + +/** + * Check for the events that should be logged when a connection is closed. + */ +void checkCloseEvents(const shared_ptr& log, uint32_t connId) { + // Check for an ET_CONN_DESTROYED event + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + + // Make sure there are no more events + checkNoEvents(log); +} + +/** + * Check for the events that should be logged when a call is received + * and the handler is invoked. + * + * It does not check for anything after the handler invocation. + * + * Returns the call ID allocated by the server. + */ +uint32_t checkCallHandlerEvents(const shared_ptr& log, + uint32_t connId, + EventType callType, + const string& callName) { + // Call started + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callName, event.message); + uint32_t callId = event.callId; + + // Pre-read + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Post-read + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Handler invocation + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(callType, event.type); + // The handler doesn't have any connection or call context, + // so the connectionId and callId in this event aren't valid + + return callId; +} + +/** + * Check for the events that should be after a handler returns. + */ +void checkCallPostHandlerEvents(const shared_ptr& log, + uint32_t connId, + uint32_t callId, + const string& callName) { + // Pre-write + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Post-write + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Call finished + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // It is acceptable for servers to call processContext() again immediately + // to start waiting on the next request. However, some servers wait before + // getting either a partial request or the full request before calling + // processContext(). We don't check for the next call to processContext() + // yet. +} + +/** + * Check for the events that should be logged when a call is made. + * + * This just calls checkCallHandlerEvents() followed by + * checkCallPostHandlerEvents(). + * + * Returns the call ID allocated by the server. + */ +uint32_t checkCallEvents(const shared_ptr& log, + uint32_t connId, + EventType callType, + const string& callName) { + uint32_t callId = checkCallHandlerEvents(log, connId, callType, callName); + checkCallPostHandlerEvents(log, connId, callId, callName); + + return callId; +} + +/* + * Test functions + */ + +template +void testParentService(const shared_ptr& state) { + shared_ptr client = state->createClient(); + + int32_t gen = client->getGeneration(); + int32_t newGen = client->incrementGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + newGen = client->getGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + + client->addString("foo"); + client->addString("bar"); + client->addString("asdf"); + + vector strings; + client->getStrings(strings); + BOOST_REQUIRE_EQUAL(3, strings.size()); + BOOST_REQUIRE_EQUAL("foo", strings[0]); + BOOST_REQUIRE_EQUAL("bar", strings[1]); + BOOST_REQUIRE_EQUAL("asdf", strings[2]); +} + +template +void testChildService(const shared_ptr& state) { + shared_ptr client = state->createClient(); + + // Test calling some of the parent methids via the a child client + int32_t gen = client->getGeneration(); + int32_t newGen = client->incrementGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + newGen = client->getGeneration(); + BOOST_CHECK_EQUAL(gen + 1, newGen); + + // Test some of the child methods + client->setValue(10); + BOOST_CHECK_EQUAL(10, client->getValue()); + BOOST_CHECK_EQUAL(10, client->setValue(99)); + BOOST_CHECK_EQUAL(99, client->getValue()); +} + +template +void testBasicService() { + typedef ServiceState< ServerTraits, ParentServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + testParentService(state); +} + +template +void testInheritedService() { + typedef ServiceState< ServerTraits, ChildServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + testParentService(state); + testChildService(state); +} + +/** + * Test to make sure that the TServerEventHandler and TProcessorEventHandler + * methods are invoked in the correct order with the actual events. + */ +template +void testEventSequencing() { + // We use TBufferedTransport for this test, instead of TFramedTransport. + // This way the server will start processing data as soon as it is received, + // instead of waiting for the full request. This is necessary so we can + // separate the preRead() and postRead() events. + typedef ServiceState< ServerTraits, ChildServiceTraits, + TBufferedTransportFactory, TBufferedTransport> + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + const shared_ptr& log = state->getLog(); + + // Make sure we're at the end of the log + checkNoEvents(log); + + state->getHandler()->prepareTriggeredCall(); + + // Make sure createContext() is called after a connection has been + // established. We open a plain socket instead of creating a client. + shared_ptr socket(new TSocket("127.0.0.1", state->getPort())); + socket->open(); + + // Make sure the proper events occurred after a new connection + uint32_t connId = checkNewConnEvents(log); + + // Send a message header. We manually construct the request so that we + // can test the timing for the preRead() call. + string requestName = "getDataWait"; + string eventName = "ParentService.getDataWait"; + int32_t seqid = time(NULL); + TBinaryProtocol protocol(socket); + protocol.writeMessageBegin(requestName, T_CALL, seqid); + socket->flush(); + + // Make sure we saw the call started and pre-read events + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_STARTED, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + uint32_t callId = event.callId; + + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_READ, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Make sure there are no new events + checkNoEvents(log); + + // Send the rest of the request + protocol.writeStructBegin("ParentService_getDataNotified_pargs"); + protocol.writeFieldBegin("length", apache::thrift::protocol::T_I32, 1); + protocol.writeI32(8*1024*1024); + protocol.writeFieldEnd(); + protocol.writeFieldStop(); + protocol.writeStructEnd(); + protocol.writeMessageEnd(); + socket->writeEnd(); + socket->flush(); + + // We should then see postRead() + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_READ, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Then the handler should be invoked + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_GET_DATA_WAIT, event.type); + + // The handler won't respond until we notify it. + // Make sure there are no more events. + checkNoEvents(log); + + // Notify the handler that it should return + // We just use a global lock for now, since it is easiest + state->getHandler()->triggerPendingCalls(); + + // The handler will log a separate event before it returns + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // We should then see preWrite() + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_PRE_WRITE, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // We requested more data than can be buffered, and we aren't reading it, + // so the server shouldn't be able to finish its write yet. + // Make sure there are no more events. + checkNoEvents(log); + + // Read the response header + std::string responseName; + int32_t responseSeqid = 0; + apache::thrift::protocol::TMessageType responseType; + protocol.readMessageBegin(responseName, responseType, responseSeqid); + BOOST_CHECK_EQUAL(responseSeqid, seqid); + BOOST_CHECK_EQUAL(requestName, responseName); + BOOST_CHECK_EQUAL(responseType, T_REPLY); + // Read the body. We just ignore it for now. + protocol.skip(T_STRUCT); + + // Now that we have read, the server should have finished sending the data + // and called the postWrite() handler + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_POST_WRITE, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // Call finished should be last + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(eventName, event.message); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + + // There should be no more events + checkNoEvents(log); + + // Close the connection, and make sure we get a connection destroyed event + socket->close(); + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CONN_DESTROYED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + + // There should be no more events + checkNoEvents(log); +} + +template +void testSeparateConnections() { + typedef ServiceState< ServerTraits, ChildServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + const shared_ptr& log = state->getLog(); + + // Create a client + shared_ptr client1 = state->createClient(); + + // Make sure the expected events were logged + uint32_t client1Id = checkNewConnEvents(log); + + // Create a second client + shared_ptr client2 = state->createClient(); + + // Make sure the expected events were logged + uint32_t client2Id = checkNewConnEvents(log); + + // The two connections should have different IDs + BOOST_CHECK_NE(client1Id, client2Id); + + // Make a call, and check for the proper events + int32_t value = 5; + client1->setValue(value); + uint32_t call1 = checkCallEvents(log, client1Id, EventLog::ET_CALL_SET_VALUE, + "ChildService.setValue"); + + // Make a call with client2 + int32_t v = client2->getValue(); + BOOST_CHECK_EQUAL(value, v); + checkCallEvents(log, client2Id, EventLog::ET_CALL_GET_VALUE, + "ChildService.getValue"); + + // Make another call with client1 + v = client1->getValue(); + BOOST_CHECK_EQUAL(value, v); + uint32_t call2 = checkCallEvents(log, client1Id, EventLog::ET_CALL_GET_VALUE, + "ChildService.getValue"); + BOOST_CHECK_NE(call1, call2); + + // Close the second client, and check for the appropriate events + client2.reset(); + checkCloseEvents(log, client2Id); +} + +template +void testOnewayCall() { + typedef ServiceState< ServerTraits, ChildServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + const shared_ptr& log = state->getLog(); + + // Create a client + shared_ptr client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Make a oneway call + // It should return immediately, even though the server's handler + // won't return right away + state->getHandler()->prepareTriggeredCall(); + client->onewayWait(); + string callName = "ParentService.onewayWait"; + uint32_t callId = checkCallHandlerEvents(log, connId, + EventLog::ET_CALL_ONEWAY_WAIT, + callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now we should see the async complete event, then call finished + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_ASYNC_COMPLETE, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + +template +void testExpectedError() { + typedef ServiceState< ServerTraits, ChildServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + const shared_ptr& log = state->getLog(); + + // Create a client + shared_ptr client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Send the exceptionWait() call + state->getHandler()->prepareTriggeredCall(); + string message = "test 1234 test"; + client->send_exceptionWait(message); + string callName = "ParentService.exceptionWait"; + uint32_t callId = checkCallHandlerEvents(log, connId, + EventLog::ET_CALL_EXCEPTION_WAIT, + callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now receive the response + try { + client->recv_exceptionWait(); + BOOST_FAIL("expected MyError to be thrown"); + } catch (const MyError& e) { + BOOST_CHECK_EQUAL(message, e.message); + } + + // Now we should see the events for a normal call finish + checkCallPostHandlerEvents(log, connId, callId, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + +template +void testUnexpectedError() { + typedef ServiceState< ServerTraits, ChildServiceTraits > + State; + + // Start the server + shared_ptr state(new State); + ServerThread serverThread(state, true); + + const shared_ptr& log = state->getLog(); + + // Create a client + shared_ptr client = state->createClient(); + uint32_t connId = checkNewConnEvents(log); + + // Send the unexpectedExceptionWait() call + state->getHandler()->prepareTriggeredCall(); + string message = "1234 test 5678"; + client->send_unexpectedExceptionWait(message); + string callName = "ParentService.unexpectedExceptionWait"; + uint32_t callId = checkCallHandlerEvents( + log, connId, EventLog::ET_CALL_UNEXPECTED_EXCEPTION_WAIT, callName); + + // There shouldn't be any more events + checkNoEvents(log); + + // Trigger the handler to return + state->getHandler()->triggerPendingCalls(); + + // The handler will log an ET_WAIT_RETURN event when it wakes up + Event event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_WAIT_RETURN, event.type); + + // Now receive the response + try { + client->recv_unexpectedExceptionWait(); + BOOST_FAIL("expected TApplicationError to be thrown"); + } catch (const TApplicationException& e) { + } + + // Now we should see a handler error event + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_HANDLER_ERROR, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // pre-write and post-write events aren't generated after a handler error + // (Even for non-oneway calls where a response is written.) + // + // A call finished event is logged when the call context is destroyed + event = log->waitForEvent(); + BOOST_CHECK_EQUAL(EventLog::ET_CALL_FINISHED, event.type); + BOOST_CHECK_EQUAL(connId, event.connectionId); + BOOST_CHECK_EQUAL(callId, event.callId); + BOOST_CHECK_EQUAL(callName, event.message); + + // There shouldn't be any more events + checkNoEvents(log); + + // Destroy the client, and check for connection closed events + client.reset(); + checkCloseEvents(log, connId); + + checkNoEvents(log); +} + + +// Macro to define simple tests that can be used with all server types +#define DEFINE_SIMPLE_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_basicService) { \ + testBasicService(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_inheritedService) { \ + testInheritedService(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_oneway) { \ + testOnewayCall(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_exception) { \ + testExpectedError(); \ + } \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_unexpectedException) { \ + testUnexpectedError(); \ + } + +// Tests that require the server to process multiple connections concurrently +// (i.e., not TSimpleServer) +#define DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_separateConnections) { \ + testSeparateConnections(); \ + } + +// The testEventSequencing() test manually generates a request for the server, +// and doesn't work with TFramedTransport. Therefore we can't test it with +// TNonblockingServer. +#define DEFINE_NOFRAME_TESTS(Server, Template) \ + BOOST_AUTO_TEST_CASE(Server##_##Template##_eventSequencing) { \ + testEventSequencing(); \ + } + +#define DEFINE_TNONBLOCKINGSERVER_TESTS(Server, Template) \ + DEFINE_SIMPLE_TESTS(Server, Template) \ + DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) + +#define DEFINE_ALL_SERVER_TESTS(Server, Template) \ + DEFINE_SIMPLE_TESTS(Server, Template) \ + DEFINE_CONCURRENT_SERVER_TESTS(Server, Template) \ + DEFINE_NOFRAME_TESTS(Server, Template) + +DEFINE_ALL_SERVER_TESTS(TThreadedServer, Templated) +DEFINE_ALL_SERVER_TESTS(TThreadedServer, Untemplated) +DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Templated) +DEFINE_ALL_SERVER_TESTS(TThreadPoolServer, Untemplated) + +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Templated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServer, Untemplated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Templated) +DEFINE_TNONBLOCKINGSERVER_TESTS(TNonblockingServerNoThreads, Untemplated) + +DEFINE_SIMPLE_TESTS(TSimpleServer, Templated); +DEFINE_SIMPLE_TESTS(TSimpleServer, Untemplated); +DEFINE_NOFRAME_TESTS(TSimpleServer, Templated); +DEFINE_NOFRAME_TESTS(TSimpleServer, Untemplated); + +// TODO: We should test TEventServer in the future. +// For now, it is known not to work correctly with TProcessorEventHandler. + +unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) { + unit_test::framework::master_test_suite().p_name.value = + "ProcessorTest"; + + return NULL; +} diff --git a/lib/cpp/src/thrift/processor/test/ServerThread.cpp b/lib/cpp/src/thrift/processor/test/ServerThread.cpp new file mode 100644 index 00000000..d90fdf3e --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/ServerThread.cpp @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_SERVERTHREAD_TCC_ +#define _THRIFT_TEST_SERVERTHREAD_TCC_ 1 + +#include "processor/test/ServerThread.h" + +#include "concurrency/PosixThreadFactory.h" +#include "concurrency/ThreadManager.h" +#include "server/TThreadPoolServer.h" +#include "transport/TBufferTransports.h" +#include "transport/TServerSocket.h" + +namespace apache { namespace thrift { namespace test { + +void ServerThread::start() { + assert(!running_); + running_ = true; + + // Start the other thread + concurrency::PosixThreadFactory threadFactory; + threadFactory.setDetached(false); + thread_ = threadFactory.newThread(helper_); + + thread_->start(); + + // Wait on the other thread to tell us that it has successfully + // bound to the port and started listening (or until an error occurs). + concurrency::Synchronized s(serverMonitor_); + while (!serving_ && !error_) { + serverMonitor_.waitForever(); + } + + if (error_) { + throw transport::TTransportException( + transport::TTransportException::NOT_OPEN, + "failed to bind on server socket"); + } +} + +void ServerThread::stop() { + if (!running_) { + return; + } + + // Tell the server to stop + server_->stop(); + running_ = false; + + // Wait for the server thread to exit + // + // Note: this only works if all client connections have closed. The servers + // generally wait for everything to be closed before exiting; there currently + // isn't a way to tell them to just exit now, and shut down existing + // connections. + thread_->join(); +} + +void ServerThread::run() { + /* + * Try binding to several ports, in case the one we want is already in use. + */ + port_ = 12345; + unsigned int maxRetries = 10; + for (unsigned int n = 0; n < maxRetries; ++n) { + // Create the server + server_ = serverState_->createServer(port_); + // Install our helper as the server event handler, so that our + // preServe() method will be called once we've successfully bound to + // the port and are about to start listening. + server_->setServerEventHandler(helper_); + + try { + // Try to serve requests + server_->serve(); + } catch (const TException& x) { + // TNonblockingServer throws a generic TException if it fails to bind. + // If we get a TException, we'll optimistically assume the bind failed. + ++port_; + continue; + } + + // Seriously? serve() is pretty lame. If it fails to start serving it + // just returns rather than throwing an exception. + // + // We have to use our preServe() hook to tell if serve() successfully + // started serving and is returning because stop() is called, or if it just + // failed to start serving in the first place. + concurrency::Synchronized s(serverMonitor_); + if (serving_) { + // Oh good, we started serving and are exiting because + // we're trying to stop. + serving_ = false; + return; + } else { + // We never started serving, probably because we failed to bind to the + // port. Increment the port number and try again. + ++port_; + continue; + } + } + + // We failed to bind on any port. + concurrency::Synchronized s(serverMonitor_); + error_ = true; + serverMonitor_.notify(); +} + +void ServerThread::preServe() { + // We bound to the port successfully, and are about to start serving requests + serverState_->bindSuccessful(port_); + + // Set the real server event handler (replacing ourself) + boost::shared_ptr serverEventHandler = + serverState_->getServerEventHandler(); + server_->setServerEventHandler(serverEventHandler); + + // Notify the main thread that we have successfully started serving requests + concurrency::Synchronized s(serverMonitor_); + serving_ = true; + serverMonitor_.notify(); + + // Invoke preServe() on the real event handler, since we ate + // the original preServe() event. + if (serverEventHandler) { + serverEventHandler->preServe(); + } +} + +}}} // apache::thrift::test + +#endif // _THRIFT_TEST_SERVERTHREAD_TCC_ diff --git a/lib/cpp/src/thrift/processor/test/ServerThread.h b/lib/cpp/src/thrift/processor/test/ServerThread.h new file mode 100644 index 00000000..76ceded8 --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/ServerThread.h @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TEST_SERVERTHREAD_H_ +#define _THRIFT_TEST_SERVERTHREAD_H_ 1 + +#include "TProcessor.h" +#include "protocol/TProtocol.h" +#include "server/TServer.h" +#include "transport/TTransport.h" + +#include "processor/test/EventLog.h" + +namespace apache { namespace thrift { namespace test { + +/** + * A helper class to tell ServerThread how to create the server + */ +class ServerState { + public: + virtual ~ServerState() {} + + /** + * Create a server to listen on the specified port. + * + * If the server returned fails to bind to the specified port when serve() is + * called on it, createServer() may be called again on a different port. + */ + virtual boost::shared_ptr createServer(uint16_t port) = 0; + + /** + * Get the TServerEventHandler to set on the server. + * + * This is only called after the server successfully binds and is about to + * start serving traffic. It is invoked from the server thread, rather than + * the main thread. + */ + virtual boost::shared_ptr + getServerEventHandler() { + return boost::shared_ptr(); + } + + /** + * This method is called in the server thread after server binding succeeds. + * + * Subclasses may override this method if they wish to record the final + * port that was used for the server. + */ + virtual void bindSuccessful(uint16_t port) { + } +}; + +/** + * ServerThread starts a thrift server running in a separate thread. + */ +class ServerThread { + public: + ServerThread(const boost::shared_ptr& state, bool autoStart) : + helper_(new Helper(this)), + port_(0), + running_(false), + serving_(false), + error_(false), + serverState_(state) { + if (autoStart) { + start(); + } + } + + void start(); + void stop(); + + uint16_t getPort() const { + return port_; + } + + ~ServerThread() { + if (running_) { + try { + stop(); + } catch (...) { + GlobalOutput.printf("error shutting down server"); + } + } + } + + protected: + // Annoying. thrift forces us to use shared_ptr, so we have to use + // a helper class that we can allocate on the heap and give to thrift. + // It would be simpler if we could just make Runnable and TServerEventHandler + // private base classes of ServerThread. + class Helper : public concurrency::Runnable, + public server::TServerEventHandler { + public: + Helper(ServerThread* serverThread) + : serverThread_(serverThread) {} + + void run() { + serverThread_->run(); + } + + void preServe() { + serverThread_->preServe(); + } + + private: + ServerThread* serverThread_; + }; + + void run(); + void preServe(); + + boost::shared_ptr helper_; + + uint16_t port_; + bool running_; + bool serving_; + bool error_; + concurrency::Monitor serverMonitor_; + + boost::shared_ptr serverState_; + boost::shared_ptr server_; + boost::shared_ptr thread_; +}; + +}}} // apache::thrift::test + +#endif // _THRIFT_TEST_SERVERTHREAD_H_ diff --git a/lib/cpp/src/thrift/processor/test/proc.thrift b/lib/cpp/src/thrift/processor/test/proc.thrift new file mode 100644 index 00000000..ac3c5f95 --- /dev/null +++ b/lib/cpp/src/thrift/processor/test/proc.thrift @@ -0,0 +1,22 @@ +namespace cpp apache.thrift.test + +exception MyError { + 1: string message +} + +service ParentService { + i32 incrementGeneration() + i32 getGeneration() + void addString(1: string s) + list getStrings() + + binary getDataWait(1: i32 length) + oneway void onewayWait() + void exceptionWait(1: string message) throws (2: MyError error) + void unexpectedExceptionWait(1: string message) +} + +service ChildService extends ParentService { + i32 setValue(1: i32 value) + i32 getValue() +} -- 2.17.1