}
while (1) {
- // check if there is anything in the read buffer
+ // read from the file if read buffer is exhausted
if (readState_.bufferPtr_ == readState_.bufferLen_) {
// advance the offset pointer
offset_ += readState_.bufferLen_;
readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
readBuff_[readState_.bufferPtr_++];
- bool eventCorruption = false;
if (readState_.eventSizeBuffPos_ == 4) {
// 0 length event indicates padding
if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
- // TODO
- // make sure event is valid, an error is triggered if:
- // 1. Event size is larger than user-speficied max-event size
-
- // 2. Event size is larger than chunk size
-
- // 3. size indicates that event crosses chunk boundary
-
- }
-
- if (eventCorruption) {
- // perform some kickass recovery
+ // check if the event is corrupted and perform recovery if required
+ if (isEventCorrupted()) {
+ performRecovery();
+ // start from the top
+ break;
+ }
}
} else {
if (!readState_.event_->eventBuff_) {
readState_.event_->eventBuffPos_ += reclaimBuffer;
readState_.bufferPtr_ += reclaimBuffer;
- // if (reclaimBuffer > 0) {
- // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
- // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
- // }
-
// check if the event has been read in full
if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
// set the completed event to the current event
readState_.resetState(readState_.bufferPtr_);
// exit criteria
- // T_DEBUG_L(0, "Finished one event");
return true;
}
}
}
-
-
+
}
}
+/**
+ * Sanity-checks the size of the event whose 4-byte length prefix has just
+ * been consumed from the read buffer.
+ *
+ * An event is reported as corrupted when any of the following holds:
+ *   1. its size exceeds maxEventSize_ (only checked when maxEventSize_ > 0)
+ *   2. its size exceeds chunkSize_
+ *   3. the length prefix and the last payload byte fall in different chunks
+ *
+ * Each failing check is logged through T_ERROR before returning.
+ *
+ * @return true if the event looks corrupted, false otherwise
+ */
+bool TFileTransport::isEventCorrupted() {
+  if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
+    // 1. Event size is larger than user-specified max-event size
+    T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
+            readState_.event_->eventSize_, maxEventSize_);
+    return true;
+  } else if (readState_.event_->eventSize_ > chunkSize_) {
+    // 2. Event size is larger than chunk size
+    T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
+            readState_.event_->eventSize_, chunkSize_);
+    return true;
+  } else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_) !=
+             ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1) / chunkSize_)) {
+    // 3. size indicates that event crosses chunk boundary: compare the chunk
+    //    holding the first byte of the length prefix (bufferPtr_ is already
+    //    past those 4 bytes) with the chunk holding the last payload byte
+    //
+    // The offset is cast explicitly because off_t is not guaranteed to be
+    // `long`, and handing a mismatched type to a printf-style "%ld" is
+    // undefined behaviour.
+    // NOTE(review): the logged offset adds 4 whereas the check above
+    // subtracts 4 -- confirm which position this message is meant to report.
+    T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
+            readState_.event_->eventSize_,
+            static_cast<long>(offset_ + readState_.bufferPtr_ + 4));
+    return true;
+  }
+
+  return false;
+}
+
+/**
+ * Repositions the reader after isEventCorrupted() has flagged an event.
+ *
+ * Strategy, in order:
+ *   - while fewer than maxCorruptedEvents_ corrupted events have been seen in
+ *     the current chunk, seek back to the start of that chunk and retry
+ *   - otherwise skip to the next chunk if one exists
+ *   - otherwise, when tailing (readTimeout_ == TAIL_READ_TIMEOUT), poll until
+ *     the file has grown past the current chunk and then skip to it
+ *   - otherwise reset the read state to the last dispatched position and
+ *     give up
+ *
+ * @throws TTransportException when the last chunk is corrupted and the
+ *         transport is not tailing the file
+ */
+void TFileTransport::performRecovery() {
+  // keep a per-chunk count of corrupted events
+  uint32_t curChunk = getCurChunk();
+  if (lastBadChunk == curChunk) {
+    numCorruptedEventsinChunk++;
+  } else {
+    lastBadChunk = curChunk;
+    numCorruptedEventsinChunk = 1;
+  }
+
+  if (numCorruptedEventsinChunk < maxCorruptedEvents_) {
+    // maybe there was an error in reading the file from disk
+    // seek to the beginning of chunk and try again
+    seekToChunk(curChunk);
+  } else {
+    // just skip ahead to the next chunk if we are not already at the last chunk
+    if (curChunk != (getNumChunks() - 1)) {
+      seekToChunk(curChunk + 1);
+    } else if (readTimeout_ == TAIL_READ_TIMEOUT) {
+      // if tailing the file, wait until there is enough data to start
+      // the next chunk
+      while (curChunk == (getNumChunks() - 1)) {
+        usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
+      }
+      seekToChunk(curChunk + 1);
+    } else {
+      // pretty hosed at this stage, rewind the file back to the last successful
+      // point and punt on the error
+      readState_.resetState(readState_.lastDispatchPtr_);
+
+      // Bounded formatting so the message can never overrun the buffer; the
+      // offset is cast because "%lu" requires exactly an unsigned long while
+      // the sum is computed in off_t (a signed type of unspecified width).
+      char errorMsg[1024];
+      snprintf(errorMsg, sizeof(errorMsg),
+               "TFileTransport: log file corrupted at offset: %lu",
+               static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));
+      // perror() also appends the text for the current errno value
+      perror(errorMsg);
+      throw TTransportException(errorMsg);
+    }
+  }
+}
+
void TFileTransport::seekToChunk(int32_t chunk) {
if (fd_ <= 0) {
throw TTransportException("File not open");
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport):
- processor_(processor), protocolFactory_(protocolFactory),
+ processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport) {
+
+ // default the output transport to a null transport (common case)
+ outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
+}
+
+TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<TFileTransport> inputTransport):
+ processor_(processor),
+ inputProtocolFactory_(inputProtocolFactory),
+ outputProtocolFactory_(outputProtocolFactory),
inputTransport_(inputTransport) {
// default the output transport to a null transport (common case)
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport,
shared_ptr<TTransport> outputTransport):
- processor_(processor), protocolFactory_(protocolFactory),
- inputTransport_(inputTransport), outputTransport_(outputTransport) {
-};
+ processor_(processor),
+ inputProtocolFactory_(protocolFactory),
+ outputProtocolFactory_(protocolFactory),
+ inputTransport_(inputTransport),
+ outputTransport_(outputTransport) {};
void TFileProcessor::process(uint32_t numEvents, bool tail) {
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
- iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+ shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+ shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
// set the read timeout to 0 if tailing is required
int32_t oldReadTimeout = inputTransport_->getReadTimeout();
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(iop.first, iop.second);
+ processor_->process(inputProtocol, outputProtocol);
numProcessed++;
if ( (numEvents > 0) && (numProcessed == numEvents)) {
return;
}
void TFileProcessor::processChunk() {
- pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
- iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+ shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
+ shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
uint32_t curChunk = inputTransport_->getCurChunk();
// bad form to use exceptions for flow control but there is really
// no other way around it
try {
- processor_->process(iop.first, iop.second);
+ processor_->process(inputProtocol, outputProtocol);
if (curChunk != inputTransport_->getCurChunk()) {
break;
}
return readBuffSize_;
}
+ static const int32_t TAIL_READ_TIMEOUT = -1;
+ static const int32_t NO_TAIL_READ_TIMEOUT = 0;
void setReadTimeout(int32_t readTimeout) {
readTimeout_ = readTimeout;
}
// helper functions for reading from a file
bool readEvent();
+ // event corruption-related functions
+ bool isEventCorrupted();
+ void performRecovery();
+
// Utility functions
void openLogFile();
uint32_t getCurrentTime();
// sleep duration when EOF is hit
uint32_t eofSleepTime_;
static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
+
+ // sleep duration when a corrupted event is encountered
+ uint32_t corruptedEventSleepTime_;
+ static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
// writer thread id
pthread_t writerThreadId_;
// Offset within the file
off_t offset_;
+ // event corruption information
+ uint32_t lastBadChunk;
+ uint32_t numCorruptedEventsinChunk;
+
};
// Exception thrown when EOF is hit
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport);
+ TFileProcessor(shared_ptr<TProcessor> processor,
+ shared_ptr<TProtocolFactory> inputProtocolFactory,
+ shared_ptr<TProtocolFactory> outputProtocolFactory,
+ shared_ptr<TFileTransport> inputTransport);
+
/**
* Constructor
*
TFileProcessor(shared_ptr<TProcessor> processor,
shared_ptr<TProtocolFactory> protocolFactory,
shared_ptr<TFileTransport> inputTransport,
- shared_ptr<TTransport> outputTransport);
+ shared_ptr<TTransport> outputTransport);
/**
* processes events from the file
private:
shared_ptr<TProcessor> processor_;
- shared_ptr<TProtocolFactory> protocolFactory_;
+ shared_ptr<TProtocolFactory> inputProtocolFactory_;
+ shared_ptr<TProtocolFactory> outputProtocolFactory_;
shared_ptr<TFileTransport> inputTransport_;
shared_ptr<TTransport> outputTransport_;
};