#include <assert.h>
#include <pthread.h>
+#include <signal.h>
using boost::shared_ptr;
namespace apache { namespace thrift { namespace concurrency {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+static sig_atomic_t mutexProfilingSampleRate = 0;
+static MutexWaitCallback mutexProfilingCallback = 0;
+
+static volatile sig_atomic_t mutexProfilingCounter = 0;
+
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback) {
+ mutexProfilingSampleRate = profilingSampleRate;
+ mutexProfilingCallback = callback;
+}
+
+#define PROFILE_MUTEX_START_LOCK() \
+ int64_t _lock_startTime = maybeGetProfilingStartTime();
+
+#define PROFILE_MUTEX_NOT_LOCKED() \
+ do { \
+ if (_lock_startTime > 0) { \
+ int64_t endTime = Util::currentTimeUsec(); \
+ (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_LOCKED() \
+ do { \
+ profileTime_ = _lock_startTime; \
+ if (profileTime_ > 0) { \
+ profileTime_ = Util::currentTimeUsec() - profileTime_; \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_START_UNLOCK() \
+ int64_t _temp_profileTime = profileTime_; \
+ profileTime_ = 0;
+
+#define PROFILE_MUTEX_UNLOCKED() \
+ do { \
+ if (_temp_profileTime > 0) { \
+ (*mutexProfilingCallback)(this, _temp_profileTime); \
+ } \
+ } while (0)
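+
+// These macros assume the enclosing class declares a mutable
+// "int64_t profileTime_" member (see Mutex::impl and ReadWriteMutex::impl
+// below). _lock_startTime and _temp_profileTime are locals introduced by
+// the *_START_* macros and consumed by their matching end macros.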
+
+static inline int64_t maybeGetProfilingStartTime() {
+ if (mutexProfilingSampleRate && mutexProfilingCallback) {
+ // This block is unsynchronized, but should produce a reasonable sampling
+ // rate on most architectures. The main race conditions are the gap
+ // between the decrement and the test, the non-atomicity of the decrement,
+ // and potential caching of different values on different CPUs.
+ //
+ // - if two decrements race, the likeliest result is that the counter
+ // decrements slowly (perhaps much more slowly) than intended.
+ //
+ // - many threads could potentially decrement before resetting the counter
+ // to its large value, causing each additional incoming thread to
+ // profile every call. This situation is unlikely to persist for long
+ // as the critical gap is quite short, but profiling could be bursty.
+ sig_atomic_t localValue = --mutexProfilingCounter;
+ if (localValue <= 0) {
+ mutexProfilingCounter = mutexProfilingSampleRate;
+ return Util::currentTimeUsec();
+ }
+ }
+
+ return 0;
+}
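+
+// For example, with enableMutexProfiling(100, cb), roughly one call in every
+// 100 observes the counter at or below zero, resets it to the sample rate,
+// and returns a nonzero start time to be profiled; given the races described
+// above, the effective rate may drift or burst.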
+
+#else
+# define PROFILE_MUTEX_START_LOCK()
+# define PROFILE_MUTEX_NOT_LOCKED()
+# define PROFILE_MUTEX_LOCKED()
+# define PROFILE_MUTEX_START_UNLOCK()
+# define PROFILE_MUTEX_UNLOCKED()
+#endif // THRIFT_NO_CONTENTION_PROFILING
+
/**
* Implementation of Mutex class using POSIX mutex
*
class Mutex::impl {
public:
impl(Initializer init) : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
init(&pthread_mutex_);
initialized_ = true;
}
}
}
- void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+ void lock() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_mutex_lock(&pthread_mutex_);
+ PROFILE_MUTEX_LOCKED();
+ }
bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
bool timedlock(int64_t milliseconds) const {
+ PROFILE_MUTEX_START_LOCK();
+
struct timespec ts;
Util::toTimespec(ts, milliseconds);
- return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+ int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+ if (ret == 0) {
+ PROFILE_MUTEX_LOCKED();
+ return true;
+ }
+
+ PROFILE_MUTEX_NOT_LOCKED();
+ return false;
}
- void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+ void unlock() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_mutex_unlock(&pthread_mutex_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
private:
mutable pthread_mutex_t pthread_mutex_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
class ReadWriteMutex::impl {
public:
impl() : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
int ret = pthread_rwlock_init(&rw_lock_, NULL);
assert(ret == 0);
initialized_ = true;
}
}
- void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+ void acquireRead() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_rdlock(&rw_lock_);
+ PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
+ }
- void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+ void acquireWrite() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_wrlock(&rw_lock_);
+ PROFILE_MUTEX_LOCKED();
+ }
- bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
- bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
+ // pthread_rwlock_try* return 0 on success, so compare against 0 as
+ // trylock() does above; returning the raw result inverts the meaning.
+ bool attemptRead() const { return (0 == pthread_rwlock_tryrdlock(&rw_lock_)); }
+ bool attemptWrite() const { return (0 == pthread_rwlock_trywrlock(&rw_lock_)); }
- void release() const { pthread_rwlock_unlock(&rw_lock_); }
+ void release() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_rwlock_unlock(&rw_lock_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
private:
mutable pthread_rwlock_t rw_lock_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
namespace apache { namespace thrift { namespace concurrency {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+/**
+ * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
+ * profile their blocking acquire methods. If this value is set to non-zero,
+ * Thrift will attempt to invoke the callback once every profilingSampleRate
+ * times. However, because the sampling is not synchronized, the rate is not
+ * guaranteed and may be subject to large bursts and swings. Ensure that
+ * your sampling callback is as fast as your application requires.
+ *
+ * The callback will get called with the wait time taken to lock the mutex in
+ * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex)
+ * being locked.
+ *
+ * The enableMutexProfiling() function is unsynchronized; calling this function
+ * while profiling is already enabled may result in race conditions. On
+ * architectures where a pointer assignment is atomic, this is safe but there
+ * is no guarantee threads will agree on a single callback within any
+ * particular time period.
+ */
+typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback);
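+
+// Illustrative usage sketch ("logMutexWait" is a hypothetical callback name):
+//
+//   static void logMutexWait(const void* id, int64_t waitTimeMicros) {
+//     fprintf(stderr, "mutex %p blocked for %lld us\n",
+//             id, (long long)waitTimeMicros);
+//   }
+//
+//   // Report roughly one in every 1000 blocking acquires.
+//   enableMutexProfiling(1000, logMutexWait);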
+
+#endif
+
/**
* A simple mutex class
*
static const int64_t MS_PER_S = 1000LL;
static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+ static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
public:
result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
}
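+ /**
+ * Converts a (secs, oldTicks) pair at oldTicksPerSec resolution into ticks
+ * at newTicksPerSec resolution, rounding to the nearest new tick. For
+ * example, toTicks(r, 1, 500000000, NS_PER_S, MS_PER_S) yields r == 1500
+ * (1.5 seconds expressed as milliseconds).
+ */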
+ static const void toTicks(int64_t& result, int64_t secs, int64_t oldTicks,
+ int64_t oldTicksPerSec, int64_t newTicksPerSec) {
+ result = secs * newTicksPerSec;
+ result += oldTicks * newTicksPerSec / oldTicksPerSec;
+
+ int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
+ if (oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) {
+ ++result;
+ }
+ }
+ /**
+ * Converts struct timespec to arbitrary-sized ticks since epoch
+ */
+ static const void toTicks(int64_t& result,
+ const struct timespec& value,
+ int64_t ticksPerSec) {
+ return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
+ }
+
+ /**
+ * Converts struct timeval to arbitrary-sized ticks since epoch
+ */
+ static const void toTicks(int64_t& result,
+ const struct timeval& value,
+ int64_t ticksPerSec) {
+ return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+ }
+
/**
* Converts struct timespec to milliseconds
*/
- static const void toMilliseconds(int64_t& result, const struct timespec& value) {
- result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS);
- // round up -- int64_t cast is to avoid a compiler error for some GCCs
- if (int64_t(value.tv_nsec) % NS_PER_MS >= (NS_PER_MS / 2)) {
- ++result;
- }
+ static const void toMilliseconds(int64_t& result,
+ const struct timespec& value) {
+ return toTicks(result, value, MS_PER_S);
}
/**
* Converts struct timeval to milliseconds
*/
- static const void toMilliseconds(int64_t& result, const struct timeval& value) {
- result = (value.tv_sec * MS_PER_S) + (value.tv_usec / US_PER_MS);
- // round up -- int64_t cast is to avoid a compiler error for some GCCs
- if (int64_t(value.tv_usec) % US_PER_MS >= (US_PER_MS / 2)) {
- ++result;
- }
+ static const void toMilliseconds(int64_t& result,
+ const struct timeval& value) {
+ return toTicks(result, value, MS_PER_S);
+ }
+
+ /**
+ * Converts struct timespec to microseconds
+ */
+ static const void toUsec(int64_t& result, const struct timespec& value) {
+ return toTicks(result, value, US_PER_S);
}
+ /**
+ * Converts struct timeval to microseconds
+ */
+ static const void toUsec(int64_t& result, const struct timeval& value) {
+ return toTicks(result, value, US_PER_S);
+ }
+
+ /**
+ * Get current time as a number of arbitrary-size ticks from epoch
+ */
+ static const int64_t currentTimeTicks(int64_t ticksPerSec);
+
/**
* Get current time as milliseconds from epoch
*/
- static const int64_t currentTime();
+ static const int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
+
+ /**
+ * Get current time as micros from epoch
+ */
+ static const int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
};
}}} // apache::thrift::concurrency