, maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS)
, eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US)
, corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US)
+ , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US)
, writerThreadId_(0)
, dequeueBuffer_(NULL)
, enqueueBuffer_(NULL)
void TFileTransport::writerThread() {
+ bool hasIOError = false;
+
// open file if it is not open
if(!fd_) {
- openLogFile();
+ try {
+ openLogFile();
+ } catch (...) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy);
+ fd_ = 0;
+ hasIOError = true;
+ }
}
// set the offset to the correct value (EOF)
- try {
- seekToEnd();
- } catch (TException &te) {
+ if (!hasIOError) {
+ try {
+ seekToEnd();
+ // throw away any partial events
+ offset_ += readState_.lastDispatchPtr_;
+ ftruncate(fd_, offset_);
+ readState_.resetAllValues();
+ } catch (...) {
+ int errno_copy = errno;
+ GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy);
+ hasIOError = true;
+ }
}
- // throw away any partial events
- offset_ += readState_.lastDispatchPtr_;
- ftruncate(fd_, offset_);
- readState_.resetAllValues();
-
// Figure out the next time by which a flush must take place
-
struct timespec ts_next_flush;
getNextFlushTime(&ts_next_flush);
uint32_t unflushed = 0;
- while(1) {
+ while (1) {
// this will only be true when the destructor is being invoked
- if(closing_) {
- // empty out both the buffers
+ if (closing_) {
+ if (hasIOError) {
+ pthread_exit(NULL);
+ }
+
+ // Try to empty buffers before exit
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
+ fsync(fd_);
if (-1 == ::close(fd_)) {
int errno_copy = errno;
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
}
- // just be safe and sync to disk
- fsync(fd_);
- fd_ = 0;
pthread_exit(NULL);
- return;
}
}
if (swapEventBuffers(&ts_next_flush)) {
eventInfo* outEvent;
while (NULL != (outEvent = dequeueBuffer_->getNext())) {
- if (!outEvent) {
- T_DEBUG_L(1, "Got an empty event");
- return;
+ // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance,
+ // the output file is unmounted or deleted, then this event is dropped. However, the writer thread
+ // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing
+ // from the end.
+
+ while (hasIOError) {
+ T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_);
+ usleep(writerThreadIOErrorSleepTime_);
+ if (closing_) {
+ pthread_exit(NULL);
+ }
+ if (!fd_) {
+ ::close(fd_);
+ fd_ = 0;
+ }
+ try {
+ openLogFile();
+ seekToEnd();
+ unflushed = 0;
+ hasIOError = false;
+ T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str());
+ } catch (...) {
+ T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str());
+ }
}
// sanity check on event
// If chunking is required, then make sure that msg does not cross chunk boundary
if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
-
// event size must be less than chunk size
- if(outEvent->eventSize_ > chunkSize_) {
- T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
- outEvent->eventSize_, chunkSize_);
+ if (outEvent->eventSize_ > chunkSize_) {
+ T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_);
continue;
}
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
offset_ = lseek(fd_, 0, SEEK_CUR);
- int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
+ int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);
uint8_t zeros[padding];
bzero(zeros, padding);
if (-1 == ::write(fd_, zeros, padding)) {
int errno_copy = errno;
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
+ hasIOError = true;
+ continue;
}
unflushed += padding;
offset_ += padding;
if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
int errno_copy = errno;
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
- throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
+ hasIOError = true;
+ continue;
}
-
unflushed += outEvent->eventSize_;
offset_ += outEvent->eventSize_;
}
dequeueBuffer_->reset();
}
+ if (hasIOError) {
+ continue;
+ }
+
bool flushTimeElapsed = false;
struct timespec current_time;
clock_gettime(CLOCK_REALTIME, &current_time);