THRIFT-1361 Optional replacement of pthread by boost::thread
Patch: alexandre parenteau

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1178176 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp
index b1479fa..8a448fa 100644
--- a/lib/cpp/src/transport/TFDTransport.cpp
+++ b/lib/cpp/src/transport/TFDTransport.cpp
@@ -22,7 +22,9 @@
 
 #include <transport/TFDTransport.h>
 
+#ifdef HAVE_UNISTD_H
 #include <unistd.h>
+#endif
 
 using namespace std;
 
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index c6c3155..405c162 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -47,12 +47,17 @@
 #include <sys/stat.h>
 #endif
 
+#ifdef _WIN32
+#include <io.h>
+#endif
+
 namespace apache { namespace thrift { namespace transport {
 
 using boost::scoped_ptr;
 using boost::shared_ptr;
 using namespace std;
 using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
 
 #ifndef HAVE_CLOCK_GETTIME
 
@@ -102,13 +107,10 @@
   , lastBadChunk_(0)
   , numCorruptedEventsInChunk_(0)
   , readOnly_(readOnly)
+  , notFull_(&mutex_)
+  , notEmpty_(&mutex_)
+  , flushed_(&mutex_)
 {
-  // initialize all the condition vars/mutexes
-  pthread_mutex_init(&mutex_, NULL);
-  pthread_cond_init(&notFull_, NULL);
-  pthread_cond_init(&notEmpty_, NULL);
-  pthread_cond_init(&flushed_, NULL);
-
   openLogFile();
 }
 
@@ -142,16 +144,25 @@
 
 TFileTransport::~TFileTransport() {
   // flush the buffer if a writer thread is active
+#ifdef USE_BOOST_THREAD
+  if(writerThreadId_.get()) {
+#else
   if (writerThreadId_ > 0) {
+#endif
     // set state to closing
     closing_ = true;
 
     // wake up the writer thread
     // Since closing_ is true, it will attempt to flush all data, then exit.
-    pthread_cond_signal(&notEmpty_);
+	notEmpty_.notify();
 
+#ifdef USE_BOOST_THREAD
+    writerThreadId_->join();
+	writerThreadId_.reset();
+#else
     pthread_join(writerThreadId_, NULL);
     writerThreadId_ = 0;
+#endif
   }
 
   if (dequeueBuffer_) {
@@ -191,12 +202,18 @@
     return false;
   }
 
+#ifdef USE_BOOST_THREAD
+  if(!writerThreadId_.get()) {
+    writerThreadId_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(startWriterThread, (void *)this)));
+  }
+#else
   if (writerThreadId_ == 0) {
     if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
       T_ERROR("%s", "Could not create writer thread");
       return false;
     }
   }
+#endif
 
   dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
   enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
@@ -242,20 +259,19 @@
   toEnqueue->eventSize_ = eventLen + 4;
 
   // lock mutex
-  pthread_mutex_lock(&mutex_);
+  Guard g(mutex_);
 
   // make sure that enqueue buffer is initialized and writer thread is running
   if (!bufferAndThreadInitialized_) {
     if (!initBufferAndWriteThread()) {
       delete toEnqueue;
-      pthread_mutex_unlock(&mutex_);
       return;
     }
   }
 
   // Can't enqueue while buffer is full
   while (enqueueBuffer_->isFull()) {
-    pthread_cond_wait(&notFull_, &mutex_);
+	  notFull_.wait();
   }
 
   // We shouldn't be trying to enqueue new data while a forced flush is
@@ -266,23 +282,21 @@
   // add to the buffer
   if (!enqueueBuffer_->addEvent(toEnqueue)) {
     delete toEnqueue;
-    pthread_mutex_unlock(&mutex_);
     return;
   }
 
   // signal anybody who's waiting for the buffer to be non-empty
-  pthread_cond_signal(&notEmpty_);
+  notEmpty_.notify();
 
   // this really should be a loop where it makes sure it got flushed
   // because condition variables can get triggered by the os for no reason
   // it is probably a non-factor for the time being
-  pthread_mutex_unlock(&mutex_);
 }
 
 bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
-  pthread_mutex_lock(&mutex_);
-
   bool swap;
+  Guard g(mutex_);
+
   if (!enqueueBuffer_->isEmpty()) {
     swap = true;
   } else if (closing_) {
@@ -292,10 +306,10 @@
   } else {
     if (deadline != NULL) {
       // if we were handed a deadline time struct, do a timed wait
-      pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+      notEmpty_.waitForTime(deadline);
     } else {
       // just wait until the buffer gets an item
-      pthread_cond_wait(&notEmpty_, &mutex_);
+      notEmpty_.wait();
     }
 
     // could be empty if we timed out
@@ -308,11 +322,9 @@
     dequeueBuffer_ = temp;
   }
 
-  // unlock the mutex and signal if required
-  pthread_mutex_unlock(&mutex_);
 
   if (swap) {
-    pthread_cond_signal(&notFull_);
+	  notFull_.notify();
   }
 
   return swap;
@@ -340,7 +352,11 @@
       seekToEnd();
       // throw away any partial events
       offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
       ftruncate(fd_, offset_);
+#else
+      _chsize_s(fd_, offset_);
+#endif
       readState_.resetAllValues();
     } catch (...) {
       int errno_copy = errno;
@@ -358,12 +374,18 @@
     // this will only be true when the destructor is being invoked
     if (closing_) {
       if (hasIOError) {
-        pthread_exit(NULL);
+#ifndef USE_BOOST_THREAD
+		  pthread_exit(NULL);
+#else
+		  return;
+#endif
       }
 
       // Try to empty buffers before exit
       if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
         fsync(fd_);
+#endif
         if (-1 == ::close(fd_)) {
           int errno_copy = errno;
           GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
@@ -371,8 +393,12 @@
           //fd successfully closed
           fd_ = 0;
         }
+#ifndef USE_BOOST_THREAD
         pthread_exit(NULL);
-      }
+#else
+        return;
+#endif
+	  }
     }
 
     if (swapEventBuffers(&ts_next_flush)) {
@@ -387,7 +413,11 @@
           T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
           usleep(writerThreadIOErrorSleepTime_);
           if (closing_) {
+#ifndef USE_BOOST_THREAD
             pthread_exit(NULL);
+#else
+            return;
+#endif
           }
           if (!fd_) {
             ::close(fd_);
@@ -467,7 +497,8 @@
     // time, it could have changed state in between.  This will result in us
     // making inconsistent decisions.
     bool forced_flush = false;
-    pthread_mutex_lock(&mutex_);
+	{
+    Guard g(mutex_);
     if (forceFlush_) {
       if (!enqueueBuffer_->isEmpty()) {
         // If forceFlush_ is true, we need to flush all available data.
@@ -479,12 +510,11 @@
         // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
         // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
         // and clear forceFlush_ the next time around the loop.)
-        pthread_mutex_unlock(&mutex_);
         continue;
       }
       forced_flush = true;
-    }
-    pthread_mutex_unlock(&mutex_);
+	}
+	}
 
     // determine if we need to perform an fsync
     bool flush = false;
@@ -508,18 +538,19 @@
 
     if (flush) {
       // sync (force flush) file to disk
+#ifndef _WIN32
       fsync(fd_);
+#endif
       unflushed = 0;
       getNextFlushTime(&ts_next_flush);
 
       // notify anybody waiting for flush completion
       if (forced_flush) {
-        pthread_mutex_lock(&mutex_);
+        Guard g(mutex_);
         forceFlush_ = false;
         assert(enqueueBuffer_->isEmpty());
         assert(dequeueBuffer_->isEmpty());
-        pthread_cond_broadcast(&flushed_);
-        pthread_mutex_unlock(&mutex_);
+		flushed_.notifyAll();
       }
     }
   }
