From: David Reiss Date: Wed, 6 Oct 2010 17:10:33 +0000 (+0000) Subject: THRIFT-926. cpp: Fix bugs in TFileTransport::flush() X-Git-Tag: 0.6.0~104 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=4f9efdb460a7c54cc55b25f2c6b8955a5fcff229;p=common%2Fthrift.git THRIFT-926. cpp: Fix bugs in TFileTransport::flush() Previously flush() had race conditions that could cause it to return before all data had actually been flushed to disk. Now the writer makes sure both buffer queues have been flushed when forceFlush_ is set. Also, flush() did not wake up the writer thread, so it normally had to wait for the writer thread to wake up on its own time. (By default, this could take up to 3 seconds.) git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005156 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 50232cf2..4deb1aa2 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -204,10 +204,10 @@ void TFileTransport::write(const uint8_t* buf, uint32_t len) { throw TTransportException("TFileTransport: attempting to write to file opened readonly"); } - enqueueEvent(buf, len, false); + enqueueEvent(buf, len); } -void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { +void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { // can't enqueue more events if file is going to close if (closing_) { return; @@ -249,6 +249,11 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl pthread_cond_wait(¬Full_, &mutex_); } + // We shouldn't be trying to enqueue new data while a forced flush is + // requested. (Otherwise the writer thread might not ever be able to finish + // the flush if more data keeps being enqueued.) + assert(!forceFlush_); + // add to the buffer if (!enqueueBuffer_->addEvent(toEnqueue)) { delete toEnqueue; @@ -259,10 +264,6 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl // signal anybody who's waiting for the buffer to be non-empty pthread_cond_signal(¬Empty_); - if (blockUntilFlush) { - pthread_cond_wait(&flushed_, &mutex_); - } - // this really should be a loop where it makes sure it got flushed // because condition variables can get triggered by the os for no reason // it is probably a non-factor for the time being @@ -449,28 +450,67 @@ void TFileTransport::writerThread() { continue; } - bool flushTimeElapsed = false; - struct timespec current_time; - clock_gettime(CLOCK_REALTIME, ¤t_time); - - if (current_time.tv_sec > ts_next_flush.tv_sec || - (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) { - flushTimeElapsed = true; - getNextFlushTime(&ts_next_flush); + // Local variable to cache the state of forceFlush_. + // + // We only want to check the value of forceFlush_ once each time around the + // loop. If we check it more than once without holding the lock the entire + // time, it could have changed state in between. This will result in us + // making inconsistent decisions. + bool forced_flush = false; + pthread_mutex_lock(&mutex_); + if (forceFlush_) { + if (!enqueueBuffer_->isEmpty()) { + // If forceFlush_ is true, we need to flush all available data. + // If enqueueBuffer_ is not empty, go back to the start of the loop to + // write it out. + // + // We know the main thread is waiting on forceFlush_ to be cleared, + // so no new events will be added to enqueueBuffer_ until we clear + // forceFlush_. Therefore the next time around the loop enqueueBuffer_ + // is guaranteed to be empty. (I.e., we're guaranteed to make progress + // and clear forceFlush_ the next time around the loop.) + pthread_mutex_unlock(&mutex_); + continue; + } + forced_flush = true; } + pthread_mutex_unlock(&mutex_); - // couple of cases from which a flush could be triggered - if ((flushTimeElapsed && unflushed > 0) || - unflushed > flushMaxBytes_ || - forceFlush_) { + // determine if we need to perform an fsync + bool flush = false; + if (forced_flush || unflushed > flushMaxBytes_) { + flush = true; + } else { + struct timespec current_time; + clock_gettime(CLOCK_REALTIME, ¤t_time); + if (current_time.tv_sec > ts_next_flush.tv_sec || + (current_time.tv_sec == ts_next_flush.tv_sec && + current_time.tv_nsec > ts_next_flush.tv_nsec)) { + if (unflushed > 0) { + flush = true; + } else { + // If there is no new data since the last fsync, + // don't perform the fsync, but do reset the timer. + getNextFlushTime(&ts_next_flush); + } + } + } + if (flush) { // sync (force flush) file to disk fsync(fd_); unflushed = 0; + getNextFlushTime(&ts_next_flush); // notify anybody waiting for flush completion - forceFlush_ = false; - pthread_cond_broadcast(&flushed_); + if (forced_flush) { + pthread_mutex_lock(&mutex_); + forceFlush_ = false; + assert(enqueueBuffer_->isEmpty()); + assert(dequeueBuffer_->isEmpty()); + pthread_cond_broadcast(&flushed_); + pthread_mutex_unlock(&mutex_); + } } } } @@ -483,7 +523,10 @@ void TFileTransport::flush() { // wait for flush to take place pthread_mutex_lock(&mutex_); + // Indicate that we are requesting a flush forceFlush_ = true; + // Wake up the writer thread so it will perform the flush immediately + pthread_cond_signal(¬Empty_); while (forceFlush_) { pthread_cond_wait(&flushed_, &mutex_); diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index f064b8eb..2ea8c9af 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -290,7 +290,7 @@ class TFileTransport : public TFileReaderTransport, private: // helper functions for writing to a file - void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); + void enqueueEvent(const uint8_t* buf, uint32_t eventLen); bool swapEventBuffers(struct timespec* deadline); bool initBufferAndWriteThread(); diff --git a/lib/cpp/test/TFileTransportTest.cpp b/lib/cpp/test/TFileTransportTest.cpp index a1827ecf..45583449 100644 --- a/lib/cpp/test/TFileTransportTest.cpp +++ b/lib/cpp/test/TFileTransportTest.cpp @@ -45,14 +45,15 @@ FsyncLog* fsync_log; // Provide BOOST_CHECK_LT() and BOOST_CHECK_GT(), in case we're compiled // with an older version of boost #ifndef BOOST_CHECK_LT -#define BOOST_CHECK_CMP(a, b, op) \ - BOOST_CHECK_MESSAGE((a) op (b), \ - "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \ - BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" <<\ - (a) << " " BOOST_STRINGIZE(b) "=" << (b)) - -#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <) -#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >) +#define BOOST_CHECK_CMP(a, b, op, check_fn) \ + check_fn((a) op (b), \ + "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \ + BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" << (a) << \ + " " BOOST_STRINGIZE(b) "=" << (b)) + +#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_CHECK_MESSAGE) +#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >, BOOST_CHECK_MESSAGE) +#define BOOST_REQUIRE_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_REQUIRE_MESSAGE) #endif // BOOST_CHECK_LT /** @@ -144,7 +145,6 @@ int fsync(int fd) { return 0; } - int time_diff(const struct timeval* t1, const struct timeval* t2) { return (t2->tv_usec - t1->tv_usec) + (t2->tv_sec - t1->tv_sec) * 1000000; } @@ -301,6 +301,39 @@ void test_flush_max_us3() { test_flush_max_us_impl(400000, 300000, 1000000); } +/** + * Make sure flush() is fast when there is nothing to do. + * + * TFileTransport used to have a bug where flush() would wait for the fsync + * timeout to expire. + */ +void test_noop_flush() { + TempFile f(tmp_dir, "thrift.TFileTransportTest."); + TFileTransport transport(f.getPath()); + + // Write something to start the writer thread. + uint8_t buf[] = "a"; + transport.write(buf, 1); + + struct timeval start; + gettimeofday(&start, NULL); + + for (unsigned int n = 0; n < 10; ++n) { + transport.flush(); + + struct timeval now; + gettimeofday(&now, NULL); + + // Fail if at any point we've been running for longer than half a second. + // (With the buggy code, TFileTransport used to take 3 seconds per flush()) + // + // Use a fatal fail so we break out early, rather than continuing to make + // many more slow flush() calls. + int delta = time_diff(&start, &now); + BOOST_REQUIRE_LT(delta, 500000); + } +} + /************************************************************************** * General Initialization **************************************************************************/ @@ -358,6 +391,7 @@ boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) { suite->add(BOOST_TEST_CASE(test_flush_max_us1)); suite->add(BOOST_TEST_CASE(test_flush_max_us2)); suite->add(BOOST_TEST_CASE(test_flush_max_us3)); + suite->add(BOOST_TEST_CASE(test_noop_flush)); return suite; } diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp index a932643c..d6b40dda 100644 --- a/lib/cpp/test/TransportTest.cpp +++ b/lib/cpp/test/TransportTest.cpp @@ -450,13 +450,13 @@ void test_rw(uint32_t totalSize, TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \ TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \ TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \ - TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \ + TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1); \ \ TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \ TEST_RW_BUF(CoupledTransports, 1024*1024*10, \ rand4k, rand4k, rand4k, rand4k); \ TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \ - TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k); + TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1, rand4k, rand4k); class TransportTestGen { public: