From b3cb62959edab16c0ee8ccfd3cf7bc45d63ef703 Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Thu, 1 Feb 2007 22:55:00 +0000 Subject: [PATCH] Adding threaded server to Thrift Summary: Spawns a new thread for each client connection Reviewed By: marc git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664965 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 4 +- lib/cpp/src/server/TThreadPoolServer.h | 5 +- lib/cpp/src/server/TThreadedServer.cpp | 110 +++++++++++++++++++++++++ lib/cpp/src/server/TThreadedServer.h | 37 +++++++++ 4 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 lib/cpp/src/server/TThreadedServer.cpp create mode 100644 lib/cpp/src/server/TThreadedServer.h diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 20abe527..45d7bfa5 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -17,7 +17,8 @@ libthrift_sources = src/concurrency/Mutex.cpp \ src/transport/TServerSocket.cpp \ src/transport/TTransportUtils.cpp \ src/server/TSimpleServer.cpp \ - src/server/TThreadPoolServer.cpp + src/server/TThreadPoolServer.cpp \ + src/server/TThreadedServer.cpp libthriftnb_sources = src/server/TNonblockingServer.cpp @@ -69,6 +70,7 @@ include_server_HEADERS = \ src/server/TServer.h \ src/server/TSimpleServer.h \ src/server/TThreadPoolServer.h \ + src/server/TThreadedServer.h \ src/server/TNonblockingServer.h bin_PROGRAMS = concurrency_test diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h index e7e71484..7eb87d94 100644 --- a/lib/cpp/src/server/TThreadPoolServer.h +++ b/lib/cpp/src/server/TThreadPoolServer.h @@ -14,8 +14,7 @@ using namespace facebook::thrift::transport; using namespace boost; class TThreadPoolServer : public TServer { -public: - + public: class Task; TThreadPoolServer(shared_ptr processor, @@ -36,7 +35,7 @@ public: virtual void serve(); -protected: + protected: shared_ptr threadManager_; diff --git a/lib/cpp/src/server/TThreadedServer.cpp b/lib/cpp/src/server/TThreadedServer.cpp new file mode 100644 index 00000000..a06117ef --- /dev/null +++ b/lib/cpp/src/server/TThreadedServer.cpp @@ -0,0 +1,110 @@ +#include "server/TThreadedServer.h" +#include "transport/TTransportException.h" +#include "concurrency/PosixThreadFactory.h" + +#include +#include +#include +#include + +namespace facebook { namespace thrift { namespace server { + +using namespace std; +using namespace facebook::thrift; +using namespace facebook::thrift::transport; +using namespace facebook::thrift::concurrency; + +class TThreadedServer::Task: public Runnable { + +public: + + Task(shared_ptr processor, + shared_ptr input, + shared_ptr output) : + processor_(processor), + input_(input), + output_(output) { + } + + ~Task() {} + + void run() { + try { + while (processor_->process(input_, output_)) { + if (!input_->getTransport()->peek()) { + break; + } + } + } catch (TTransportException& ttx) { + cerr << "TThreadedServer client died: " << ttx.what() << endl; + } catch (TException& x) { + cerr << "TThreadedServer exception: " << x.what() << endl; + } catch (...) { + cerr << "TThreadedServer uncaught exception." << endl; + } + input_->getTransport()->close(); + output_->getTransport()->close(); + } + + private: + shared_ptr processor_; + shared_ptr input_; + shared_ptr output_; + +}; + + +TThreadedServer::TThreadedServer(shared_ptr processor, + shared_ptr serverTransport, + shared_ptr transportFactory, + shared_ptr protocolFactory): + TServer(processor, serverTransport, transportFactory, protocolFactory) { + threadFactory_ = shared_ptr(new PosixThreadFactory()); +} + +TThreadedServer::~TThreadedServer() {} + +void TThreadedServer::serve() { + + shared_ptr client; + shared_ptr inputTransport; + shared_ptr outputTransport; + shared_ptr inputProtocol; + shared_ptr outputProtocol; + + try { + // Start the server listening + serverTransport_->listen(); + } catch (TTransportException& ttx) { + cerr << "TThreadedServer::run() listen(): " << ttx.what() << endl; + return; + } + + while (true) { + try { + // Fetch client from server + client = serverTransport_->accept(); + // Make IO transports + inputTransport = inputTransportFactory_->getTransport(client); + outputTransport = outputTransportFactory_->getTransport(client); + inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + + TThreadedServer::Task* t = new TThreadedServer::Task(processor_, + inputProtocol, + outputProtocol); + + // Create a thread for this task + shared_ptr thread = + shared_ptr(threadFactory_->newThread(shared_ptr(t))); + + // Start the thread! + thread->start(); + + } catch (TTransportException& ttx) { + break; + } + } +} + +}}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TThreadedServer.h b/lib/cpp/src/server/TThreadedServer.h new file mode 100644 index 00000000..fb7ef64b --- /dev/null +++ b/lib/cpp/src/server/TThreadedServer.h @@ -0,0 +1,37 @@ +#ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_ +#define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1 + +#include +#include +#include + +#include + +namespace facebook { namespace thrift { namespace server { + +using namespace facebook::thrift::transport; +using namespace facebook::thrift::concurrency; +using namespace boost; + +class TThreadedServer : public TServer { + + public: + class Task; + + TThreadedServer(shared_ptr processor, + shared_ptr serverTransport, + shared_ptr transportFactory, + shared_ptr protocolFactory); + + virtual ~TThreadedServer(); + + virtual void serve(); + + protected: + shared_ptr threadFactory_; + +}; + +}}} // facebook::thrift::server + +#endif // #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_ -- 2.17.1