blob: 20193533f2da3194e26aa76240918240cc5580af [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier12d70532011-12-14 23:35:28 +000020#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
Marc Slemko66949872006-07-15 01:52:39 +000023#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +000024#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +000025
David Reissaf296952008-06-10 22:54:40 +000026#if GOOGLE_PERFTOOLS_REGISTER_THREAD
27# include <google/profiler.h>
28#endif
29
Marc Slemko66949872006-07-15 01:52:39 +000030#include <assert.h>
31#include <pthread.h>
32
Marc Slemko6f038a72006-08-03 18:58:09 +000033#include <iostream>
34
35#include <boost/weak_ptr.hpp>
36
T Jake Lucianib5e62212009-01-31 22:36:20 +000037namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000038
David Reissd4a269c2007-08-23 02:37:19 +000039using boost::shared_ptr;
40using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000041
Mark Sleef5f2be42006-09-05 21:05:31 +000042/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000043 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000044 *
Mark Sleef5f2be42006-09-05 21:05:31 +000045 * @version $Id:$
46 */
Marc Slemko66949872006-07-15 01:52:39 +000047class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000048 public:
Marc Slemko66949872006-07-15 01:52:39 +000049
Mark Sleef5f2be42006-09-05 21:05:31 +000050 enum STATE {
51 uninitialized,
52 starting,
53 started,
54 stopping,
55 stopped
Marc Slemko66949872006-07-15 01:52:39 +000056 };
57
58 static const int MB = 1024 * 1024;
59
Marc Slemko8a40a762006-07-19 17:46:50 +000060 static void* threadMain(void* arg);
61
Mark Sleef5f2be42006-09-05 21:05:31 +000062 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000063 pthread_t pthread_;
64 STATE state_;
65 int policy_;
66 int priority_;
67 int stackSize_;
68 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000069 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000070
Mark Sleef5f2be42006-09-05 21:05:31 +000071 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072
Marc Slemko67606e52007-06-04 21:01:19 +000073 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Roger Meier84e4a3c2011-09-16 20:58:44 +000074
75#ifndef _WIN32
Mark Slee2f6404d2006-10-10 01:37:40 +000076 pthread_(0),
Roger Meier84e4a3c2011-09-16 20:58:44 +000077#endif // _WIN32
78
Marc Slemko3a3b53b2007-05-22 23:59:54 +000079 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000080 policy_(policy),
81 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000082 stackSize_(stackSize),
83 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000084
85 this->Thread::runnable(runnable);
86 }
Marc Slemko66949872006-07-15 01:52:39 +000087
Marc Slemko67606e52007-06-04 21:01:19 +000088 ~PthreadThread() {
89 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000090 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000091 be leaked. */
92 if(!detached_) {
93 try {
94 join();
95 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000096 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000097 }
98 }
99 }
Marc Slemko6f038a72006-08-03 18:58:09 +0000100
Marc Slemko66949872006-07-15 01:52:39 +0000101 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000103 return;
104 }
105
Marc Slemko66949872006-07-15 01:52:39 +0000106 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +0000107 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000108 throw SystemResourceException("pthread_attr_init failed");
109 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +0000110
Marc Slemkoa6479032007-06-05 22:20:14 +0000111 if(pthread_attr_setdetachstate(&thread_attr,
112 detached_ ?
113 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +0000114 PTHREAD_CREATE_JOINABLE) != 0) {
115 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000116 }
Marc Slemko66949872006-07-15 01:52:39 +0000117
118 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +0000119 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
120 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000121 }
Marc Slemko66949872006-07-15 01:52:39 +0000122
123 // Set thread policy
Roger Meier3516e0e2011-09-30 20:23:34 +0000124 #ifdef _WIN32
125 //WIN32 Pthread implementation doesn't seem to support sheduling policies other then PosixThreadFactory::OTHER - runtime error
126 policy_ = PosixThreadFactory::OTHER;
127 #endif
128
Mark Slee2782d6d2007-05-23 04:55:30 +0000129 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
130 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000131 }
Marc Slemko66949872006-07-15 01:52:39 +0000132
133 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000134 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000135
136 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000137 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
138 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000139 }
Marc Slemko66949872006-07-15 01:52:39 +0000140
Mark Sleef5f2be42006-09-05 21:05:31 +0000141 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000142 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000144
Marc Slemko67606e52007-06-04 21:01:19 +0000145 state_ = starting;
146
Mark Slee2782d6d2007-05-23 04:55:30 +0000147 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
148 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000149 }
Marc Slemko66949872006-07-15 01:52:39 +0000150 }
151
152 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000153 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000154 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000155 /* XXX
156 If join fails it is most likely due to the fact
157 that the last reference was the thread itself and cannot
158 join. This results in leaked threads and will eventually
159 cause the process to run out of thread resources.
160 We're beyond the point of throwing an exception. Not clear how
161 best to handle this. */
Jake Farrellb0d95602011-12-06 01:17:26 +0000162 int res = pthread_join(pthread_, &ignore);
163 detached_ = (res == 0);
164 if (res != 0) {
165 GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
166 }
167 } else {
168 GlobalOutput.printf("PthreadThread::join(): detached thread");
Marc Slemko66949872006-07-15 01:52:39 +0000169 }
170 }
171
David Reissfbb14ef2008-12-02 02:32:25 +0000172 Thread::id_t getId() {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000173
174#ifndef _WIN32
David Reissfbb14ef2008-12-02 02:32:25 +0000175 return (Thread::id_t)pthread_;
Roger Meier84e4a3c2011-09-16 20:58:44 +0000176#else
177 return (Thread::id_t)pthread_.p;
178#endif // _WIN32
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000179 }
180
Mark Sleef5f2be42006-09-05 21:05:31 +0000181 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000182
Mark Sleef5f2be42006-09-05 21:05:31 +0000183 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000184
Marc Slemko6f038a72006-08-03 18:58:09 +0000185 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000186 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000188 }
Marc Slemko66949872006-07-15 01:52:39 +0000189};
190
Marc Slemko8a40a762006-07-19 17:46:50 +0000191void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000192 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000193 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
194
Mark Sleef5f2be42006-09-05 21:05:31 +0000195 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000196 return (void*)0;
197 }
198
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000200 return (void*)0;
201 }
202
David Reissaf296952008-06-10 22:54:40 +0000203#if GOOGLE_PERFTOOLS_REGISTER_THREAD
204 ProfilerRegisterThread();
205#endif
206
Roger Meier3faaedf2011-10-02 10:51:45 +0000207 thread->state_ = started;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000208 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 if (thread->state_ != stopping && thread->state_ != stopped) {
210 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000211 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000212
Marc Slemko8a40a762006-07-19 17:46:50 +0000213 return (void*)0;
214}
215
Mark Sleef5f2be42006-09-05 21:05:31 +0000216/**
217 * POSIX Thread factory implementation
218 */
Marc Slemko66949872006-07-15 01:52:39 +0000219class PosixThreadFactory::Impl {
220
Mark Sleef5f2be42006-09-05 21:05:31 +0000221 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 POLICY policy_;
223 PRIORITY priority_;
224 int stackSize_;
225 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000226
Mark Sleef5f2be42006-09-05 21:05:31 +0000227 /**
228 * Converts generic posix thread schedule policy enums into pthread
229 * API values.
230 */
Marc Slemko66949872006-07-15 01:52:39 +0000231 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000232 switch (policy) {
233 case OTHER:
234 return SCHED_OTHER;
235 case FIFO:
236 return SCHED_FIFO;
237 case ROUND_ROBIN:
238 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000239 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000240 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000241 }
242
Mark Sleef5f2be42006-09-05 21:05:31 +0000243 /**
244 * Converts relative thread priorities to absolute value based on posix
245 * thread scheduler policy
246 *
247 * The idea is simply to divide up the priority range for the given policy
248 * into the correpsonding relative priority level (lowest..highest) and
249 * then pro-rate accordingly.
250 */
Marc Slemko66949872006-07-15 01:52:39 +0000251 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000252 int pthread_policy = toPthreadPolicy(policy);
David Reisse4ca1792009-05-21 02:28:19 +0000253 int min_priority = 0;
254 int max_priority = 0;
255#ifdef HAVE_SCHED_GET_PRIORITY_MIN
256 min_priority = sched_get_priority_min(pthread_policy);
257#endif
258#ifdef HAVE_SCHED_GET_PRIORITY_MAX
259 max_priority = sched_get_priority_max(pthread_policy);
260#endif
Marc Slemko66949872006-07-15 01:52:39 +0000261 int quanta = (HIGHEST - LOWEST) + 1;
Roger Meier3516e0e2011-09-30 20:23:34 +0000262 float stepsperquanta = (float)(max_priority - min_priority) / quanta;
Marc Slemko66949872006-07-15 01:52:39 +0000263
Mark Slee29050782006-09-29 00:12:30 +0000264 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000265 return (int)(min_priority + stepsperquanta * priority);
266 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000267 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000268 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000269 return (int)(min_priority + stepsperquanta * NORMAL);
270 }
271 }
272
Mark Sleef5f2be42006-09-05 21:05:31 +0000273 public:
Marc Slemko66949872006-07-15 01:52:39 +0000274
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000276 policy_(policy),
277 priority_(priority),
278 stackSize_(stackSize),
279 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000280
Mark Sleef5f2be42006-09-05 21:05:31 +0000281 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000282 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000283 *
284 * @param runnable A runnable object
285 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000286 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000287 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000288 result->weakRef(result);
289 runnable->thread(result);
290 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000291 }
292
Marc Slemkoa6479032007-06-05 22:20:14 +0000293 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000294
Marc Slemkoa6479032007-06-05 22:20:14 +0000295 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000296
Marc Slemkoa6479032007-06-05 22:20:14 +0000297 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000298
Mark Sleef5f2be42006-09-05 21:05:31 +0000299 /**
300 * Sets priority.
301 *
302 * XXX
303 * Need to handle incremental priorities properly.
304 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000305 void setPriority(PRIORITY value) { priority_ = value; }
306
307 bool isDetached() const { return detached_; }
308
309 void setDetached(bool value) { detached_ = value; }
310
Mark Slee98439152007-08-21 02:39:40 +0000311 Thread::id_t getCurrentThreadId() const {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000312
313#ifndef _WIN32
David Reissffff2b32009-09-01 18:03:07 +0000314 return (Thread::id_t)pthread_self();
Roger Meier84e4a3c2011-09-16 20:58:44 +0000315#else
316 return (Thread::id_t)pthread_self().p;
317#endif // _WIN32
318
Mark Slee98439152007-08-21 02:39:40 +0000319 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000320
Marc Slemko66949872006-07-15 01:52:39 +0000321};
322
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000323PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000325
Mark Slee2f6404d2006-10-10 01:37:40 +0000326shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000327
Marc Slemkoa6479032007-06-05 22:20:14 +0000328int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000329
Marc Slemkoa6479032007-06-05 22:20:14 +0000330void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000331
Marc Slemkoa6479032007-06-05 22:20:14 +0000332PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000333
Marc Slemkoa6479032007-06-05 22:20:14 +0000334void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000335
Marc Slemkoa6479032007-06-05 22:20:14 +0000336bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
337
338void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
339
340Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000341
T Jake Lucianib5e62212009-01-31 22:36:20 +0000342}}} // apache::thrift::concurrency