#ifdef _WIN32
# define THRIFT_GET_SOCKET_ERROR ::WSAGetLastError()
+# define THRIFT_ERRNO (*_errno())
# define THRIFT_EINPROGRESS WSAEINPROGRESS
# define THRIFT_EAGAIN WSAEWOULDBLOCK
# define THRIFT_EINTR WSAEINTR
# define THRIFT_F_SETFL 1
# define THRIFT_GETTIMEOFDAY thrift_gettimeofday
# define THRIFT_CLOSESOCKET closesocket
+# define THRIFT_CLOSE _close
+# define THRIFT_OPEN _open
+# define THRIFT_FTRUNCATE _chsize_s
+# define THRIFT_FSYNC _commit
+# define THRIFT_LSEEK _lseek
+# define THRIFT_WRITE _write
+# define THRIFT_READ _read
+# define THRIFT_FSTAT _fstat
+# define THRIFT_STAT _stat
# define THRIFT_GAI_STRERROR gai_strerrorA
# define THRIFT_SSIZET ptrdiff_t
# define THRIFT_SNPRINTF _snprintf
#else //not _WIN32
# include <errno.h>
# define THRIFT_GET_SOCKET_ERROR errno
+# define THRIFT_ERRNO errno
# define THRIFT_EINTR EINTR
# define THRIFT_EINPROGRESS EINPROGRESS
# define THRIFT_ECONNRESET ECONNRESET
# define THRIFT_F_SETFL F_SETFL
# define THRIFT_GETTIMEOFDAY gettimeofday
# define THRIFT_CLOSESOCKET close
+# define THRIFT_CLOSE close
+# define THRIFT_OPEN open
+# define THRIFT_FTRUNCATE ftruncate
+# define THRIFT_FSYNC fsync
+# define THRIFT_LSEEK lseek
+# define THRIFT_WRITE write
+# define THRIFT_READ read
+# define THRIFT_STAT stat
+# define THRIFT_FSTAT fstat
# define THRIFT_GAI_STRERROR gai_strerror
# define THRIFT_SSIZET ssize_t
# define THRIFT_SNPRINTF snprintf
return;
}
- int rv = ::THRIFT_CLOSESOCKET(fd_);
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int rv = ::THRIFT_CLOSE(fd_);
+ int errno_copy = THRIFT_ERRNO;
fd_ = -1;
// Have to check uncaught_exception because this is called in the destructor.
if (rv < 0 && !std::uncaught_exception()) {
unsigned int maxRetries = 5; // same as the TSocket default
unsigned int retries = 0;
while (true) {
- THRIFT_SSIZET rv = ::read(fd_, buf, len);
+ THRIFT_SSIZET rv = ::THRIFT_READ(fd_, buf, len);
if (rv < 0) {
- if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) {
+ if (THRIFT_ERRNO == THRIFT_EINTR && retries < maxRetries) {
// If interrupted, try again
++retries;
continue;
}
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::read()",
errno_copy);
void TFDTransport::write(const uint8_t* buf, uint32_t len) {
while (len > 0) {
- THRIFT_SSIZET rv = ::write(fd_, buf, len);
+ THRIFT_SSIZET rv = ::THRIFT_WRITE(fd_, buf, len);
if (rv < 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFDTransport::write()",
errno_copy);
// flush any events in the queue
flush();
GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
- if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (-1 == ::THRIFT_CLOSE(fd_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
} else {
// close logfile
if (fd_ > 0) {
- if(-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR);
+ if(-1 == ::THRIFT_CLOSE(fd_)) {
+ GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_ERRNO);
} else {
//successfully closed fd
fd_ = 0;
try {
openLogFile();
} catch (...) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
fd_ = 0;
hasIOError = true;
seekToEnd();
// throw away any partial events
offset_ += readState_.lastDispatchPtr_;
-#ifndef _WIN32
- ftruncate(fd_, offset_);
-#else
- _chsize_s(fd_, offset_);
-#endif
+ THRIFT_FTRUNCATE(fd_, offset_);
readState_.resetAllValues();
} catch (...) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
hasIOError = true;
}
// Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
-#ifndef _WIN32
- fsync(fd_);
-#endif
- if (-1 == ::THRIFT_CLOSESOCKET(fd_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ ::THRIFT_FSYNC(fd_);
+ if (-1 == ::THRIFT_CLOSE(fd_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
} else {
//fd successfully closed
return;
}
if (!fd_) {
- ::THRIFT_CLOSESOCKET(fd_);
+ ::THRIFT_CLOSE(fd_);
fd_ = 0;
}
try {
// if adding this event will cross a chunk boundary, pad the chunk with zeros
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
- offset_ = lseek(fd_, 0, SEEK_CUR);
+ offset_ = THRIFT_LSEEK(fd_, 0, SEEK_CUR);
int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
uint8_t* zeros = new uint8_t[padding];
memset(zeros, '\0', padding);
boost::scoped_array<uint8_t> array(zeros);
if (-1 == ::write(fd_, zeros, padding)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
hasIOError = true;
continue;
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
- if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ if (-1 == ::THRIFT_WRITE(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
hasIOError = true;
continue;
if (flush) {
// sync (force flush) file to disk
-#ifndef _WIN32
- fsync(fd_);
-#endif
+ THRIFT_FSYNC(fd_);
unflushed = 0;
getNextFlushTime(&ts_next_flush);
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
- readState_.bufferLen_ = static_cast<uint32_t>(::read(fd_, readBuff_, readBuffSize_));
+ readState_.bufferLen_ = static_cast<uint32_t>(::THRIFT_READ(fd_, readBuff_, readBuffSize_));
// if (readState_.bufferLen_) {
// T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
// }
seekToEnd = true;
chunk = numChunks - 1;
// this is the min offset to process events till
- minEndOffset = lseek(fd_, 0, SEEK_END);
+ minEndOffset = ::THRIFT_LSEEK(fd_, 0, SEEK_END);
}
off_t newOffset = off_t(chunk) * chunkSize_;
- offset_ = lseek(fd_, newOffset, SEEK_SET);
+ offset_ = ::THRIFT_LSEEK(fd_, newOffset, SEEK_SET);
readState_.resetAllValues();
currentEvent_ = NULL;
if (offset_ == -1) {
return 0;
}
- struct stat f_info;
- int rv = fstat(fd_, &f_info);
+ struct THRIFT_STAT f_info;
+ int rv = ::THRIFT_FSTAT(fd_, &f_info);
if (rv < 0) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
throw TTransportException(TTransportException::UNKNOWN,
"TFileTransport::getNumChunks() (fstat)",
errno_copy);
#ifndef _WIN32
mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
- fd_ = ::open(filename_.c_str(), flags, mode);
#else
int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE;
int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND;
- fd_ = ::_open(filename_.c_str(), flags, mode);
#endif
+ fd_ = ::THRIFT_OPEN(filename_.c_str(), flags, mode);
offset_ = 0;
// make sure open call was successful
if(fd_ == -1) {
- int errno_copy = THRIFT_GET_SOCKET_ERROR;
+ int errno_copy = THRIFT_ERRNO;
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
}