// check if current file is still open
if (fd_ > 0) {
- // TODO: should there be a flush here?
+ // flush any events in the queue
+ flush();
fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
if(-1 == ::close(fd_)) {
perror("TFileTransport: error in file close");
TFileTransport::~TFileTransport() {
- // TODO: Make sure the buffer is actually flushed
// flush the buffer if a writer thread is active
if (writerThreadId_ > 0) {
// flush output buffer
flush();
- // send a signal to write thread to end
+ // set state to closing
closing_ = true;
+
+ // TODO: make sure event queue is empty
+ // currently only the write buffer is flushed
+ // we dont actually wait until the queue is empty. This shouldn't be a big
+ // deal in the common case because writing is quick
+
pthread_join(writerThreadId_, NULL);
}
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
// make sure that event size is valid
if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
- T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
+ T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
return;
}
pthread_cond_wait(&flushed_, &mutex_);
}
- // TODO: don't return until flushed to disk
// this really should be a loop where it makes sure it got flushed
// because condition variables can get triggered by the os for no reason
// it is probably a non-factor for the time being
}
if (padding > (int32_t)chunkSize_) {
T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
+ continue;
}
uint8_t zeros[padding];
bzero(zeros, padding);
throw TTransportException("TFileTransport: error while reading from file");
} else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
- if (readTimeout_ == -1) {
+ if (readTimeout_ == TAIL_READ_TIMEOUT) {
usleep(eofSleepTime_);
continue;
- } else if (readTimeout_ == 0) {
+ } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
// reset state
readState_.resetState(0);
return false;
// point and punt on the error
readState_.resetState(readState_.lastDispatchPtr_);
char errorMsg[1024];
- sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
+ sprintf(errorMsg, "TFileTransport: log file corrupted at offset:%lu",
offset_ + readState_.lastDispatchPtr_);
perror(errorMsg);
throw TTransportException(errorMsg);
// seek to EOF if user wanted to go to last chunk
if (seekToEnd) {
uint32_t oldReadTimeout = getReadTimeout();
- setReadTimeout(0);
+ setReadTimeout(NO_TAIL_READ_TIMEOUT);
// keep on reading unti the last event at point of seekChunk call
while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
setReadTimeout(oldReadTimeout);
int32_t oldReadTimeout = inputTransport_->getReadTimeout();
if (tail) {
// save old read timeout so it can be restored
- inputTransport_->setReadTimeout(0);
+ inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
}
uint32_t numProcessed = 0;