THRIFT-1361 Optional replacement of pthread by boost::thread
Patch: alexandre parenteau
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1178176 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TFDTransport.cpp b/lib/cpp/src/transport/TFDTransport.cpp
index b1479fa..8a448fa 100644
--- a/lib/cpp/src/transport/TFDTransport.cpp
+++ b/lib/cpp/src/transport/TFDTransport.cpp
@@ -22,7 +22,9 @@
#include <transport/TFDTransport.h>
+#ifdef HAVE_UNISTD_H
#include <unistd.h>
+#endif
using namespace std;
diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp
index c6c3155..405c162 100644
--- a/lib/cpp/src/transport/TFileTransport.cpp
+++ b/lib/cpp/src/transport/TFileTransport.cpp
@@ -47,12 +47,17 @@
#include <sys/stat.h>
#endif
+#ifdef _WIN32
+#include <io.h>
+#endif
+
namespace apache { namespace thrift { namespace transport {
using boost::scoped_ptr;
using boost::shared_ptr;
using namespace std;
using namespace apache::thrift::protocol;
+using namespace apache::thrift::concurrency;
#ifndef HAVE_CLOCK_GETTIME
@@ -102,13 +107,10 @@
, lastBadChunk_(0)
, numCorruptedEventsInChunk_(0)
, readOnly_(readOnly)
+ , notFull_(&mutex_)
+ , notEmpty_(&mutex_)
+ , flushed_(&mutex_)
{
- // initialize all the condition vars/mutexes
- pthread_mutex_init(&mutex_, NULL);
- pthread_cond_init(&notFull_, NULL);
- pthread_cond_init(&notEmpty_, NULL);
- pthread_cond_init(&flushed_, NULL);
-
openLogFile();
}
@@ -142,16 +144,25 @@
TFileTransport::~TFileTransport() {
// flush the buffer if a writer thread is active
+#ifdef USE_BOOST_THREAD
+ if(writerThreadId_.get()) {
+#else
if (writerThreadId_ > 0) {
+#endif
// set state to closing
closing_ = true;
// wake up the writer thread
// Since closing_ is true, it will attempt to flush all data, then exit.
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
+#ifdef USE_BOOST_THREAD
+ writerThreadId_->join();
+ writerThreadId_.reset();
+#else
pthread_join(writerThreadId_, NULL);
writerThreadId_ = 0;
+#endif
}
if (dequeueBuffer_) {
@@ -191,12 +202,18 @@
return false;
}
+#ifdef USE_BOOST_THREAD
+ if(!writerThreadId_.get()) {
+ writerThreadId_ = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(startWriterThread, (void *)this)));
+ }
+#else
if (writerThreadId_ == 0) {
if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
T_ERROR("%s", "Could not create writer thread");
return false;
}
}
+#endif
dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
@@ -242,20 +259,19 @@
toEnqueue->eventSize_ = eventLen + 4;
// lock mutex
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
// make sure that enqueue buffer is initialized and writer thread is running
if (!bufferAndThreadInitialized_) {
if (!initBufferAndWriteThread()) {
delete toEnqueue;
- pthread_mutex_unlock(&mutex_);
return;
}
}
// Can't enqueue while buffer is full
while (enqueueBuffer_->isFull()) {
- pthread_cond_wait(&notFull_, &mutex_);
+ notFull_.wait();
}
// We shouldn't be trying to enqueue new data while a forced flush is
@@ -266,23 +282,21 @@
// add to the buffer
if (!enqueueBuffer_->addEvent(toEnqueue)) {
delete toEnqueue;
- pthread_mutex_unlock(&mutex_);
return;
}
// signal anybody who's waiting for the buffer to be non-empty
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
// this really should be a loop where it makes sure it got flushed
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
- pthread_mutex_unlock(&mutex_);
}
bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
- pthread_mutex_lock(&mutex_);
-
bool swap;
+ Guard g(mutex_);
+
if (!enqueueBuffer_->isEmpty()) {
swap = true;
} else if (closing_) {
@@ -292,10 +306,10 @@
} else {
if (deadline != NULL) {
// if we were handed a deadline time struct, do a timed wait
- pthread_cond_timedwait(&notEmpty_, &mutex_, deadline);
+ notEmpty_.waitForTime(deadline);
} else {
// just wait until the buffer gets an item
- pthread_cond_wait(&notEmpty_, &mutex_);
+ notEmpty_.wait();
}
// could be empty if we timed out
@@ -308,11 +322,9 @@
dequeueBuffer_ = temp;
}
- // unlock the mutex and signal if required
- pthread_mutex_unlock(&mutex_);
if (swap) {
- pthread_cond_signal(&notFull_);
+ notFull_.notify();
}
return swap;
@@ -340,7 +352,11 @@
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
+#ifndef _WIN32
ftruncate(fd_, offset_);
+#else
+ _chsize_s(fd_, offset_);
+#endif
readState_.resetAllValues();
} catch (...) {
int errno_copy = errno;
@@ -358,12 +374,18 @@
// this will only be true when the destructor is being invoked
if (closing_) {
if (hasIOError) {
- pthread_exit(NULL);
+#ifndef USE_BOOST_THREAD
+ pthread_exit(NULL);
+#else
+ return;
+#endif
}
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+#ifndef _WIN32
fsync(fd_);
+#endif
if (-1 == ::close(fd_)) {
int errno_copy = errno;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
@@ -371,8 +393,12 @@
//fd successfully closed
fd_ = 0;
}
+#ifndef USE_BOOST_THREAD
pthread_exit(NULL);
- }
+#else
+ return;
+#endif
+ }
}
if (swapEventBuffers(&ts_next_flush)) {
@@ -387,7 +413,11 @@
T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
usleep(writerThreadIOErrorSleepTime_);
if (closing_) {
+#ifndef USE_BOOST_THREAD
pthread_exit(NULL);
+#else
+ return;
+#endif
}
if (!fd_) {
::close(fd_);
@@ -467,7 +497,8 @@
// time, it could have changed state in between. This will result in us
// making inconsistent decisions.
bool forced_flush = false;
- pthread_mutex_lock(&mutex_);
+ {
+ Guard g(mutex_);
if (forceFlush_) {
if (!enqueueBuffer_->isEmpty()) {
// If forceFlush_ is true, we need to flush all available data.
@@ -479,12 +510,11 @@
// forceFlush_. Therefore the next time around the loop enqueueBuffer_
// is guaranteed to be empty. (I.e., we're guaranteed to make progress
// and clear forceFlush_ the next time around the loop.)
- pthread_mutex_unlock(&mutex_);
continue;
}
forced_flush = true;
- }
- pthread_mutex_unlock(&mutex_);
+ }
+ }
// determine if we need to perform an fsync
bool flush = false;
@@ -508,18 +538,19 @@
if (flush) {
// sync (force flush) file to disk
+#ifndef _WIN32
fsync(fd_);
+#endif
unflushed = 0;
getNextFlushTime(&ts_next_flush);
// notify anybody waiting for flush completion
if (forced_flush) {
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
forceFlush_ = false;
assert(enqueueBuffer_->isEmpty());
assert(dequeueBuffer_->isEmpty());
- pthread_cond_broadcast(&flushed_);
- pthread_mutex_unlock(&mutex_);
+ flushed_.notifyAll();
}
}
}
@@ -527,22 +558,26 @@
void TFileTransport::flush() {
// file must be open for writing for any flushing to take place
+#ifdef USE_BOOST_THREAD
+ if (!writerThreadId_.get()) {
+ return;
+ }
+#else
if (writerThreadId_ <= 0) {
return;
}
+#endif
// wait for flush to take place
- pthread_mutex_lock(&mutex_);
+ Guard g(mutex_);
// Indicate that we are requesting a flush
forceFlush_ = true;
// Wake up the writer thread so it will perform the flush immediately
- pthread_cond_signal(&notEmpty_);
+ notEmpty_.notify();
while (forceFlush_) {
- pthread_cond_wait(&flushed_, &mutex_);
+ flushed_.wait();
}
-
- pthread_mutex_unlock(&mutex_);
}
@@ -892,9 +927,15 @@
// Utility Functions
void TFileTransport::openLogFile() {
+#ifndef _WIN32
mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
fd_ = ::open(filename_.c_str(), flags, mode);
+#else
+ int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
+ int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
+ fd_ = ::_open(filename_.c_str(), flags, mode);
+#endif
offset_ = 0;
// make sure open call was successful
diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h
index 2ea8c9a..b0e48d1 100644
--- a/lib/cpp/src/transport/TFileTransport.h
+++ b/lib/cpp/src/transport/TFileTransport.h
@@ -27,15 +27,26 @@
#include <string>
#include <stdio.h>
+#ifdef HAVE_PTHREAD_H
#include <pthread.h>
+#endif
+
+#ifdef USE_BOOST_THREAD
+#include <boost/thread.hpp>
+#endif
#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
+#include "concurrency/Mutex.h"
+#include "concurrency/Monitor.h"
+
namespace apache { namespace thrift { namespace transport {
using apache::thrift::TProcessor;
using apache::thrift::protocol::TProtocolFactory;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Monitor;
// Data pertaining to a single event
typedef struct eventInfo {
@@ -360,7 +371,11 @@
static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
// writer thread id
- pthread_t writerThreadId_;
+#ifdef USE_BOOST_THREAD
+ std::auto_ptr<boost::thread> writerThreadId_;
+#else
+ pthread_t writerThreadId_;
+#endif
// buffers to hold data before it is flushed. Each element of the buffer stores a msg that
// needs to be written to the file. The buffers are swapped by the writer thread.
@@ -368,15 +383,15 @@
TFileTransportBuffer *enqueueBuffer_;
// conditions used to block when the buffer is full or empty
- pthread_cond_t notFull_, notEmpty_;
+ Monitor notFull_, notEmpty_;
volatile bool closing_;
// To keep track of whether the buffer has been flushed
- pthread_cond_t flushed_;
+ Monitor flushed_;
volatile bool forceFlush_;
// Mutex that is grabbed when enqueueing and swapping the read/write buffers
- pthread_mutex_t mutex_;
+ Mutex mutex_;
// File information
std::string filename_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 2db8f8b..a0cc77a 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -496,6 +496,12 @@
}
#endif
+#ifdef _WIN32
+ if(errno_copy == WSAECONNRESET) {
+ return 0; // EOF
+ }
+#endif
+
// Now it's not a try again case, but a real probblez
GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);