void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
- void wait(long long timeout) const {
+ void wait(int64_t timeout) const {
// XXX Need to assert that caller owns mutex
assert(timeout >= 0LL);
assert(iret == 0);
} else {
struct timespec abstime;
- long long now = Util::currentTime();
+ int64_t now = Util::currentTime();
Util::toTimespec(abstime, now + timeout);
int result = pthread_cond_timedwait(&pthread_cond_,
&pthread_mutex_,
void Monitor::unlock() const { impl_->unlock(); }
-void Monitor::wait(long long timeout) const { impl_->wait(timeout); }
+void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
void Monitor::notify() const { impl_->notify(); }
virtual void unlock() const;
- virtual void wait(long long timeout=0LL) const;
+ virtual void wait(int64_t timeout=0LL) const;
virtual void notify() const;
public:
- typedef unsigned long long id_t;
+ typedef uint64_t id_t;
virtual ~Thread() {};
bool canSleep();
- void add(shared_ptr<Runnable> value, long long timeout);
+ void add(shared_ptr<Runnable> value, int64_t timeout);
void remove(shared_ptr<Runnable> task);
return idMap_.find(id) == idMap_.end();
}
- void ThreadManager::Impl::add(shared_ptr<Runnable> value, long long timeout) {
+ void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
Synchronized s(monitor_);
if (state_ != ThreadManager::STARTED) {
*
* @throws TooManyPendingTasksException Pending task count exceeds max pending task count
*/
- virtual void add(boost::shared_ptr<Runnable>task, long long timeout=0LL) = 0;
+ virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
/**
* Removes a pending task
using boost::shared_ptr;
-typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
+typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
/**
{
Synchronized s(manager_->monitor_);
task_iterator expiredTaskEnd;
- long long now = Util::currentTime();
+ int64_t now = Util::currentTime();
while (manager_->state_ == TimerManager::STARTED &&
(expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
- long long timeout = 0LL;
+ int64_t timeout = 0LL;
if (!manager_->taskMap_.empty()) {
timeout = manager_->taskMap_.begin()->first - now;
}
return taskCount_;
}
-void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
- long long now = Util::currentTime();
+void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
+ int64_t now = Util::currentTime();
timeout += now;
{
}
taskCount_++;
- taskMap_.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+ taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
// If the task map was empty, or if we have an expiration that is earlier
// than any previously seen, kick the dispatcher so it can update its
void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
- long long expiration;
+ int64_t expiration;
Util::toMilliseconds(expiration, value);
- long long now = Util::currentTime();
+ int64_t now = Util::currentTime();
if (expiration < now) {
throw InvalidArgumentException();
* @param task The task to execute
* @param timeout Time in milliseconds to delay before executing task
*/
- virtual void add(boost::shared_ptr<Runnable> task, long long timeout);
+ virtual void add(boost::shared_ptr<Runnable> task, int64_t timeout);
/**
* Adds a task to be executed at some time in the future by a worker thread.
boost::shared_ptr<const ThreadFactory> threadFactory_;
class Task;
friend class Task;
- std::multimap<long long, boost::shared_ptr<Task> > taskMap_;
+ std::multimap<int64_t, boost::shared_ptr<Task> > taskMap_;
size_t taskCount_;
Monitor monitor_;
STATE state_;
*/
class Util {
- static const long long NS_PER_S = 1000000000LL;
- static const long long MS_PER_S = 1000LL;
- static const long long NS_PER_MS = 1000000LL;
+ static const int64_t NS_PER_S = 1000000000LL;
+ static const int64_t MS_PER_S = 1000LL;
+ static const int64_t NS_PER_MS = 1000000LL;
public:
* @param struct timespec& result
* @param time or duration in milliseconds
*/
- static void toTimespec(struct timespec& result, long long value) {
+ static void toTimespec(struct timespec& result, int64_t value) {
result.tv_sec = value / MS_PER_S; // ms to s
result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
}
/**
* Converts timespec to milliseconds
*/
- static const void toMilliseconds(long long& result, const struct timespec& value) {
+ static const void toMilliseconds(int64_t& result, const struct timespec& value) {
result =
(value.tv_sec * MS_PER_S) +
(value.tv_nsec / NS_PER_MS) +
/**
* Get current time as milliseconds from epoch
*/
- static const long long currentTime() {
+ static const int64_t currentTime() {
#if defined(HAVE_CLOCK_GETTIME)
struct timespec now;
int ret = clock_gettime(CLOCK_REALTIME, &now);
int ret = gettimeofday(&now, NULL);
assert(ret == 0);
return
- (((long long)now.tv_sec) * MS_PER_S) +
+ (((int64_t)now.tv_sec) * MS_PER_S) +
(now.tv_usec / MS_PER_S) +
(now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
#endif // defined(HAVE_GETTIMEDAY)
std::cout << "\t\tUtil minimum time" << std::endl;
- long long time00 = Util::currentTime();
- long long time01 = Util::currentTime();
+ int64_t time00 = Util::currentTime();
+ int64_t time01 = Util::currentTime();
std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
size_t taskCount = 100000;
- long long delay = 10LL;
+ int64_t delay = 10LL;
std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
size_t tasksPerWorker = 1000;
- long long delay = 10LL;
+ int64_t delay = 10LL;
for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
/** See how accurate monitor timeout is. */
- bool monitorTimeoutTest(size_t count=1000, long long timeout=10) {
+ bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
Monitor monitor;
- long long startTime = Util::currentTime();
+ int64_t startTime = Util::currentTime();
for (size_t ix = 0; ix < count; ix++) {
{
}
}
- long long endTime = Util::currentTime();
+ int64_t endTime = Util::currentTime();
double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
public:
- Task(Monitor& monitor, size_t& count, long long timeout) :
+ Task(Monitor& monitor, size_t& count, int64_t timeout) :
_monitor(monitor),
_count(count),
_timeout(timeout),
Monitor& _monitor;
size_t& _count;
- long long _timeout;
- long long _startTime;
- long long _endTime;
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
bool _done;
Monitor _sleep;
};
* completes. Verify that all tasks completed and that thread manager cleans
* up properly on delete.
*/
- bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
+ bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
Monitor monitor;
tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
}
- long long time00 = Util::currentTime();
+ int64_t time00 = Util::currentTime();
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
}
}
- long long time01 = Util::currentTime();
+ int64_t time01 = Util::currentTime();
- long long firstTime = 9223372036854775807LL;
- long long lastTime = 0;
+ int64_t firstTime = 9223372036854775807LL;
+ int64_t lastTime = 0;
double averageTime = 0;
- long long minTime = 9223372036854775807LL;
- long long maxTime = 0;
+ int64_t minTime = 9223372036854775807LL;
+ int64_t maxTime = 0;
for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
shared_ptr<ThreadManagerTests::Task> task = *ix;
- long long delta = task->_endTime - task->_startTime;
+ int64_t delta = task->_endTime - task->_startTime;
assert(delta > 0);
* Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
* pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
- bool blockTest(long long timeout=100LL, size_t workerCount=2) {
+ bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
bool success = false;
class Task: public Runnable {
public:
- Task(Monitor& monitor, long long timeout) :
+ Task(Monitor& monitor, int64_t timeout) :
_timeout(timeout),
_startTime(Util::currentTime()),
_monitor(monitor),
// Figure out error percentage
- long long delta = _endTime - _startTime;
+ int64_t delta = _endTime - _startTime;
delta = delta > _timeout ? delta - _timeout : _timeout - delta;
}
}
- long long _timeout;
- long long _startTime;
- long long _endTime;
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
Monitor& _monitor;
bool _success;
bool _done;
* properly clean up itself and the remaining orphaned timeout task when the
* manager goes out of scope and its destructor is called.
*/
- bool test00(long long timeout=1000LL) {
+ bool test00(int64_t timeout=1000LL) {
shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
return true;
}
- const std::map<std::string, long long>& get_frequency_map() {
+ const std::map<std::string, int64_t>& get_frequency_map() {
return frequency_map_;
}
}
boost::shared_ptr<facebook::thrift::protocol::TProtocol> piprot_;
- std::map<std::string, long long> frequency_map_;
+ std::map<std::string, int64_t> frequency_map_;
bool print_;
bool frequency_;
}
-long long TThreadPoolServer::timeout() const {return timeout_;}
-void TThreadPoolServer::timeout(long long value) {timeout_ = value;}
+int64_t TThreadPoolServer::getTimeout() const {
+ return timeout_;
+}
+
+void TThreadPoolServer::setTimeout(int64_t value) {
+ timeout_ = value;
+}
}}} // facebook::thrift::server
virtual void serve();
- virtual long long timeout() const;
- virtual void timeout(long long value);
+ virtual int64_t getTimeout() const;
+
+ virtual void setTimeout(int64_t value);
virtual void stop() {
stop_ = true;
volatile bool stop_;
- volatile long long timeout_;
+ volatile int64_t timeout_;
};
openLogFile();
}
-void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
+void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
filename_ = filename;
offset_ = offset;
}
// sanity check on event
- if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
+ if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
continue;
}
// If chunking is required, then make sure that msg does not cross chunk boundary
- if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
+ if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
// event size must be less than chunk size
if(outEvent->eventSize_ > chunkSize_) {
continue;
}
- long long chunk1 = offset_/chunkSize_;
- long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
+ int64_t chunk1 = offset_/chunkSize_;
+ int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
// if adding this event will cross a chunk boundary, pad the chunk with zeros
- if(chunk1 != chunk2) {
+ if (chunk1 != chunk2) {
// refetch the offset to keep in sync
offset_ = lseek(fd_, 0, SEEK_CUR);
int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
uint32_t getCurChunk();
// for changing the output file
- void resetOutputFile(int fd, std::string filename, long long offset);
+ void resetOutputFile(int fd, std::string filename, int64_t offset);
// Setter/Getter functions for user-controllable options
void setReadBuffSize(uint32_t readBuffSize) {