THRIFT-926. cpp: Fix bugs in TFileTransport::flush()
authorDavid Reiss <dreiss@apache.org>
Wed, 6 Oct 2010 17:10:33 +0000 (17:10 +0000)
committerDavid Reiss <dreiss@apache.org>
Wed, 6 Oct 2010 17:10:33 +0000 (17:10 +0000)
Previously flush() had race conditions that could cause it to return
before all data had actually been flushed to disk.  Now the writer
makes sure both buffer queues have been flushed when forceFlush_ is set.

Also, flush() did not wake up the writer thread, so it normally had to
wait for the writer thread to wake up on its own time.  (By default,
this could take up to 3 seconds.)

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005156 13f79535-47bb-0310-9956-ffa450edef68

lib/cpp/src/transport/TFileTransport.cpp
lib/cpp/src/transport/TFileTransport.h
lib/cpp/test/TFileTransportTest.cpp
lib/cpp/test/TransportTest.cpp

index 50232cf..4deb1aa 100644 (file)
@@ -204,10 +204,10 @@ void TFileTransport::write(const uint8_t* buf, uint32_t len) {
     throw TTransportException("TFileTransport: attempting to write to file opened readonly");
   }
 
-  enqueueEvent(buf, len, false);
+  enqueueEvent(buf, len);
 }
 
