From 68f08ee5f5986138105bb3906b086cd15df02026 Mon Sep 17 00:00:00 2001 From: Aditya Agarwal Date: Wed, 24 Jan 2007 23:39:10 +0000 Subject: [PATCH] -- Error recovery code for thrift logfile Summary: - perform some basic corruption checks: 1) Event larger than chunk 2) Event larger than specified max 3) Event crossing chunk boundary etc. - If error encountered, then try to perform some recovery Reviewed By: Slee Test Plan: Going to test now...need to check in because of compile issues Notes: - These checks take care of the case when there is a dirty read from the filesystem (which we have encountered with the netapps). The recovery involves trying to perform the read again from ths FS and if that fails skipping the chunk altogether. Keep in mind that this might only be useful for idempotent systems (e.g. search redolog). git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664943 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/transport/TFileTransport.cpp | 130 +++++++++++++++++------ lib/cpp/src/transport/TFileTransport.h | 24 ++++- 2 files changed, 120 insertions(+), 34 deletions(-) diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index e4510ecf..4615977f 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -424,7 +424,7 @@ bool TFileTransport::readEvent() { } while (1) { - // check if there is anything in the read buffer + // read from the file if read buffer is exhausted if (readState_.bufferPtr_ == readState_.bufferLen_) { // advance the offset pointer offset_ += readState_.bufferLen_; @@ -480,7 +480,6 @@ bool TFileTransport::readEvent() { readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = readBuff_[readState_.bufferPtr_++]; - bool eventCorruption = false; if (readState_.eventSizeBuffPos_ == 4) { // 0 length event indicates padding if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) { @@ -496,18 +495,12 @@ bool TFileTransport::readEvent() { readState_.event_ = new eventInfo(); readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_)); - // TODO - // make sure event is valid, an error is triggered if: - // 1. Event size is larger than user-speficied max-event size - - // 2. Event size is larger than chunk size - - // 3. size indicates that event crosses chunk boundary - - } - - if (eventCorruption) { - // perform some kickass recovery + // check if the event is corrupted and perform recovery if required + if (isEventCorrupted()) { + performRecovery(); + // start from the top + break; + } } } else { if (!readState_.event_->eventBuff_) { @@ -527,11 +520,6 @@ bool TFileTransport::readEvent() { readState_.event_->eventBuffPos_ += reclaimBuffer; readState_.bufferPtr_ += reclaimBuffer; - // if (reclaimBuffer > 0) { - // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_); - // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_); - // } - // check if the event has been read in full if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { // set the completed event to the current event @@ -542,16 +530,77 @@ bool TFileTransport::readEvent() { readState_.resetState(readState_.bufferPtr_); // exit criteria - // T_DEBUG_L(0, "Finished one event"); return true; } } } - - + } } +bool TFileTransport::isEventCorrupted() { + // an error is triggered if: + if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { + // 1. Event size is larger than user-speficied max-event size + T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", + readState_.event_->eventSize_, maxEventSize_); + return true; + } else if (readState_.event_->eventSize_ > chunkSize_) { + // 2. Event size is larger than chunk size + T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", + readState_.event_->eventSize_, chunkSize_); + return true; + } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) != + ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) { + // 3. size indicates that event crosses chunk boundary + T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld", + readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4); + return true; + } + + return false; +} + +void TFileTransport::performRecovery() { + // perform some kickass recovery + uint32_t curChunk = getCurChunk(); + if (lastBadChunk == curChunk) { + numCorruptedEventsinChunk++; + } else { + lastBadChunk = curChunk; + numCorruptedEventsinChunk = 1; + } + + if (numCorruptedEventsinChunk < maxCorruptedEvents_) { + // maybe there was an error in reading the file from disk + // seek to the beginning of chunk and try again + seekToChunk(curChunk); + } else { + + // just skip ahead to the next chunk if we not already at the last chunk + if (curChunk != (getNumChunks() - 1)) { + seekToChunk(curChunk + 1); + } else if (readTimeout_ == TAIL_READ_TIMEOUT) { + // if tailing the file, wait until there is enough data to start + // the next chunk + while(curChunk == (getNumChunks() - 1)) { + usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US); + } + seekToChunk(curChunk + 1); + } else { + // pretty hosed at this stage, rewind the file back to the last successful + // point and punt on the error + readState_.resetState(readState_.lastDispatchPtr_); + char errorMsg[1024]; + sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", + offset_ + readState_.lastDispatchPtr_); + perror(errorMsg); + throw TTransportException(errorMsg); + } + } + +} + void TFileTransport::seekToChunk(int32_t chunk) { if (fd_ <= 0) { throw TTransportException("File not open"); @@ -656,7 +705,22 @@ uint32_t TFileTransport::getCurrentTime() { TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, shared_ptr inputTransport): - processor_(processor), protocolFactory_(protocolFactory), + processor_(processor), + inputProtocolFactory_(protocolFactory), + outputProtocolFactory_(protocolFactory), + inputTransport_(inputTransport) { + + // default the output transport to a null transport (common case) + outputTransport_ = shared_ptr(new TNullTransport()); +} + +TFileProcessor::TFileProcessor(shared_ptr processor, + shared_ptr inputProtocolFactory, + shared_ptr outputProtocolFactory, + shared_ptr inputTransport): + processor_(processor), + inputProtocolFactory_(inputProtocolFactory), + outputProtocolFactory_(outputProtocolFactory), inputTransport_(inputTransport) { // default the output transport to a null transport (common case) @@ -667,13 +731,15 @@ TFileProcessor::TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, shared_ptr inputTransport, shared_ptr outputTransport): - processor_(processor), protocolFactory_(protocolFactory), - inputTransport_(inputTransport), outputTransport_(outputTransport) { -}; + processor_(processor), + inputProtocolFactory_(protocolFactory), + outputProtocolFactory_(protocolFactory), + inputTransport_(inputTransport), + outputTransport_(outputTransport) {}; void TFileProcessor::process(uint32_t numEvents, bool tail) { - pair,shared_ptr > iop; - iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_); + shared_ptr inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); + shared_ptr outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); // set the read timeout to 0 if tailing is required int32_t oldReadTimeout = inputTransport_->getReadTimeout(); @@ -687,7 +753,7 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { // bad form to use exceptions for flow control but there is really // no other way around it try { - processor_->process(iop.first, iop.second); + processor_->process(inputProtocol, outputProtocol); numProcessed++; if ( (numEvents > 0) && (numProcessed == numEvents)) { return; @@ -710,8 +776,8 @@ void TFileProcessor::process(uint32_t numEvents, bool tail) { } void TFileProcessor::processChunk() { - pair,shared_ptr > iop; - iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_); + shared_ptr inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); + shared_ptr outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); uint32_t curChunk = inputTransport_->getCurChunk(); @@ -719,7 +785,7 @@ void TFileProcessor::processChunk() { // bad form to use exceptions for flow control but there is really // no other way around it try { - processor_->process(iop.first, iop.second); + processor_->process(inputProtocol, outputProtocol); if (curChunk != inputTransport_->getCurChunk()) { break; } diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index fb4a1c25..a58be2c2 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -119,6 +119,8 @@ class TFileTransport : public TTransport { return readBuffSize_; } + static const int32_t TAIL_READ_TIMEOUT = -1; + static const int32_t NO_TAIL_READ_TIMEOUT = 0; void setReadTimeout(int32_t readTimeout) { readTimeout_ = readTimeout; } @@ -205,6 +207,10 @@ class TFileTransport : public TTransport { // helper functions for reading from a file bool readEvent(); + // event corruption-related functions + bool isEventCorrupted(); + void performRecovery(); + // Utility functions void openLogFile(); uint32_t getCurrentTime(); @@ -252,6 +258,10 @@ class TFileTransport : public TTransport { // sleep duration when EOF is hit uint32_t eofSleepTime_; static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; + + // sleep duration when a corrupted event is encountered + uint32_t corruptedEventSleepTime_; + static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000; // writer thread id pthread_t writerThreadId_; @@ -279,6 +289,10 @@ class TFileTransport : public TTransport { // Offset within the file off_t offset_; + // event corruption information + uint32_t lastBadChunk; + uint32_t numCorruptedEventsinChunk; + }; // Exception thrown when EOF is hit @@ -303,6 +317,11 @@ class TFileProcessor { shared_ptr protocolFactory, shared_ptr inputTransport); + TFileProcessor(shared_ptr processor, + shared_ptr inputProtocolFactory, + shared_ptr outputProtocolFactory, + shared_ptr inputTransport); + /** * Constructor * @@ -314,7 +333,7 @@ class TFileProcessor { TFileProcessor(shared_ptr processor, shared_ptr protocolFactory, shared_ptr inputTransport, - shared_ptr outputTransport); + shared_ptr outputTransport); /** * processes events from the file @@ -332,7 +351,8 @@ class TFileProcessor { private: shared_ptr processor_; - shared_ptr protocolFactory_; + shared_ptr inputProtocolFactory_; + shared_ptr outputProtocolFactory_; shared_ptr inputTransport_; shared_ptr outputTransport_; }; -- 2.17.1