From: Aditya Agarwal Date: Thu, 25 Jan 2007 03:27:43 +0000 (+0000) Subject: -- ThriftLogfile is finally done. X-Git-Tag: 0.2.0~1525 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=2c9f2fb0bd356f45a10b2c37743677969c61ef72;p=common%2Fthrift.git -- ThriftLogfile is finally done. Summary: - Everything seems to be working well.. Reviewed By: tbr - slee Test Plan: Tested using search git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664947 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 4615977f..a9beff93 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -63,7 +63,8 @@ void TFileTransport::resetOutputFile(int fd, string filename, long long offset) // check if current file is still open if (fd_ > 0) { - // TODO: should there be a flush here? + // flush any events in the queue + flush(); fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str()); if(-1 == ::close(fd_)) { perror("TFileTransport: error in file close"); @@ -81,14 +82,19 @@ void TFileTransport::resetOutputFile(int fd, string filename, long long offset) TFileTransport::~TFileTransport() { - // TODO: Make sure the buffer is actually flushed // flush the buffer if a writer thread is active if (writerThreadId_ > 0) { // flush output buffer flush(); - // send a signal to write thread to end + // set state to closing closing_ = true; + + // TODO: make sure event queue is empty + // currently only the write buffer is flushed + // we dont actually wait until the queue is empty. This shouldn't be a big + // deal in the common case because writing is quick + pthread_join(writerThreadId_, NULL); } @@ -116,7 +122,7 @@ TFileTransport::~TFileTransport() { void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { // make sure that event size is valid if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { - T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); + T_ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); return; } @@ -176,7 +182,6 @@ void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) { pthread_cond_wait(&flushed_, &mutex_); } - // TODO: don't return until flushed to disk // this really should be a loop where it makes sure it got flushed // because condition variables can get triggered by the os for no reason // it is probably a non-factor for the time being @@ -304,7 +309,7 @@ void TFileTransport::writerThread() { } if (padding > (int32_t)chunkSize_) { T_DEBUG("padding is larger than chunk size, skipping event"); - continue; + continue; } uint8_t zeros[padding]; bzero(zeros, padding); @@ -442,10 +447,10 @@ bool TFileTransport::readEvent() { throw TTransportException("TFileTransport: error while reading from file"); } else if (readState_.bufferLen_ == 0) { // EOF // wait indefinitely if there is no timeout - if (readTimeout_ == -1) { + if (readTimeout_ == TAIL_READ_TIMEOUT) { usleep(eofSleepTime_); continue; - } else if (readTimeout_ == 0) { + } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { // reset state readState_.resetState(0); return false; @@ -592,7 +597,7 @@ void TFileTransport::performRecovery() { // point and punt on the error readState_.resetState(readState_.lastDispatchPtr_); char errorMsg[1024]; - sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", + sprintf(errorMsg, "TFileTransport: log file corrupted at offset:%lu", offset_ + readState_.lastDispatchPtr_); perror(errorMsg); throw TTransportException(errorMsg); @@ -645,7 +650,7 @@ void TFileTransport::seekToChunk(int32_t chunk) { // seek to EOF if user wanted to go to last chunk if (seekToEnd) { uint32_t oldReadTimeout = getReadTimeout(); - setReadTimeout(0); + setReadTimeout(NO_TAIL_READ_TIMEOUT); // keep on reading unti the last event at point of seekChunk call while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {}; setReadTimeout(oldReadTimeout); @@ -745,7 +750,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { int32_t oldReadTimeout = inputTransport_->getReadTimeout(); if (tail) { // save old read timeout so it can be restored - inputTransport_->setReadTimeout(0); + inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); } uint32_t numProcessed = 0;