@@ -527,22 +558,26 @@
 
 void TFileTransport::flush() {
   // file must be open for writing for any flushing to take place
+#ifdef USE_BOOST_THREAD
+  if (!writerThreadId_.get()) {
+    return;
+  }
+#else
   if (writerThreadId_ <= 0) {
     return;
   }
+#endif
   // wait for flush to take place
-  pthread_mutex_lock(&mutex_);
+  Guard g(mutex_);
 
   // Indicate that we are requesting a flush
   forceFlush_ = true;
   // Wake up the writer thread so it will perform the flush immediately
-  pthread_cond_signal(&notEmpty_);
+  notEmpty_.notify();
 
   while (forceFlush_) {
-    pthread_cond_wait(&flushed_, &mutex_);
+    flushed_.wait();
   }
-
-  pthread_mutex_unlock(&mutex_);
 }
 
 
@@ -892,9 +927,15 @@
 
 // Utility Functions
 void TFileTransport::openLogFile() {
+#ifndef _WIN32
   mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
   int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
   fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+  int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
+  int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
+  fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
   offset_ = 0;
 
   // make sure open call was successful
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 2ea8c9a..b0e48d1 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -27,15 +27,26 @@
 #include <string>
 #include <stdio.h>
 
+#ifdef HAVE_PTHREAD_H
 #include <pthread.h>
+#endif
+
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
 
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 
+#include "concurrency/Mutex.h"
+#include "concurrency/Monitor.h"
+
 namespace apache { namespace thrift { namespace transport {
 
 using apache::thrift::TProcessor;
 using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
 
 // Data pertaining to a single event
 typedef struct eventInfo {
@@ -360,7 +371,11 @@
   static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
 
   // writer thread id
-  pthread_t writerThreadId_;
+#ifdef USE_BOOST_THREAD
+	std::auto_ptr<boost::thread> writerThreadId_;
+#else
+	pthread_t writerThreadId_;
+#endif
 
   // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
   // needs to be written to the file.  The buffers are swapped by the writer thread.
@@ -368,15 +383,15 @@
   TFileTransportBuffer *enqueueBuffer_;
 
   // conditions used to block when the buffer is full or empty
-  pthread_cond_t notFull_, notEmpty_;
+  Monitor notFull_, notEmpty_;
   volatile bool closing_;
 
   // To keep track of whether the buffer has been flushed
-  pthread_cond_t flushed_;
+  Monitor flushed_;
   volatile bool forceFlush_;
 
   // Mutex that is grabbed when enqueueing and swapping the read/write buffers
-  pthread_mutex_t mutex_;
+  Mutex mutex_;
 
   // File information
   std::string filename_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 2db8f8b..a0cc77a 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -496,6 +496,12 @@
     }
     #endif
 
+#ifdef _WIN32
+    if(errno_copy == WSAECONNRESET) {
+      return 0; // EOF
+    }
+#endif
+
     // Now it's not a try again case, but a real probblez
     GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);