throw TTransportException("TFileTransport: attempting to write to file opened readonly");
}
- enqueueEvent(buf, len, false);
+ enqueueEvent(buf, len);
}
-void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
// can't enqueue more events if file is going to close
if (closing_) {
return;
pthread_cond_wait(¬Full_, &mutex_);
}
+ // We shouldn't be trying to enqueue new data while a forced flush is
+ // requested. (Otherwise the writer thread might not ever be able to finish
+ // the flush if more data keeps being enqueued.)
+ assert(!forceFlush_);
+
// add to the buffer
if (!enqueueBuffer_->addEvent(toEnqueue)) {
delete toEnqueue;
// signal anybody who's waiting for the buffer to be non-empty
pthread_cond_signal(¬Empty_);
- if (blockUntilFlush) {
- pthread_cond_wait(&flushed_, &mutex_);
- }
-
// this really should be a loop where it makes sure it got flushed
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
continue;
}
- bool flushTimeElapsed = false;
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, ¤t_time);
-
- if (current_time.tv_sec > ts_next_flush.tv_sec ||
- (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
- flushTimeElapsed = true;
- getNextFlushTime(&ts_next_flush);
+ // Local variable to cache the state of forceFlush_.
+ //
+ // We only want to check the value of forceFlush_ once each time around the
+ // loop. If we check it more than once without holding the lock the entire
+ // time, it could have changed state in between. This will result in us
+ // making inconsistent decisions.
+ bool forced_flush = false;
+ pthread_mutex_lock(&mutex_);
+ if (forceFlush_) {
+ if (!enqueueBuffer_->isEmpty()) {
+ // If forceFlush_ is true, we need to flush all available data.
+ // If enqueueBuffer_ is not empty, go back to the start of the loop to
+ // write it out.
+ //
+ // We know the main thread is waiting on forceFlush_ to be cleared,
+ // so no new events will be added to enqueueBuffer_ until we clear
+ // forceFlush_. Therefore the next time around the loop enqueueBuffer_
+ // is guaranteed to be empty. (I.e., we're guaranteed to make progress
+ // and clear forceFlush_ the next time around the loop.)
+ pthread_mutex_unlock(&mutex_);
+ continue;
+ }
+ forced_flush = true;
}
+ pthread_mutex_unlock(&mutex_);
- // couple of cases from which a flush could be triggered
- if ((flushTimeElapsed && unflushed > 0) ||
- unflushed > flushMaxBytes_ ||
- forceFlush_) {
+ // determine if we need to perform an fsync
+ bool flush = false;
+ if (forced_flush || unflushed > flushMaxBytes_) {
+ flush = true;
+ } else {
+ struct timespec current_time;
+ clock_gettime(CLOCK_REALTIME, ¤t_time);
+ if (current_time.tv_sec > ts_next_flush.tv_sec ||
+ (current_time.tv_sec == ts_next_flush.tv_sec &&
+ current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+ if (unflushed > 0) {
+ flush = true;
+ } else {
+ // If there is no new data since the last fsync,
+ // don't perform the fsync, but do reset the timer.
+ getNextFlushTime(&ts_next_flush);
+ }
+ }
+ }
+ if (flush) {
// sync (force flush) file to disk
fsync(fd_);
unflushed = 0;
+ getNextFlushTime(&ts_next_flush);
// notify anybody waiting for flush completion
- forceFlush_ = false;
- pthread_cond_broadcast(&flushed_);
+ if (forced_flush) {
+ pthread_mutex_lock(&mutex_);
+ forceFlush_ = false;
+ assert(enqueueBuffer_->isEmpty());
+ assert(dequeueBuffer_->isEmpty());
+ pthread_cond_broadcast(&flushed_);
+ pthread_mutex_unlock(&mutex_);
+ }
}
}
}
// wait for flush to take place
pthread_mutex_lock(&mutex_);
+ // Indicate that we are requesting a flush
forceFlush_ = true;
+ // Wake up the writer thread so it will perform the flush immediately
+ pthread_cond_signal(¬Empty_);
while (forceFlush_) {
pthread_cond_wait(&flushed_, &mutex_);
private:
// helper functions for writing to a file
- void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+ void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
bool swapEventBuffers(struct timespec* deadline);
bool initBufferAndWriteThread();
// Provide BOOST_CHECK_LT() and BOOST_CHECK_GT(), in case we're compiled
// with an older version of boost
#ifndef BOOST_CHECK_LT
-#define BOOST_CHECK_CMP(a, b, op) \
- BOOST_CHECK_MESSAGE((a) op (b), \
- "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
- BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" <<\
- (a) << " " BOOST_STRINGIZE(b) "=" << (b))
-
-#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <)
-#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >)
+#define BOOST_CHECK_CMP(a, b, op, check_fn) \
+ check_fn((a) op (b), \
+ "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
+ BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" << (a) << \
+ " " BOOST_STRINGIZE(b) "=" << (b))
+
+#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_CHECK_MESSAGE)
+#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >, BOOST_CHECK_MESSAGE)
+#define BOOST_REQUIRE_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_REQUIRE_MESSAGE)
#endif // BOOST_CHECK_LT
/**
return 0;
}
-
int time_diff(const struct timeval* t1, const struct timeval* t2) {
return (t2->tv_usec - t1->tv_usec) + (t2->tv_sec - t1->tv_sec) * 1000000;
}
test_flush_max_us_impl(400000, 300000, 1000000);
}
+/**
+ * Make sure flush() is fast when there is nothing to do.
+ *
+ * TFileTransport used to have a bug where flush() would wait for the fsync
+ * timeout to expire.
+ */
+void test_noop_flush() {
+ TempFile f(tmp_dir, "thrift.TFileTransportTest.");
+ TFileTransport transport(f.getPath());
+
+ // Write something to start the writer thread.
+ uint8_t buf[] = "a";
+ transport.write(buf, 1);
+
+ struct timeval start;
+ gettimeofday(&start, NULL);
+
+ for (unsigned int n = 0; n < 10; ++n) {
+ transport.flush();
+
+ struct timeval now;
+ gettimeofday(&now, NULL);
+
+ // Fail if at any point we've been running for longer than half a second.
+ // (With the buggy code, TFileTransport used to take 3 seconds per flush())
+ //
+ // Use a fatal fail so we break out early, rather than continuing to make
+ // many more slow flush() calls.
+ int delta = time_diff(&start, &now);
+ BOOST_REQUIRE_LT(delta, 500000);
+ }
+}
+
/**************************************************************************
* General Initialization
**************************************************************************/
suite->add(BOOST_TEST_CASE(test_flush_max_us1));
suite->add(BOOST_TEST_CASE(test_flush_max_us2));
suite->add(BOOST_TEST_CASE(test_flush_max_us3));
+ suite->add(BOOST_TEST_CASE(test_noop_flush));
return suite;
}
TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \
TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \
TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \
- TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \
+ TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1); \
\
TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \
TEST_RW_BUF(CoupledTransports, 1024*1024*10, \
rand4k, rand4k, rand4k, rand4k); \
TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \
- TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k);
+ TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1, rand4k, rand4k);
class TransportTestGen {
public: