From 068f4169b9c2cd9f93ad32e99c5b0d5b786e0bc1 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 9 Mar 2010 05:19:45 +0000 Subject: [PATCH] cpp: TNonblockingServer: Allow unrun tasks to expire after a time limit Enhance ThreadManager to allow a expiration time interval to be assigned to tasks, and expire those tasks after that time limit has passed. Enhance TNonblockingServer to utilize this capability so it can be used for overload resilience. Note: expired entries are only removed from the queue head, so the mechanism in ThreadManager may not do what you expect if you have heterogeneous expiration times. That's not an issue with TNonblockingServer (which will give all tasks the same limit) and might not be in other cases where most tasks have the same limit and the rest execute quickly. A full-up timeout queue would be more complex and have greater overhead than that used here. It's unnecessary for the task at hand so I didn't go that route... The TNonblocking interface is simple: a setTaskExpireTime() accepts a 64-bit millisecond argument. 0 means infinite. A getTaskExpireTime() accessor completes the interface. The ThreadManager interface involves an added argument to add() for the expiration interval and a setExpireCallback() function for setting a callback that is called for expired tasks (for this project this is necessary to shut down the associated connection). git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920673 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/concurrency/ThreadManager.cpp | 64 +++++++++++++++++++++-- lib/cpp/src/concurrency/ThreadManager.h | 28 +++++++++- lib/cpp/src/server/TNonblockingServer.cpp | 18 +++++++ lib/cpp/src/server/TNonblockingServer.h | 41 ++++++++++++--- 4 files changed, 139 insertions(+), 12 deletions(-) diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp index 52473c7e..a02ad742 100644 --- a/lib/cpp/src/concurrency/ThreadManager.cpp +++ b/lib/cpp/src/concurrency/ThreadManager.cpp @@ -20,6 +20,7 @@ #include "ThreadManager.h" #include "Exception.h" #include "Monitor.h" +#include "Util.h" #include @@ -54,6 +55,7 @@ class ThreadManager::Impl : public ThreadManager { workerMaxCount_(0), idleCount_(0), pendingTaskCountMax_(0), + expiredCount_(0), state_(ThreadManager::UNINITIALIZED), monitor_(&mutex_), maxMonitor_(&mutex_) {} @@ -108,6 +110,13 @@ class ThreadManager::Impl : public ThreadManager { return pendingTaskCountMax_; } + size_t expiredTaskCount() { + Synchronized s(monitor_); + size_t result = expiredCount_; + expiredCount_ = 0; + return result; + } + void pendingTaskCountMax(const size_t value) { Synchronized s(monitor_); pendingTaskCountMax_ = value; @@ -115,12 +124,16 @@ class ThreadManager::Impl : public ThreadManager { bool canSleep(); - void add(shared_ptr value, int64_t timeout); + void add(shared_ptr value, int64_t timeout, int64_t expiration); void remove(shared_ptr task); shared_ptr removeNextPending(); + void removeExpiredTasks(); + + void setExpireCallback(ExpireCallback expireCallback); + private: void stopImpl(bool join); @@ -128,6 +141,8 @@ private: size_t workerMaxCount_; size_t idleCount_; size_t pendingTaskCountMax_; + size_t expiredCount_; + ExpireCallback expireCallback_; ThreadManager::STATE state_; shared_ptr threadFactory_; @@ -156,9 +171,10 @@ class ThreadManager::Task : public Runnable { COMPLETE }; - Task(shared_ptr runnable) : + Task(shared_ptr runnable, int64_t expiration=0LL) : runnable_(runnable), - state_(WAITING) {} + state_(WAITING), + expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {} ~Task() {} @@ -173,10 +189,15 @@ class ThreadManager::Task : public Runnable { return runnable_; } + int64_t getExpireTime() const { + return expireTime_; + } + private: shared_ptr runnable_; friend class ThreadManager::Worker; STATE state_; + int64_t expireTime_; }; class ThreadManager::Worker: public Runnable { @@ -262,6 +283,8 @@ class ThreadManager::Worker: public Runnable { } if (active) { + manager_->removeExpiredTasks(); + if (!manager_->tasks_.empty()) { task = manager_->tasks_.front(); manager_->tasks_.pop(); @@ -435,13 +458,16 @@ void ThreadManager::Impl::removeWorker(size_t value) { return idMap_.find(id) == idMap_.end(); } - void ThreadManager::Impl::add(shared_ptr value, int64_t timeout) { + void ThreadManager::Impl::add(shared_ptr value, + int64_t timeout, + int64_t expiration) { Guard g(mutex_); if (state_ != ThreadManager::STARTED) { throw IllegalStateException(); } + removeExpiredTasks(); if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { if (canSleep() && timeout >= 0) { while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) { @@ -453,7 +479,7 @@ void ThreadManager::Impl::removeWorker(size_t value) { } } - tasks_.push(shared_ptr(new ThreadManager::Task(value))); + tasks_.push(shared_ptr(new ThreadManager::Task(value, expiration))); // If idle thread is available notify it, otherwise all worker threads are // running and will get around to this task in time. @@ -485,6 +511,34 @@ boost::shared_ptr ThreadManager::Impl::removeNextPending() { return task->getRunnable(); } +void ThreadManager::Impl::removeExpiredTasks() { + int64_t now = 0LL; // we won't ask for the time untile we need it + + // note that this loop breaks at the first non-expiring task + while (!tasks_.empty()) { + shared_ptr task = tasks_.front(); + if (task->getExpireTime() == 0LL) { + break; + } + if (now == 0LL) { + now = Util::currentTime(); + } + if (task->getExpireTime() > now) { + break; + } + if (expireCallback_) { + expireCallback_(task->getRunnable()); + } + tasks_.pop(); + expiredCount_++; + } +} + + +void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) { + expireCallback_ = expireCallback; +} + class SimpleThreadManager : public ThreadManager::Impl { public: diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h index cbf08c07..95c49068 100644 --- a/lib/cpp/src/concurrency/ThreadManager.h +++ b/lib/cpp/src/concurrency/ThreadManager.h @@ -21,6 +21,7 @@ #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1 #include +#include #include #include "Thread.h" @@ -56,6 +57,9 @@ class ThreadManager { ThreadManager() {} public: + class Task; + typedef std::tr1::function)> ExpireCallback; + virtual ~ThreadManager() {} /** @@ -124,6 +128,11 @@ class ThreadManager { */ virtual size_t pendingTaskCountMax() const = 0; + /** + * Gets the number of tasks which have been expired without being run. + */ + virtual size_t expiredTaskCount() = 0; + /** * Adds a task to be executed at some time in the future by a worker thread. * @@ -138,10 +147,14 @@ class ThreadManager { * is specified. Specific cases: * timeout = 0 : Wait forever to queue task. * timeout = -1 : Return immediately if pending task count exceeds specified max + * @param expiration when nonzero, the number of milliseconds the task is valid + * to be run; if exceeded, the task will be dropped off the queue and not run. * * @throws TooManyPendingTasksException Pending task count exceeds max pending task count */ - virtual void add(boost::shared_ptrtask, int64_t timeout=0LL) = 0; + virtual void add(boost::shared_ptrtask, + int64_t timeout=0LL, + int64_t expiration=0LL) = 0; /** * Removes a pending task @@ -155,6 +168,19 @@ class ThreadManager { */ virtual boost::shared_ptr removeNextPending() = 0; + /** + * Remove tasks from front of task queue that have expired. + */ + virtual void removeExpiredTasks() = 0; + + /** + * Set a callback to be called when a task is expired and not run. + * + * @param expireCallback a function called with the shared_ptr for + * the expired task. + */ + virtual void setExpireCallback(ExpireCallback expireCallback) = 0; + static boost::shared_ptr newThreadManager(); /** diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index eb071a90..649994d9 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -770,6 +770,16 @@ void TNonblockingServer::registerEvents(event_base* base) { } } +void TNonblockingServer::setThreadManager(boost::shared_ptr threadManager) { + threadManager_ = threadManager; + if (threadManager != NULL) { + threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1)); + threadPoolProcessing_ = true; + } else { + threadPoolProcessing_ = false; + } +} + bool TNonblockingServer::serverOverloaded() { size_t activeConnections = numTConnections_ - connectionStack_.size(); if (numActiveProcessors_ > maxActiveProcessors_ || @@ -807,6 +817,14 @@ bool TNonblockingServer::drainPendingTask() { return false; } +void TNonblockingServer::expireClose(boost::shared_ptr task) { + TConnection* connection = + static_cast(task.get())->getTConnection(); + assert(connection && connection->getServer() + && connection->getState() == APP_WAIT_TASK); + connection->forceClose(); +} + /** * Main workhorse function, starts up the server listening on a port and * loops over the libevent handler. diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index cf5aa188..41a2bf58 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -113,6 +113,9 @@ class TNonblockingServer : public TServer { /// Limit for number of open connections size_t maxConnections_; + /// Time in milliseconds before an unperformed task expires (0 == infinite). + int64_t taskExpireTime_; + /** * Hysteresis for overload state. This is the fraction of the overload * value that needs to be reached before the overload state is cleared; @@ -173,6 +176,7 @@ class TNonblockingServer : public TServer { connectionStackLimit_(CONNECTION_STACK_LIMIT), maxActiveProcessors_(MAX_ACTIVE_PROCESSORS), maxConnections_(MAX_CONNECTIONS), + taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), @@ -194,6 +198,7 @@ class TNonblockingServer : public TServer { connectionStackLimit_(CONNECTION_STACK_LIMIT), maxActiveProcessors_(MAX_ACTIVE_PROCESSORS), maxConnections_(MAX_CONNECTIONS), + taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), @@ -224,12 +229,13 @@ class TNonblockingServer : public TServer { connectionStackLimit_(CONNECTION_STACK_LIMIT), maxActiveProcessors_(MAX_ACTIVE_PROCESSORS), maxConnections_(MAX_CONNECTIONS), + taskExpireTime_(0), overloadHysteresis_(0.8), overloadAction_(T_OVERLOAD_NO_ACTION), idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT), overloaded_(false), nConnectionsDropped_(0), - nTotalConnectionsDropped_(0) { + nTotalConnectionsDropped_(0) { setInputTransportFactory(inputTransportFactory); setOutputTransportFactory(outputTransportFactory); setInputProtocolFactory(inputProtocolFactory); @@ -239,10 +245,7 @@ class TNonblockingServer : public TServer { ~TNonblockingServer() {} - void setThreadManager(boost::shared_ptr threadManager) { - threadManager_ = threadManager; - threadPoolProcessing_ = (threadManager != NULL); - } + void setThreadManager(boost::shared_ptr threadManager); boost::shared_ptr getThreadManager() { return threadManager_; @@ -271,7 +274,7 @@ class TNonblockingServer : public TServer { } void addTask(boost::shared_ptr task) { - threadManager_->add(task); + threadManager_->add(task, 0LL, taskExpireTime_); } event_base* getEventBase() const { @@ -405,6 +408,24 @@ class TNonblockingServer : public TServer { overloadAction_ = overloadAction; } + /** + * Get the time in milliseconds after which a task expires (0 == infinite). + * + * @return a 64-bit time in milliseconds. + */ + int64_t getTaskExpireTime() const { + return taskExpireTime_; + } + + /** + * Set the time in milliseconds after which a task expires (0 == infinite). + * + * @param taskExpireTime a 64-bit time in milliseconds. + */ + void setTaskExpireTime(int64_t taskExpireTime) { + taskExpireTime_ = taskExpireTime; + } + /** * Determine if the server is currently overloaded. * This function checks the maximums for open connections and connections @@ -462,6 +483,14 @@ class TNonblockingServer : public TServer { */ void returnConnection(TConnection* connection); + /** + * Callback function that the threadmanager calls when a task reaches + * its expiration time. It is needed to clean up the expired connection. + * + * @param task the runnable associated with the expired task. + */ + void expireClose(boost::shared_ptr task); + /** * C-callable event handler for listener events. Provides a callback * that libevent can understand which invokes server->handleEvent(). -- 2.17.1