From 2f6404d9ae0e5e2b813a2cedcf96edc5c643784f Mon Sep 17 00:00:00 2001 From: Mark Slee Date: Tue, 10 Oct 2006 01:37:40 +0000 Subject: [PATCH] C++ Thrift coding style changes Summary: Make underscore for class members consistent git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664818 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/Makefile.am | 10 +- lib/cpp/src/Thrift.h | 16 +- lib/cpp/src/concurrency/Monitor.cc | 60 +-- lib/cpp/src/concurrency/Monitor.h | 11 +- lib/cpp/src/concurrency/Mutex.cc | 26 +- lib/cpp/src/concurrency/Mutex.h | 10 +- lib/cpp/src/concurrency/PosixThreadFactory.cc | 88 ++-- lib/cpp/src/concurrency/PosixThreadFactory.h | 2 +- lib/cpp/src/concurrency/Thread.h | 6 +- lib/cpp/src/concurrency/ThreadManager.cc | 232 ++++----- lib/cpp/src/concurrency/TimerManager.cc | 142 +++--- lib/cpp/src/concurrency/TimerManager.h | 14 +- lib/cpp/src/server/TNonblockingServer.cc | 476 ++++++++++++++++++ lib/cpp/src/server/TNonblockingServer.h | 195 +++++++ lib/cpp/src/server/TServer.h | 4 + lib/cpp/src/server/TThreadPoolServer.cc | 24 +- lib/cpp/src/transport/TMemoryBuffer.cc | 45 ++ lib/cpp/src/transport/TMemoryBuffer.h | 116 +++++ lib/cpp/src/transport/TTransport.h | 7 +- 19 files changed, 1173 insertions(+), 311 deletions(-) create mode 100644 lib/cpp/src/server/TNonblockingServer.cc create mode 100644 lib/cpp/src/server/TNonblockingServer.h create mode 100644 lib/cpp/src/transport/TMemoryBuffer.cc create mode 100644 lib/cpp/src/transport/TMemoryBuffer.h diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 4de829f1..2cb47593 100644 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -1,7 +1,7 @@ lib_LTLIBRARIES = libthrift.la common_cxxflags = -Wall -Isrc $(BOOST_CPPFLAGS) -common_ldflags = -Wall $(BOOST_LDFLAGS) +common_ldflags = -Wall $(BOOST_LDFLAGS) -levent # Define the source file for the module @@ -12,10 +12,12 @@ libthrift_sources = src/concurrency/Monitor.cc \ src/protocol/TBinaryProtocol.cc \ src/transport/TBufferedTransport.cc \ src/transport/TFramedTransport.cc \ + src/transport/TMemoryBuffer.cc \ src/transport/TSocket.cc \ src/transport/TServerSocket.cc \ src/server/TSimpleServer.cc \ - src/server/TThreadPoolServer.cc + src/server/TThreadPoolServer.cc \ + src/server/TNonblockingServer.cc libthrift_la_SOURCES = $(libthrift_sources) @@ -50,6 +52,7 @@ include_transport_HEADERS = \ src/transport/TBufferedTransport.h \ src/transport/TFramedTransport.h \ src/transport/TNullTransport.h \ + src/transport/TMemoryBuffer.h \ src/transport/TServerSocket.h \ src/transport/TServerTransport.h \ src/transport/TSocket.h \ @@ -62,7 +65,8 @@ include_serverdir = $(include_thriftdir)/server include_server_HEADERS = \ src/server/TServer.h \ src/server/TSimpleServer.h \ - src/server/TThreadPoolServer.h + src/server/TThreadPoolServer.h \ + src/server/TNonblockingServer.h bin_PROGRAMS = concurrency_test diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h index 38226dcc..c21b0a5e 100644 --- a/lib/cpp/src/Thrift.h +++ b/lib/cpp/src/Thrift.h @@ -13,13 +13,19 @@ namespace facebook { namespace thrift { class Exception : public std::exception { -private: - const std::string _message; - public: - Exception(const std::string message) : _message(message) {} + Exception(const std::string message) : + message_(message) {} + ~Exception() throw () {} - const char* what() {return _message.c_str();} + + const char* what() { + return message_.c_str(); + } + +private: + const std::string message_; + }; }} // facebook::thrift diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc index 57532a3c..518c77f1 100644 --- a/lib/cpp/src/concurrency/Monitor.cc +++ b/lib/cpp/src/concurrency/Monitor.cc @@ -22,14 +22,14 @@ class Monitor::Impl { public: Impl() : - mutexInitialized(false), - condInitialized(false) { + mutexInitialized_(false), + condInitialized_(false) { try { - assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); - mutexInitialized = true; - assert(pthread_cond_init(&_pthread_cond, NULL) == 0); - condInitialized = true; + assert(pthread_mutex_init(&pthread_mutex_, NULL) == 0); + mutexInitialized_ = true; + assert(pthread_cond_init(&pthread_cond_, NULL) == 0); + condInitialized_ = true; } catch(...) { cleanup(); } @@ -37,21 +37,23 @@ class Monitor::Impl { ~Impl() { cleanup(); } - void lock() const { pthread_mutex_lock(&_pthread_mutex); } + void lock() const { pthread_mutex_lock(&pthread_mutex_); } - void unlock() const { pthread_mutex_unlock(&_pthread_mutex); } + void unlock() const { pthread_mutex_unlock(&pthread_mutex_); } void wait(long long timeout) const { // XXX Need to assert that caller owns mutex assert(timeout >= 0LL); if (timeout == 0LL) { - assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0); + assert(pthread_cond_wait(&pthread_cond_, &pthread_mutex_) == 0); } else { struct timespec abstime; long long now = Util::currentTime(); Util::toTimespec(abstime, now + timeout); - int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime); + int result = pthread_cond_timedwait(&pthread_cond_, + &pthread_mutex_, + &abstime); if (result == ETIMEDOUT) { assert(Util::currentTime() >= (now + timeout)); } @@ -60,46 +62,46 @@ class Monitor::Impl { void notify() { // XXX Need to assert that caller owns mutex - assert(pthread_cond_signal(&_pthread_cond) == 0); + assert(pthread_cond_signal(&pthread_cond_) == 0); } void notifyAll() { // XXX Need to assert that caller owns mutex - assert(pthread_cond_broadcast(&_pthread_cond) == 0); + assert(pthread_cond_broadcast(&pthread_cond_) == 0); } private: void cleanup() { - if (mutexInitialized) { - mutexInitialized = false; - assert(pthread_mutex_destroy(&_pthread_mutex) == 0); + if (mutexInitialized_) { + mutexInitialized_ = false; + assert(pthread_mutex_destroy(&pthread_mutex_) == 0); } - if (condInitialized) { - condInitialized = false; - assert(pthread_cond_destroy(&_pthread_cond) == 0); + if (condInitialized_) { + condInitialized_ = false; + assert(pthread_cond_destroy(&pthread_cond_) == 0); } } - mutable pthread_mutex_t _pthread_mutex; - mutable bool mutexInitialized; - mutable pthread_cond_t _pthread_cond; - mutable bool condInitialized; + mutable pthread_mutex_t pthread_mutex_; + mutable bool mutexInitialized_; + mutable pthread_cond_t pthread_cond_; + mutable bool condInitialized_; }; -Monitor::Monitor() : _impl(new Monitor::Impl()) {} +Monitor::Monitor() : impl_(new Monitor::Impl()) {} -Monitor::~Monitor() { delete _impl; } +Monitor::~Monitor() { delete impl_; } -void Monitor::lock() const { _impl->lock(); } +void Monitor::lock() const { impl_->lock(); } -void Monitor::unlock() const { _impl->unlock(); } +void Monitor::unlock() const { impl_->unlock(); } -void Monitor::wait(long long timeout) const { _impl->wait(timeout); } +void Monitor::wait(long long timeout) const { impl_->wait(timeout); } -void Monitor::notify() const { _impl->notify(); } +void Monitor::notify() const { impl_->notify(); } -void Monitor::notifyAll() const { _impl->notifyAll(); } +void Monitor::notifyAll() const { impl_->notifyAll(); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h index 62a83446..e67b71bd 100644 --- a/lib/cpp/src/concurrency/Monitor.h +++ b/lib/cpp/src/concurrency/Monitor.h @@ -39,22 +39,23 @@ class Monitor { class Impl; - Impl* _impl; + Impl* impl_; }; class Synchronized { public: - Synchronized(const Monitor& value) : _monitor(value) { - _monitor.lock(); + Synchronized(const Monitor& value) : + monitor_(value) { + monitor_.lock(); } ~Synchronized() { - _monitor.unlock(); + monitor_.unlock(); } private: - const Monitor& _monitor; + const Monitor& monitor_; }; diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc index 416341e1..1f116a3f 100644 --- a/lib/cpp/src/concurrency/Mutex.cc +++ b/lib/cpp/src/concurrency/Mutex.cc @@ -13,32 +13,32 @@ namespace facebook { namespace thrift { namespace concurrency { */ class Mutex::impl { public: - impl() : initialized(false) { - assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0); - initialized = true; + impl() : initialized_(false) { + assert(pthread_mutex_init(&pthread_mutex_, NULL) == 0); + initialized_ = true; } ~impl() { - if (initialized) { - initialized = false; - assert(pthread_mutex_destroy(&_pthread_mutex) == 0); + if (initialized_) { + initialized_ = false; + assert(pthread_mutex_destroy(&pthread_mutex_) == 0); } } - void lock() const { pthread_mutex_lock(&_pthread_mutex); } + void lock() const { pthread_mutex_lock(&pthread_mutex_); } - void unlock() const { pthread_mutex_unlock(&_pthread_mutex); } + void unlock() const { pthread_mutex_unlock(&pthread_mutex_); } private: - mutable pthread_mutex_t _pthread_mutex; - mutable bool initialized; + mutable pthread_mutex_t pthread_mutex_; + mutable bool initialized_; }; -Mutex::Mutex() : _impl(new Mutex::impl()) {} +Mutex::Mutex() : impl_(new Mutex::impl()) {} -void Mutex::lock() const { _impl->lock(); } +void Mutex::lock() const { impl_->lock(); } -void Mutex::unlock() const { _impl->unlock(); } +void Mutex::unlock() const { impl_->unlock(); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h index 9eceb49d..de52bbd6 100644 --- a/lib/cpp/src/concurrency/Mutex.h +++ b/lib/cpp/src/concurrency/Mutex.h @@ -18,20 +18,20 @@ class Mutex { private: class impl; - impl* _impl; + impl* impl_; }; class MutexMonitor { public: - MutexMonitor(const Mutex& value) : _mutex(value) { - _mutex.lock(); + MutexMonitor(const Mutex& value) : mutex_(value) { + mutex_.lock(); } ~MutexMonitor() { - _mutex.unlock(); + mutex_.unlock(); } private: - const Mutex& _mutex; + const Mutex& mutex_; }; diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc index 130976c9..74a3ec31 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.cc +++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc @@ -33,21 +33,21 @@ class PthreadThread: public Thread { static void* threadMain(void* arg); private: - pthread_t _pthread; - STATE _state; - int _policy; - int _priority; - int _stackSize; - weak_ptr _self; + pthread_t pthread_; + STATE state_; + int policy_; + int priority_; + int stackSize_; + weak_ptr self_; public: PthreadThread(int policy, int priority, int stackSize, shared_ptr runnable) : - _pthread(0), - _state(uninitialized), - _policy(policy), - _priority(priority), - _stackSize(stackSize) { + pthread_(0), + state_(uninitialized), + policy_(policy), + priority_(priority), + stackSize_(stackSize) { this->Thread::runnable(runnable); } @@ -55,38 +55,38 @@ class PthreadThread: public Thread { ~PthreadThread() {} void start() { - if (_state != uninitialized) { + if (state_ != uninitialized) { return; } - _state = starting; + state_ = starting; pthread_attr_t thread_attr; assert(pthread_attr_init(&thread_attr) == 0); assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); // Set thread stack size - assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); + assert(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) == 0); // Set thread policy - assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); + assert(pthread_attr_setschedpolicy(&thread_attr, policy_) == 0); struct sched_param sched_param; - sched_param.sched_priority = _priority; + sched_param.sched_priority = priority_; // Set thread priority assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); // Create reference shared_ptr* selfRef = new shared_ptr(); - *selfRef = _self.lock(); - assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); + *selfRef = self_.lock(); + assert(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) == 0); } void join() { - if (_state != stopped) { + if (state_ != stopped) { void* ignore; - pthread_join(_pthread, &ignore); + pthread_join(pthread_, &ignore); } } @@ -96,7 +96,7 @@ class PthreadThread: public Thread { void weakRef(shared_ptr self) { assert(self.get() == this); - _self = weak_ptr(self); + self_ = weak_ptr(self); } }; @@ -109,14 +109,14 @@ void* PthreadThread::threadMain(void* arg) { return (void*)0; } - if (thread->_state != starting) { + if (thread->state_ != starting) { return (void*)0; } - thread->_state = starting; + thread->state_ = starting; thread->runnable()->run(); - if (thread->_state != stopping && thread->_state != stopped) { - thread->_state = stopping; + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; } return (void*)0; @@ -128,10 +128,10 @@ void* PthreadThread::threadMain(void* arg) { class PosixThreadFactory::Impl { private: - POLICY _policy; - PRIORITY _priority; - int _stackSize; - bool _detached; + POLICY policy_; + PRIORITY priority_; + int stackSize_; + bool detached_; /** * Converts generic posix thread schedule policy enums into pthread @@ -173,10 +173,10 @@ class PosixThreadFactory::Impl { public: Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : - _policy(policy), - _priority(priority), - _stackSize(stackSize), - _detached(detached) {} + policy_(policy), + priority_(priority), + stackSize_(stackSize), + detached_(detached) {} /** * Creates a new POSIX thread to run the runnable object @@ -184,17 +184,17 @@ class PosixThreadFactory::Impl { * @param runnable A runnable object */ shared_ptr newThread(shared_ptr runnable) const { - shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); + shared_ptr result = shared_ptr(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable)); result->weakRef(result); runnable->thread(result); return result; } - int stackSize() const { return _stackSize; } + int stackSize() const { return stackSize_; } - void stackSize(int value) { _stackSize = value; } + void stackSize(int value) { stackSize_ = value; } - PRIORITY priority() const { return _priority; } + PRIORITY priority() const { return priority_; } /** * Sets priority. @@ -202,20 +202,20 @@ class PosixThreadFactory::Impl { * XXX * Need to handle incremental priorities properly. */ - void priority(PRIORITY value) { _priority = value; } + void priority(PRIORITY value) { priority_ = value; } }; PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : - _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} + impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} -shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const { return _impl->newThread(runnable); } +shared_ptr PosixThreadFactory::newThread(shared_ptr runnable) const { return impl_->newThread(runnable); } -int PosixThreadFactory::stackSize() const { return _impl->stackSize(); } +int PosixThreadFactory::stackSize() const { return impl_->stackSize(); } -void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); } +void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); } -PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); } +PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); } -void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); } +void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h index a56999c8..4ad99339 100644 --- a/lib/cpp/src/concurrency/PosixThreadFactory.h +++ b/lib/cpp/src/concurrency/PosixThreadFactory.h @@ -79,7 +79,7 @@ class PosixThreadFactory : public ThreadFactory { private: class Impl; - shared_ptr _impl; + shared_ptr impl_; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h index 24d59082..600b5089 100644 --- a/lib/cpp/src/concurrency/Thread.h +++ b/lib/cpp/src/concurrency/Thread.h @@ -26,16 +26,16 @@ class Runnable { * Gets the thread object that is hosting this runnable object - can return * an empty shared pointer if no references remain on thet thread object */ - virtual shared_ptr thread() { return _thread.lock(); } + virtual shared_ptr thread() { return thread_.lock(); } /** * Sets the thread that is executing this object. This is only meant for * use by concrete implementations of Thread. */ - virtual void thread(shared_ptr value) { _thread = value; } + virtual void thread(shared_ptr value) { thread_ = value; } private: - weak_ptr _thread; + weak_ptr thread_; }; /** diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc index 7d6fef75..895d1cda 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cc +++ b/lib/cpp/src/concurrency/ThreadManager.cc @@ -32,10 +32,10 @@ class ThreadManager::Impl : public ThreadManager { public: Impl() : - _workerCount(0), - _workerMaxCount(0), - _idleCount(0), - _state(ThreadManager::UNINITIALIZED) {} + workerCount_(0), + workerMaxCount_(0), + idleCount_(0), + state_(ThreadManager::UNINITIALIZED) {} ~Impl() { stop(); } @@ -43,37 +43,41 @@ class ThreadManager::Impl : public ThreadManager { void stop(); - const ThreadManager::STATE state() const { return _state; } + const ThreadManager::STATE state() const { + return state_; + } shared_ptr threadFactory() const { - Synchronized s(_monitor); - return _threadFactory; + Synchronized s(monitor_); + return threadFactory_; } void threadFactory(shared_ptr value) { - Synchronized s(_monitor); - _threadFactory = value; + Synchronized s(monitor_); + threadFactory_ = value; } void addWorker(size_t value); void removeWorker(size_t value); - size_t idleWorkerCount() const { return _idleCount; } + size_t idleWorkerCount() const { + return idleCount_; + } size_t workerCount() const { - Synchronized s(_monitor); - return _workerCount; + Synchronized s(monitor_); + return workerCount_; } size_t pendingTaskCount() const { - Synchronized s(_monitor); - return _tasks.size(); + Synchronized s(monitor_); + return tasks_.size(); } size_t totalTaskCount() const { - Synchronized s(_monitor); - return _tasks.size() + _workerCount - _idleCount; + Synchronized s(monitor_); + return tasks_.size() + workerCount_ - idleCount_; } void add(shared_ptr value); @@ -81,21 +85,21 @@ class ThreadManager::Impl : public ThreadManager { void remove(shared_ptr task); private: - size_t _workerCount; - size_t _workerMaxCount; - size_t _idleCount; - ThreadManager::STATE _state; - shared_ptr _threadFactory; + size_t workerCount_; + size_t workerMaxCount_; + size_t idleCount_; + ThreadManager::STATE state_; + shared_ptr threadFactory_; friend class ThreadManager::Task; - std::queue > _tasks; - Monitor _monitor; - Monitor _workerMonitor; + std::queue > tasks_; + Monitor monitor_; + Monitor workerMonitor_; friend class ThreadManager::Worker; - std::set > _workers; - std::set > _deadWorkers; + std::set > workers_; + std::set > deadWorkers_; }; class ThreadManager::Task : public Runnable { @@ -109,22 +113,22 @@ class ThreadManager::Task : public Runnable { }; Task(shared_ptr runnable) : - _runnable(runnable), - _state(WAITING) {} + runnable_(runnable), + state_(WAITING) {} ~Task() {} void run() { - if (_state == EXECUTING) { - _runnable->run(); - _state = COMPLETE; + if (state_ == EXECUTING) { + runnable_->run(); + state_ = COMPLETE; } } private: - shared_ptr _runnable; + shared_ptr runnable_; friend class ThreadManager::Worker; - STATE _state; + STATE state_; }; class ThreadManager::Worker: public Runnable { @@ -138,13 +142,15 @@ class ThreadManager::Worker: public Runnable { public: Worker(ThreadManager::Impl* manager) : - _manager(manager), - _state(UNINITIALIZED), - _idle(false) {} + manager_(manager), + state_(UNINITIALIZED), + idle_(false) {} ~Worker() {} - bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; } + bool isActive() const { + return manager_->workerCount_ <= manager_->workerMaxCount_; + } /** * Worker entry point @@ -164,17 +170,17 @@ class ThreadManager::Worker: public Runnable { * since that is what the manager blocks on for worker add/remove */ { - Synchronized s(_manager->_monitor); - active = _manager->_workerCount < _manager->_workerMaxCount; + Synchronized s(manager_->monitor_); + active = manager_->workerCount_ < manager_->workerMaxCount_; if (active) { - _manager->_workerCount++; - notifyManager = _manager->_workerCount == _manager->_workerMaxCount; + manager_->workerCount_++; + notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; } } if (notifyManager) { - Synchronized s(_manager->_workerMonitor); - _manager->_workerMonitor.notify(); + Synchronized s(manager_->workerMonitor_); + manager_->workerMonitor_.notify(); notifyManager = false; } @@ -191,34 +197,34 @@ class ThreadManager::Worker: public Runnable { * the manager will see it. */ { - Synchronized s(_manager->_monitor); + Synchronized s(manager_->monitor_); active = isActive(); - while (active && _manager->_tasks.empty()) { - _manager->_idleCount++; - _idle = true; - _manager->_monitor.wait(); + while (active && manager_->tasks_.empty()) { + manager_->idleCount_++; + idle_ = true; + manager_->monitor_.wait(); active = isActive(); - _idle = false; - _manager->_idleCount--; + idle_ = false; + manager_->idleCount_--; } if (active) { - if (!_manager->_tasks.empty()) { - task = _manager->_tasks.front(); - _manager->_tasks.pop(); - if (task->_state == ThreadManager::Task::WAITING) { - task->_state = ThreadManager::Task::EXECUTING; + if (!manager_->tasks_.empty()) { + task = manager_->tasks_.front(); + manager_->tasks_.pop(); + if (task->state_ == ThreadManager::Task::WAITING) { + task->state_ = ThreadManager::Task::EXECUTING; } } } else { - _idle = true; - _manager->_workerCount--; - notifyManager = _manager->_workerCount == _manager->_workerMaxCount; + idle_ = true; + manager_->workerCount_--; + notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; } } if (task != NULL) { - if (task->_state == ThreadManager::Task::EXECUTING) { + if (task->state_ == ThreadManager::Task::EXECUTING) { try { task->run(); } catch(...) { @@ -229,10 +235,10 @@ class ThreadManager::Worker: public Runnable { } { - Synchronized s(_manager->_workerMonitor); - _manager->_deadWorkers.insert(this->thread()); + Synchronized s(manager_->workerMonitor_); + manager_->deadWorkers_.insert(this->thread()); if (notifyManager) { - _manager->_workerMonitor.notify(); + manager_->workerMonitor_.notify(); } } @@ -240,10 +246,10 @@ class ThreadManager::Worker: public Runnable { } private: - ThreadManager::Impl* _manager; + ThreadManager::Impl* manager_; friend class ThreadManager::Impl; - STATE _state; - bool _idle; + STATE state_; + bool idle_; }; @@ -252,68 +258,68 @@ class ThreadManager::Worker: public Runnable { for (size_t ix = 0; ix < value; ix++) { class ThreadManager::Worker; shared_ptr worker = shared_ptr(new ThreadManager::Worker(this)); - newThreads.insert(_threadFactory->newThread(worker)); + newThreads.insert(threadFactory_->newThread(worker)); } { - Synchronized s(_monitor); - _workerMaxCount+= value; - _workers.insert(newThreads.begin(), newThreads.end()); + Synchronized s(monitor_); + workerMaxCount_ += value; + workers_.insert(newThreads.begin(), newThreads.end()); } for (std::set >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { shared_ptr worker = dynamic_pointer_cast((*ix)->runnable()); - worker->_state = ThreadManager::Worker::STARTING; + worker->state_ = ThreadManager::Worker::STARTING; (*ix)->start(); } { - Synchronized s(_workerMonitor); - while (_workerCount != _workerMaxCount) { - _workerMonitor.wait(); + Synchronized s(workerMonitor_); + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); } } } void ThreadManager::Impl::start() { - if (_state == ThreadManager::STOPPED) { + if (state_ == ThreadManager::STOPPED) { return; } { - Synchronized s(_monitor); - if (_state == ThreadManager::UNINITIALIZED) { - if (_threadFactory == NULL) { + Synchronized s(monitor_); + if (state_ == ThreadManager::UNINITIALIZED) { + if (threadFactory_ == NULL) { throw InvalidArgumentException(); } - _state = ThreadManager::STARTED; - _monitor.notifyAll(); + state_ = ThreadManager::STARTED; + monitor_.notifyAll(); } - while (_state == STARTING) { - _monitor.wait(); + while (state_ == STARTING) { + monitor_.wait(); } } } void ThreadManager::Impl::stop() { bool doStop = false; - if (_state == ThreadManager::STOPPED) { + if (state_ == ThreadManager::STOPPED) { return; } { - Synchronized s(_monitor); - if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { + Synchronized s(monitor_); + if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) { doStop = true; - _state = ThreadManager::STOPPING; + state_ = ThreadManager::STOPPING; } } if (doStop) { - removeWorker(_workerCount); - _state = ThreadManager::STOPPING; + removeWorker(workerCount_); + state_ = ThreadManager::STOPPING; } // XXX @@ -324,56 +330,56 @@ void ThreadManager::Impl::stop() { void ThreadManager::Impl::removeWorker(size_t value) { std::set > removedThreads; { - Synchronized s(_monitor); - if (value > _workerMaxCount) { + Synchronized s(monitor_); + if (value > workerMaxCount_) { throw InvalidArgumentException(); } - _workerMaxCount-= value; + workerMaxCount_ -= value; - if (_idleCount < value) { - for (size_t ix = 0; ix < _idleCount; ix++) { - _monitor.notify(); + if (idleCount_ < value) { + for (size_t ix = 0; ix < idleCount_; ix++) { + monitor_.notify(); } } else { - _monitor.notifyAll(); + monitor_.notifyAll(); } } { - Synchronized s(_workerMonitor); + Synchronized s(workerMonitor_); - while (_workerCount != _workerMaxCount) { - _workerMonitor.wait(); + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); } - for (std::set >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { - _workers.erase(*ix); + for (std::set >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) { + workers_.erase(*ix); } - _deadWorkers.clear(); + deadWorkers_.clear(); } } void ThreadManager::Impl::add(shared_ptr value) { - Synchronized s(_monitor); + Synchronized s(monitor_); - if (_state != ThreadManager::STARTED) { + if (state_ != ThreadManager::STARTED) { throw IllegalStateException(); } - _tasks.push(shared_ptr(new ThreadManager::Task(value))); + tasks_.push(shared_ptr(new ThreadManager::Task(value))); // If idle thread is available notify it, otherwise all worker threads are // running and will get around to this task in time. - if (_idleCount > 0) { - _monitor.notify(); + if (idleCount_ > 0) { + monitor_.notify(); } } void ThreadManager::Impl::remove(shared_ptr task) { - Synchronized s(_monitor); - if (_state != ThreadManager::STARTED) { + Synchronized s(monitor_); + if (state_ != ThreadManager::STARTED) { throw IllegalStateException(); } } @@ -382,19 +388,19 @@ class SimpleThreadManager : public ThreadManager::Impl { public: SimpleThreadManager(size_t workerCount=4) : - _workerCount(workerCount), - _firstTime(true) { + workerCount_(workerCount), + firstTime_(true) { } void start() { ThreadManager::Impl::start(); - addWorker(_workerCount); + addWorker(workerCount_); } private: - const size_t _workerCount; - bool _firstTime; - Monitor _monitor; + const size_t workerCount_; + bool firstTime_; + Monitor monitor_; }; diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc index f48df4eb..42864401 100644 --- a/lib/cpp/src/concurrency/TimerManager.cc +++ b/lib/cpp/src/concurrency/TimerManager.cc @@ -28,8 +28,8 @@ class TimerManager::Task : public Runnable { }; Task(shared_ptr runnable) : - _runnable(runnable), - _state(WAITING) {} + runnable_(runnable), + state_(WAITING) {} ~Task() { //debug @@ -37,24 +37,24 @@ class TimerManager::Task : public Runnable { } void run() { - if (_state == EXECUTING) { - _runnable->run(); - _state = COMPLETE; + if (state_ == EXECUTING) { + runnable_->run(); + state_ = COMPLETE; } } private: - shared_ptr _runnable; + shared_ptr runnable_; class TimerManager::Dispatcher; friend class TimerManager::Dispatcher; - STATE _state; + STATE state_; }; class TimerManager::Dispatcher: public Runnable { public: Dispatcher(TimerManager* manager) : - _manager(manager) {} + manager_(manager) {} ~Dispatcher() { // debug @@ -64,45 +64,45 @@ class TimerManager::Dispatcher: public Runnable { /** * Dispatcher entry point * - * As long as dispatcher thread is running, pull tasks off the task _taskMap + * As long as dispatcher thread is running, pull tasks off the task taskMap_ * and execute. */ void run() { { - Synchronized s(_manager->_monitor); - if (_manager->_state == TimerManager::STARTING) { - _manager->_state = TimerManager::STARTED; - _manager->_monitor.notifyAll(); + Synchronized s(manager_->monitor_); + if (manager_->state_ == TimerManager::STARTING) { + manager_->state_ = TimerManager::STARTED; + manager_->monitor_.notifyAll(); } } do { std::set > expiredTasks; { - Synchronized s(_manager->_monitor); + Synchronized s(manager_->monitor_); task_iterator expiredTaskEnd; long long now = Util::currentTime(); - while (_manager->_state == TimerManager::STARTED && - (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { + while (manager_->state_ == TimerManager::STARTED && + (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) { long long timeout = 0LL; - if (!_manager->_taskMap.empty()) { - timeout = _manager->_taskMap.begin()->first - now; + if (!manager_->taskMap_.empty()) { + timeout = manager_->taskMap_.begin()->first - now; } - assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); - _manager->_monitor.wait(timeout); + assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0)); + manager_->monitor_.wait(timeout); now = Util::currentTime(); } - if (_manager->_state == TimerManager::STARTED) { - for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { + if (manager_->state_ == TimerManager::STARTED) { + for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { shared_ptr task = ix->second; expiredTasks.insert(task); - if (task->_state == TimerManager::Task::WAITING) { - task->_state = TimerManager::Task::EXECUTING; + if (task->state_ == TimerManager::Task::WAITING) { + task->state_ = TimerManager::Task::EXECUTING; } - _manager->_taskCount--; + manager_->taskCount_--; } - _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); + manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); } } @@ -110,27 +110,27 @@ class TimerManager::Dispatcher: public Runnable { (*ix)->run(); } - } while (_manager->_state == TimerManager::STARTED); + } while (manager_->state_ == TimerManager::STARTED); { - Synchronized s(_manager->_monitor); - if (_manager->_state == TimerManager::STOPPING) { - _manager->_state = TimerManager::STOPPED; - _manager->_monitor.notify(); + Synchronized s(manager_->monitor_); + if (manager_->state_ == TimerManager::STOPPING) { + manager_->state_ = TimerManager::STOPPED; + manager_->monitor_.notify(); } } return; } private: - TimerManager* _manager; + TimerManager* manager_; friend class TimerManager; }; TimerManager::TimerManager() : - _taskCount(0), - _state(TimerManager::UNINITIALIZED), - _dispatcher(shared_ptr(new Dispatcher(this))) { + taskCount_(0), + state_(TimerManager::UNINITIALIZED), + dispatcher_(shared_ptr(new Dispatcher(this))) { } @@ -140,7 +140,7 @@ TimerManager::~TimerManager() { // the monitor here, since stop already takes care of reentrancy. std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; - if (_state != STOPPED) { + if (state_ != STOPPED) { try { stop(); } catch(...) { @@ -154,69 +154,69 @@ TimerManager::~TimerManager() { void TimerManager::start() { bool doStart = false; { - Synchronized s(_monitor); - if (_threadFactory == NULL) { + Synchronized s(monitor_); + if (threadFactory_ == NULL) { throw InvalidArgumentException(); } - if (_state == TimerManager::UNINITIALIZED) { - _state = TimerManager::STARTING; + if (state_ == TimerManager::UNINITIALIZED) { + state_ = TimerManager::STARTING; doStart = true; } } if (doStart) { - _dispatcherThread = _threadFactory->newThread(_dispatcher); - _dispatcherThread->start(); + dispatcherThread_ = threadFactory_->newThread(dispatcher_); + dispatcherThread_->start(); } { - Synchronized s(_monitor); - while (_state == TimerManager::STARTING) { - _monitor.wait(); + Synchronized s(monitor_); + while (state_ == TimerManager::STARTING) { + monitor_.wait(); } - assert(_state != TimerManager::STARTING); + assert(state_ != TimerManager::STARTING); } } void TimerManager::stop() { bool doStop = false; { - Synchronized s(_monitor); - if (_state == TimerManager::UNINITIALIZED) { - _state = TimerManager::STOPPED; - } else if (_state != STOPPING && _state != STOPPED) { + Synchronized s(monitor_); + if (state_ == TimerManager::UNINITIALIZED) { + state_ = TimerManager::STOPPED; + } else if (state_ != STOPPING && state_ != STOPPED) { doStop = true; - _state = STOPPING; - _monitor.notifyAll(); + state_ = STOPPING; + monitor_.notifyAll(); } - while (_state != STOPPED) { - _monitor.wait(); + while (state_ != STOPPED) { + monitor_.wait(); } } if (doStop) { // Clean up any outstanding tasks - for (task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { - _taskMap.erase(ix); + for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) { + taskMap_.erase(ix); } // Remove dispatcher's reference to us. - _dispatcher->_manager = NULL; + dispatcher_->manager_ = NULL; } } shared_ptr TimerManager::threadFactory() const { - Synchronized s(_monitor); - return _threadFactory; + Synchronized s(monitor_); + return threadFactory_; } void TimerManager::threadFactory(shared_ptr value) { - Synchronized s(_monitor); - _threadFactory = value; + Synchronized s(monitor_); + threadFactory_ = value; } size_t TimerManager::taskCount() const { - return _taskCount; + return taskCount_; } void TimerManager::add(shared_ptr task, long long timeout) { @@ -224,19 +224,19 @@ void TimerManager::add(shared_ptr task, long long timeout) { timeout += now; { - Synchronized s(_monitor); - if (_state != TimerManager::STARTED) { + Synchronized s(monitor_); + if (state_ != TimerManager::STARTED) { throw IllegalStateException(); } - _taskCount++; - _taskMap.insert(std::pair >(timeout, shared_ptr(new Task(task)))); + taskCount_++; + taskMap_.insert(std::pair >(timeout, shared_ptr(new Task(task)))); // If the task map was empty, or if we have an expiration that is earlier // than any previously seen, kick the dispatcher so it can update its // timeout - if (_taskCount == 1 || timeout < _taskMap.begin()->first) { - _monitor.notify(); + if (taskCount_ == 1 || timeout < taskMap_.begin()->first) { + monitor_.notify(); } } } @@ -257,13 +257,13 @@ void TimerManager::add(shared_ptr task, const struct timespec& value) void TimerManager::remove(shared_ptr task) { - Synchronized s(_monitor); - if (_state != TimerManager::STARTED) { + Synchronized s(monitor_); + if (state_ != TimerManager::STARTED) { throw IllegalStateException(); } } -const TimerManager::STATE TimerManager::state() const { return _state; } +const TimerManager::STATE TimerManager::state() const { return state_; } }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h index 50a0c13f..78782d9d 100644 --- a/lib/cpp/src/concurrency/TimerManager.h +++ b/lib/cpp/src/concurrency/TimerManager.h @@ -86,17 +86,17 @@ class TimerManager { virtual const STATE state() const; private: - shared_ptr _threadFactory; + shared_ptr threadFactory_; class Task; friend class Task; - std::multimap > _taskMap; - size_t _taskCount; - Monitor _monitor; - STATE _state; + std::multimap > taskMap_; + size_t taskCount_; + Monitor monitor_; + STATE state_; class Dispatcher; friend class Dispatcher; - shared_ptr _dispatcher; - shared_ptr _dispatcherThread; + shared_ptr dispatcher_; + shared_ptr dispatcherThread_; }; }}} // facebook::thrift::concurrency diff --git a/lib/cpp/src/server/TNonblockingServer.cc b/lib/cpp/src/server/TNonblockingServer.cc new file mode 100644 index 00000000..14fb5bc1 --- /dev/null +++ b/lib/cpp/src/server/TNonblockingServer.cc @@ -0,0 +1,476 @@ +#include "TNonblockingServer.h" + +#include +#include +#include +#include +#include +#include + +namespace facebook { namespace thrift { namespace server { + +void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { + socket_ = socket; + server_ = s; + appState_ = APP_INIT; + eventFlags_ = 0; + + readBufferPos_ = 0; + readWant_ = 0; + + writeBuffer_ = NULL; + writeBufferSize_ = 0; + writeBufferPos_ = 0; + + socketState_ = SOCKET_RECV; + appState_ = APP_INIT; + + // Set flags, which also registers the event + setFlags(eventFlags); +} + +void TConnection::workSocket() { + int flags; + + switch (socketState_) { + case SOCKET_RECV: + // It is an error to be in this state if we already have all the data + assert(readBufferPos_ < readWant_); + + // How much space is availble, and how much will we fetch + uint32_t avail = readBufferSize_ - readBufferPos_; + uint32_t fetch = readWant_ - readBufferPos_; + + // Double the buffer size until it is big enough + if (fetch > avail) { + while (fetch > avail) { + readBufferSize_ *= 2; + } + readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_); + if (readBuffer_ == NULL) { + perror("TConnection::workSocket() realloc"); + close(); + return; + } + } + + // Read from the socket + int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0); + + if (got > 0) { + // Move along in the buffer + readBufferPos_ += got; + + // Check that we did not overdo it + assert(readBufferPos_ <= readWant_); + + // We are done reading, move onto the next state + if (readBufferPos_ == readWant_) { + transition(); + } + return; + } else if (got == -1) { + // Blocking errors are okay, just move on + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } + + if (errno != ECONNRESET) { + perror("TConnection::workSocket() recv -1"); + } + } + + // Whenever we get down here it means a remote disconnect + close(); + + return; + + case SOCKET_SEND: + // Should never have position past size + assert(writeBufferPos_ <= writeBufferSize_); + + // If there is no data to send, then let us move on + if (writeBufferPos_ == writeBufferSize_) { + fprintf(stderr, "WARNING: Send state with no data to send\n"); + transition(); + return; + } + + flags = 0; + #ifdef MSG_NOSIGNAL + // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we + // check for the EPIPE return condition and close the socket in that case + flags |= MSG_NOSIGNAL; + #endif // ifdef MSG_NOSIGNAL + + int left = writeBufferSize_ - writeBufferPos_; + int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags); + + if (sent <= 0) { + // Blocking errors are okay, just move on + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } + if (errno != EPIPE) { + perror("TConnection::workSocket() send -1"); + } + close(); + return; + } + + writeBufferPos_ += sent; + + // Did we overdo it? + assert(writeBufferPos_ <= writeBufferSize_); + + // We are done! + if (writeBufferPos_ == writeBufferSize_) { + transition(); + } + + return; + + default: + fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_); + assert(0); + } +} + +/** + * This is called when the application transitions from one state into + * another. This means that it has finished writing the data that it needed + * to, or finished receiving the data that it needed to. + */ +void TConnection::transition() { + // Switch upon the state that we are currently in and move to a new state + switch (appState_) { + + case APP_READ_REQUEST: + // We are done reading the request, package the read buffer into transport + // and get back some data from the dispatch function + inputTransport_->resetBuffer(readBuffer_, readBufferPos_); + outputTransport_->resetBuffer(); + + try { + // Invoke the processor + server_->getProcessor()->process(inputTransport_, outputTransport_); + } catch (TTransportException &x) { + fprintf(stderr, "Server::process %s\n", x.getMessage().c_str()); + close(); + return; + } catch (...) { + fprintf(stderr, "Server::process() unknown exception\n"); + close(); + return; + } + + + // Get the result of the operation + outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_); + + // If the function call generated return data, then move into the send + // state and get going + if (writeBufferSize_ > 0) { + + // Move into write state + writeBufferPos_ = 0; + socketState_ = SOCKET_SEND; + appState_ = APP_SEND_RESULT; + + // Socket into write mode + setWrite(); + + // Try to work the socket immediately + workSocket(); + + return; + } + + // In this case, the request was asynchronous and we should fall through + // right back into the read frame header state + + case APP_SEND_RESULT: + + // N.B.: We also intentionally fall through here into the INIT state! + + case APP_INIT: + + // Clear write buffer variables + writeBuffer_ = NULL; + writeBufferPos_ = 0; + writeBufferSize_ = 0; + + // Set up read buffer for getting 4 bytes + readBufferPos_ = 0; + readWant_ = 4; + + // Into read4 state we go + socketState_ = SOCKET_RECV; + appState_ = APP_READ_FRAME_SIZE; + + // Register read event + setRead(); + + // Try to work the socket right away + workSocket(); + + return; + + case APP_READ_FRAME_SIZE: + // We just read the request length, deserialize it + int sz = *(int32_t*)readBuffer_; + sz = (int32_t)ntohl(sz); + + if (sz <= 0) { + fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz); + close(); + return; + } + + // Reset the read buffer + readWant_ = (uint32_t)sz; + readBufferPos_= 0; + + // Move into read request state + appState_ = APP_READ_REQUEST; + + // Work the socket right away + workSocket(); + + return; + + default: + fprintf(stderr, "Totally Fucked. Application State %d\n", appState_); + assert(0); + } +} + +void TConnection::setFlags(short eventFlags) { + // Catch the do nothing case + if (eventFlags_ == eventFlags) { + return; + } + + // Delete a previously existing event + if (eventFlags_ != 0) { + if (event_del(&event_) == -1) { + perror("TConnection::setFlags event_del"); + return; + } + } + + // Update in memory structure + eventFlags_ = eventFlags; + + /** + * event_set: + * + * Prepares the event structure &event to be used in future calls to + * event_add() and event_del(). The event will be prepared to call the + * event_handler using the 'sock' file descriptor to monitor events. + * + * The events can be either EV_READ, EV_WRITE, or both, indicating + * that an application can read or write from the file respectively without + * blocking. + * + * The event_handler will be called with the file descriptor that triggered + * the event and the type of event which will be one of: EV_TIMEOUT, + * EV_SIGNAL, EV_READ, EV_WRITE. + * + * The additional flag EV_PERSIST makes an event_add() persistent until + * event_del() has been called. + * + * Once initialized, the &event struct can be used repeatedly with + * event_add() and event_del() and does not need to be reinitialized unless + * the event_handler and/or the argument to it are to be changed. However, + * when an ev structure has been added to libevent using event_add() the + * structure must persist until the event occurs (assuming EV_PERSIST + * is not set) or is removed using event_del(). You may not reuse the same + * ev structure for multiple monitored descriptors; each descriptor needs + * its own ev. + */ + event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this); + + // Add the event + if (event_add(&event_, 0) == -1) { + perror("TConnection::setFlags(): coult not event_add"); + } +} + +/** + * Closes a connection + */ +void TConnection::close() { + // Delete the registered libevent + if (event_del(&event_) == -1) { + perror("TConnection::close() event_del"); + } + + // Close the socket + if (socket_ > 0) { + ::close(socket_); + } + socket_ = 0; + + // Give this object back to the server that owns it + server_->returnConnection(this); +} + +/** + * Creates a new connection either by reusing an object off the stack or + * by allocating a new one entirely + */ +TConnection* TNonblockingServer::createConnection(int socket, short flags) { + // Check the stack + if (connectionStack_.empty()) { + return new TConnection(socket, flags, this); + } else { + TConnection* result = connectionStack_.top(); + connectionStack_.pop(); + result->init(socket, flags, this); + return result; + } +} + +/** + * Returns a connection to the stack + */ +void TNonblockingServer::returnConnection(TConnection* connection) { + connectionStack_.push(connection); +} + +/** + * Server socket had something happen + */ +void TNonblockingServer::handleEvent(int fd, short which) { + // Make sure that libevent didn't fuck up the socket handles + assert(fd == serverSocket_); + + // Server socket accepted a new connection + socklen_t addrLen; + struct sockaddr addr; + addrLen = sizeof(addr); + + // Going to accept a new client socket + int clientSocket; + + // Accept as many new clients as possible, even though libevent signaled only + // one, this helps us to avoid having to go back into the libevent engine so + // many times + while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) { + + // Explicitly set this socket to NONBLOCK mode + int flags; + if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 || + fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) { + perror("thriftServerEventHandler: set O_NONBLOCK"); + close(clientSocket); + return; + } + + // Create a new TConnection for this client socket. + TConnection* clientConnection = + createConnection(clientSocket, EV_READ | EV_PERSIST); + + // Fail fast if we could not create a TConnection object + if (clientConnection == NULL) { + fprintf(stderr, "thriftServerEventHandler: failed TConnection factory"); + close(clientSocket); + return; + } + + // Put this client connection into the proper state + clientConnection->transition(); + } + + // Done looping accept, now we have to make sure the error is due to + // blocking. Any other error is a problem + if (errno != EAGAIN && errno != EWOULDBLOCK) { + perror("thriftServerEventHandler: accept()"); + } +} + +/** + * Main workhorse function, starts up the server listening on a port and + * loops over the libevent handler. + */ +void TNonblockingServer::serve() { + // Initialize libevent + event_init(); + + // Print some libevent stats + fprintf(stderr, + "libevent %s method %s\n", + event_get_version(), + event_get_method()); + + // Create the server socket + serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); + if (serverSocket_ == -1) { + perror("TNonblockingServer::serve() socket() -1"); + return; + } + + // Set socket to nonblocking mode + int flags; + if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 || + fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) { + perror("TNonblockingServer::serve() O_NONBLOCK"); + ::close(serverSocket_); + return; + } + + int one = 1; + struct linger ling = {0, 0}; + + // Set reuseaddr to avoid 2MSL delay on server restart + setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + + // Keepalive to ensure full result flushing + setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one)); + + // Turn linger off to avoid hung sockets + setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)); + + // Set TCP nodelay if available, MAC OS X Hack + // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html + #ifndef TCP_NOPUSH + setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); + #endif + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port_); + addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + perror("TNonblockingServer::serve() bind"); + close(serverSocket_); + return; + } + + if (listen(serverSocket_, LISTEN_BACKLOG) == -1) { + perror("TNonblockingServer::serve() listen"); + close(serverSocket_); + return; + } + + // Register the server event + struct event serverEvent; + event_set(&serverEvent, + serverSocket_, + EV_READ | EV_PERSIST, + TNonblockingServer::eventHandler, + this); + + // Add the event and start up the server + if (event_add(&serverEvent, 0) == -1) { + perror("TNonblockingServer::serve(): coult not event_add"); + return; + } + + // Run libevent engine, never returns, invokes calls to event_handler + event_loop(0); +} + +}}} // facebook::thrift::server diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h new file mode 100644 index 00000000..565486c3 --- /dev/null +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -0,0 +1,195 @@ +#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ +#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1 + +#include "Thrift.h" +#include "server/TServer.h" +#include "transport/TMemoryBuffer.h" +#include +#include + +# + +namespace facebook { namespace thrift { namespace server { + +using boost::shared_ptr; + +// Forward declaration of class +class TConnection; + +/** + * This is a non-blocking server in C++ for high performance that operates a + * single IO thread. It assumes that all incoming requests are framed with a + * 4 byte length indicator and writes out responses using the same framing. + * + * It does not use the TServerTransport framework, but rather has socket + * operations hardcoded for use with select. + * + * @author Mark Slee + */ +class TNonblockingServer : public TServer { + private: + + // Listen backlog + static const int LISTEN_BACKLOG = 1024; + + // Server socket file descriptor + int serverSocket_; + + // Port server runs on + int port_; + + /** + * This is a stack of all the objects that have been created but that + * are NOT currently in use. When we close a connection, we place it on this + * stack so that the object can be reused later, rather than freeing the + * memory and reallocating a new object later. + */ + std::stack connectionStack_; + + void handleEvent(int fd, short which); + + public: + TNonblockingServer(shared_ptr processor, + shared_ptr options, + int port) : + TServer(processor, options), serverSocket_(0), port_(port) {} + + ~TNonblockingServer() {} + + TConnection* createConnection(int socket, short flags); + + void returnConnection(TConnection* connection); + + static void eventHandler(int fd, short which, void* v) { + ((TNonblockingServer*)v)->handleEvent(fd, which); + } + + void serve(); +}; + +/** + * Two states for sockets, recv and send mode + */ +enum TSocketState { + SOCKET_RECV, + SOCKET_SEND +}; + +/** + * Four states for the nonblocking servr: + * 1) initialize + * 2) read 4 byte frame size + * 3) read frame of data + * 4) send back data (if any) + */ +enum TAppState { + APP_INIT, + APP_READ_FRAME_SIZE, + APP_READ_REQUEST, + APP_SEND_RESULT +}; + +/** + * Represents a connection that is handled via libevent. This connection + * essentially encapsulates a socket that has some associated libevent state. + */ +class TConnection { + private: + + // Server handle + TNonblockingServer* server_; + + // Socket handle + int socket_; + + // Libevent object + struct event event_; + + // Libevent flags + short eventFlags_; + + // Socket mode + TSocketState socketState_; + + // Application state + TAppState appState_; + + // How much data needed to read + uint32_t readWant_; + + // Where in the read buffer are we + uint32_t readBufferPos_; + + // Read buffer + uint8_t* readBuffer_; + + // Read buffer size + uint32_t readBufferSize_; + + // Write buffer + uint8_t* writeBuffer_; + + // Write buffer size + uint32_t writeBufferSize_; + + // How far through writing are we? + uint32_t writeBufferPos_; + + // Transport to read from + shared_ptr inputTransport_; + + // Transport that processor writes to + shared_ptr outputTransport_; + + // Go into read mode + void setRead() { + setFlags(EV_READ | EV_PERSIST); + } + + // Go into write mode + void setWrite() { + setFlags(EV_WRITE | EV_PERSIST); + } + + // Set event flags + void setFlags(short eventFlags); + + // Libevent handlers + void workSocket(); + + // Close this client and reset + void close(); + + public: + + // Constructor + TConnection(int socket, short eventFlags, TNonblockingServer *s) { + readBuffer_ = (uint8_t*)malloc(1024); + if (readBuffer_ == NULL) { + throw new facebook::thrift::Exception("Out of memory."); + } + readBufferSize_ = 1024; + + // Allocate input and output tranpsorts + inputTransport_ = shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); + outputTransport_ = shared_ptr(new TMemoryBuffer()); + + init(socket, eventFlags, s); + } + + // Initialize + void init(int socket, short eventFlags, TNonblockingServer *s); + + // Transition into a new state + void transition(); + + // Handler wrapper + static void eventHandler(int fd, short which, void* v) { + assert(fd = ((TConnection*)v)->socket_); + ((TConnection*)v)->workSocket(); + } +}; + +}}} // facebook::thrift::server + +#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_ diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h index ddb320df..eb23b45a 100644 --- a/lib/cpp/src/server/TServer.h +++ b/lib/cpp/src/server/TServer.h @@ -26,6 +26,10 @@ public: virtual ~TServer() {} virtual void serve() = 0; + shared_ptr getProcessor() { + return processor_; + } + protected: TServer(shared_ptr processor, shared_ptr serverTransport, diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc index 4285b053..43f74635 100644 --- a/lib/cpp/src/server/TThreadPoolServer.cc +++ b/lib/cpp/src/server/TThreadPoolServer.cc @@ -12,19 +12,15 @@ using namespace facebook::thrift::concurrency; using namespace facebook::thrift::transport; class TThreadPoolServer::Task: public Runnable { - - shared_ptr _processor; - shared_ptr _input; - shared_ptr _output; - + public: Task(shared_ptr processor, shared_ptr input, shared_ptr output) : - _processor(processor), - _input(input), - _output(output) { + processor_(processor), + input_(input), + output_(output) { } ~Task() {} @@ -32,16 +28,22 @@ public: void run() { while(true) { try { - _processor->process(_input, _output); + processor_->process(input_, output_); } catch (TTransportException& ttx) { break; } catch(...) { break; } } - _input->close(); - _output->close(); + input_->close(); + output_->close(); } + + private: + shared_ptr processor_; + shared_ptr input_; + shared_ptr output_; + }; TThreadPoolServer::TThreadPoolServer(shared_ptr processor, diff --git a/lib/cpp/src/transport/TMemoryBuffer.cc b/lib/cpp/src/transport/TMemoryBuffer.cc new file mode 100644 index 00000000..084f2978 --- /dev/null +++ b/lib/cpp/src/transport/TMemoryBuffer.cc @@ -0,0 +1,45 @@ +#include "TMemoryBuffer.h" + +namespace facebook { namespace thrift { namespace transport { + +uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) { + // Check avaible data for reading + uint32_t avail = wPos_ - rPos_; + + // Device how much to give + uint32_t give = len; + if (avail < len) { + give = avail; + } + + // Copy into buffer and increment rPos_ + memcpy(buf, buffer_ + rPos_, give); + rPos_ += give; + + return give; +} + +void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) { + // Check available space + uint32_t avail = bufferSize_ - wPos_; + + // Grow the buffer + if (len > avail) { + if (!owner_) { + throw TTransportException("Insufficient space in external MemoryBuffer"); + } + while (len > avail) { + bufferSize_ *= 2; + buffer_ = (uint8_t*)realloc(buffer_, bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory."); + } + } + } + + // Copy into the buffer and increment wPos_ + memcpy(buffer_ + wPos_, buf, len); + wPos_ += len; +} + +}}} // facebook::thrift::transport diff --git a/lib/cpp/src/transport/TMemoryBuffer.h b/lib/cpp/src/transport/TMemoryBuffer.h new file mode 100644 index 00000000..397b1865 --- /dev/null +++ b/lib/cpp/src/transport/TMemoryBuffer.h @@ -0,0 +1,116 @@ +#ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ +#define _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ 1 + +#include "TTransport.h" +#include + +namespace facebook { namespace thrift { namespace transport { + +/** + * A memory buffer is a tranpsort that simply reads from and writes to an + * in memory buffer. Anytime you call write on it, the data is simply placed + * into a buffer, and anytime you call read, data is read from that buffer. + * + * The buffers are allocated using C constructs malloc,realloc, and the size + * doubles as necessary. + * + * @author Mark Slee + */ +class TMemoryBuffer : public TTransport { + public: + TMemoryBuffer() { + owner_ = true; + bufferSize_ = 1024; + buffer_ = (uint8_t*)malloc(bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory"); + } + wPos_ = 0; + rPos_ = 0; + } + + TMemoryBuffer(uint32_t sz) { + owner_ = true; + bufferSize_ = sz; + buffer_ = (uint8_t*)malloc(bufferSize_); + if (buffer_ == NULL) { + throw TTransportException("Out of memory"); + } + wPos_ = 0; + rPos_ = 0; + } + + TMemoryBuffer(uint8_t* buf, int sz) { + owner_ = false; + buffer_ = buf; + bufferSize_ = sz; + wPos_ = sz; + rPos_ = 0; + } + + ~TMemoryBuffer() { + if (owner_) { + if (buffer_ != NULL) { + free(buffer_); + buffer_ = NULL; + } + } + } + + bool isOpen() { + return true; + } + + + void open() {} + + void close() {} + + void getBuffer(uint8_t** bufPtr, uint32_t* sz) { + *bufPtr = buffer_; + *sz = bufferSize_; + } + + void resetBuffer() { + wPos_ = 0; + rPos_ = 0; + } + + void resetBuffer(uint8_t* buf, uint32_t sz) { + if (owner_) { + if (buffer_ != NULL) { + free(buffer_); + } + } + owner_ = false; + buffer_ = buf; + bufferSize_ = sz; + wPos_ = sz; + rPos_ = 0; + } + + uint32_t read(uint8_t* buf, uint32_t len); + + void write(const uint8_t* buf, uint32_t len); + + private: + // Data buffer + uint8_t* buffer_; + + // Allocated buffer size + uint32_t bufferSize_; + + // Where the write is at + uint32_t wPos_; + + // Where the reader is at + uint32_t rPos_; + + // Is this object the owner of the buffer? + bool owner_; + +}; + +}}} // facebook::thrift::transport + +#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h index d65d25bf..1a20bd5d 100644 --- a/lib/cpp/src/transport/TTransport.h +++ b/lib/cpp/src/transport/TTransport.h @@ -63,9 +63,14 @@ class TTransport { */ virtual uint32_t readAll(uint8_t* buf, uint32_t len) { uint32_t have = 0; + uint32_t get = 0; while (have < len) { - have += read(buf+have, len-have); + get = read(buf+have, len-have); + if (get <= 0) { + throw TTransportException("No more data to read."); + } + have += get; } return have; -- 2.17.1