-void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
+void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) {
   // can't enqueue more events if file is going to close
   if (closing_) {
     return;
@@ -249,6 +249,11 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl
     pthread_cond_wait(&notFull_, &mutex_);
   }
 
+  // We shouldn't be trying to enqueue new data while a forced flush is
+  // requested.  (Otherwise the writer thread might not ever be able to finish
+  // the flush if more data keeps being enqueued.)
+  assert(!forceFlush_);
+
   // add to the buffer
   if (!enqueueBuffer_->addEvent(toEnqueue)) {
     delete toEnqueue;
@@ -259,10 +264,6 @@ void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool bl
   // signal anybody who's waiting for the buffer to be non-empty
   pthread_cond_signal(&notEmpty_);
 
-  if (blockUntilFlush) {
-    pthread_cond_wait(&flushed_, &mutex_);
-  }
-
   // this really should be a loop where it makes sure it got flushed
   // because condition variables can get triggered by the os for no reason
   // it is probably a non-factor for the time being
@@ -449,28 +450,67 @@ void TFileTransport::writerThread() {
       continue;
     }
 
-    bool flushTimeElapsed = false;
-    struct timespec current_time;
-    clock_gettime(CLOCK_REALTIME, &current_time);
-
-    if (current_time.tv_sec > ts_next_flush.tv_sec ||
-        (current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
-      flushTimeElapsed = true;
-      getNextFlushTime(&ts_next_flush);
+    // Local variable to cache the state of forceFlush_.
+    //
+    // We only want to check the value of forceFlush_ once each time around the
+    // loop.  If we check it more than once without holding the lock the entire
+    // time, it could have changed state in between.  This will result in us
+    // making inconsistent decisions.
+    bool forced_flush = false;
+    pthread_mutex_lock(&mutex_);
+    if (forceFlush_) {
+      if (!enqueueBuffer_->isEmpty()) {
+        // If forceFlush_ is true, we need to flush all available data.
+        // If enqueueBuffer_ is not empty, go back to the start of the loop to
+        // write it out.
+        //
+        // We know the main thread is waiting on forceFlush_ to be cleared,
+        // so no new events will be added to enqueueBuffer_ until we clear
+        // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
+        // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
+        // and clear forceFlush_ the next time around the loop.)
+        pthread_mutex_unlock(&mutex_);
+        continue;
+      }
+      forced_flush = true;
     }
+    pthread_mutex_unlock(&mutex_);
 
-    // couple of cases from which a flush could be triggered
-    if ((flushTimeElapsed && unflushed > 0) ||
-       unflushed > flushMaxBytes_ ||
-       forceFlush_) {
+    // determine if we need to perform an fsync
+    bool flush = false;
+    if (forced_flush || unflushed > flushMaxBytes_) {
+      flush = true;
+    } else {
+      struct timespec current_time;
+      clock_gettime(CLOCK_REALTIME, &current_time);
+      if (current_time.tv_sec > ts_next_flush.tv_sec ||
+          (current_time.tv_sec == ts_next_flush.tv_sec &&
+           current_time.tv_nsec > ts_next_flush.tv_nsec)) {
+        if (unflushed > 0) {
+          flush = true;
+        } else {
+          // If there is no new data since the last fsync,
+          // don't perform the fsync, but do reset the timer.
+          getNextFlushTime(&ts_next_flush);
+        }
+      }
+    }
 
+    if (flush) {
       // sync (force flush) file to disk
       fsync(fd_);
       unflushed = 0;
+      getNextFlushTime(&ts_next_flush);
 
       // notify anybody waiting for flush completion
-      forceFlush_ = false;
-      pthread_cond_broadcast(&flushed_);
+      if (forced_flush) {
+        pthread_mutex_lock(&mutex_);
+        forceFlush_ = false;
+        assert(enqueueBuffer_->isEmpty());
+        assert(dequeueBuffer_->isEmpty());
+        pthread_cond_broadcast(&flushed_);
+        pthread_mutex_unlock(&mutex_);
+      }
     }
   }
 }
@@ -483,7 +523,10 @@ void TFileTransport::flush() {
   // wait for flush to take place
   pthread_mutex_lock(&mutex_);
 
+  // Indicate that we are requesting a flush
   forceFlush_ = true;
+  // Wake up the writer thread so it will perform the flush immediately
+  pthread_cond_signal(&notEmpty_);
 
   while (forceFlush_) {
     pthread_cond_wait(&flushed_, &mutex_);
index f064b8e..2ea8c9a 100644 (file)
@@ -290,7 +290,7 @@ class TFileTransport : public TFileReaderTransport,
 
  private:
   // helper functions for writing to a file
-  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
+  void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
   bool swapEventBuffers(struct timespec* deadline);
   bool initBufferAndWriteThread();
 
index a1827ec..4558344 100644 (file)
@@ -45,14 +45,15 @@ FsyncLog* fsync_log;
 // Provide BOOST_CHECK_LT() and BOOST_CHECK_GT(), in case we're compiled
 // with an older version of boost
 #ifndef BOOST_CHECK_LT
-#define BOOST_CHECK_CMP(a, b, op) \
-  BOOST_CHECK_MESSAGE((a) op (b), \
-                      "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
-                      BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" <<\
-                      (a) << " " BOOST_STRINGIZE(b) "=" << (b))
-
-#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <)
-#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >)
+#define BOOST_CHECK_CMP(a, b, op, check_fn) \
+  check_fn((a) op (b), \
+           "check " BOOST_STRINGIZE(a) " " BOOST_STRINGIZE(op) " " \
+           BOOST_STRINGIZE(b) " failed: " BOOST_STRINGIZE(a) "=" << (a) << \
+           " " BOOST_STRINGIZE(b) "=" << (b))
+
+#define BOOST_CHECK_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_CHECK_MESSAGE)
+#define BOOST_CHECK_GT(a, b) BOOST_CHECK_CMP(a, b, >, BOOST_CHECK_MESSAGE)
+#define BOOST_REQUIRE_LT(a, b) BOOST_CHECK_CMP(a, b, <, BOOST_REQUIRE_MESSAGE)
 #endif // BOOST_CHECK_LT
 
 /**
@@ -144,7 +145,6 @@ int fsync(int fd) {
   return 0;
 }
 
-
 int time_diff(const struct timeval* t1, const struct timeval* t2) {
   return (t2->tv_usec - t1->tv_usec) + (t2->tv_sec - t1->tv_sec) * 1000000;
 }
@@ -301,6 +301,39 @@ void test_flush_max_us3() {
   test_flush_max_us_impl(400000, 300000, 1000000);
 }
 
+/**
+ * Make sure flush() is fast when there is nothing to do.
+ *
+ * TFileTransport used to have a bug where flush() would wait for the fsync
+ * timeout to expire.
+ */
+void test_noop_flush() {
+  TempFile f(tmp_dir, "thrift.TFileTransportTest.");
+  TFileTransport transport(f.getPath());
+
+  // Write something to start the writer thread.
+  uint8_t buf[] = "a";
+  transport.write(buf, 1);
+
+  struct timeval start;
+  gettimeofday(&start, NULL);
+
+  for (unsigned int n = 0; n < 10; ++n) {
+    transport.flush();
+
+    struct timeval now;
+    gettimeofday(&now, NULL);
+
+    // Fail if at any point we've been running for longer than half a second.
+    // (With the buggy code, TFileTransport used to take 3 seconds per flush())
+    //
+    // Use a fatal fail so we break out early, rather than continuing to make
+    // many more slow flush() calls.
+    int delta = time_diff(&start, &now);
+    BOOST_REQUIRE_LT(delta, 500000);
+  }
+}
+
 /**************************************************************************
  * General Initialization
  **************************************************************************/
@@ -358,6 +391,7 @@ boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) {
   suite->add(BOOST_TEST_CASE(test_flush_max_us1));
   suite->add(BOOST_TEST_CASE(test_flush_max_us2));
   suite->add(BOOST_TEST_CASE(test_flush_max_us3));
+  suite->add(BOOST_TEST_CASE(test_noop_flush));
 
   return suite;
 }
index a932643..d6b40dd 100644 (file)
@@ -450,13 +450,13 @@ void test_rw(uint32_t totalSize,
   TEST_RW_BUF(CoupledTransports, 1024*1024*30, 0, 0); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024, 1, 1); \
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1); \
   \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 0, 0, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, \
               rand4k, rand4k, rand4k, rand4k); \
   TEST_RW_BUF(CoupledTransports, 1024*1024*10, 167, 163, rand4k, rand4k); \
-  TEST_RW_BUF(CoupledTransports, 1024*1024*2, 1, 1, rand4k, rand4k);
+  TEST_RW_BUF(CoupledTransports, 1024*512, 1, 1, rand4k, rand4k);
 
 class TransportTestGen {
  public: