T_DEBUG("padding is larger than chunk size, skipping event");
continue;
}
- // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
uint8_t zeros[padding];
bzero(zeros, padding);
- T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
+ // T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
+ // padding, offset_, chunk2);
if(-1 == ::write(fd_, zeros, padding)) {
perror("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
// read as much of the current event as possible
int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
if (remaining <= (int32_t)len) {
- memcpy(buf,
- currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
- remaining);
+ // copy over anything thats remaining
+ if (remaining > 0) {
+ memcpy(buf,
+ currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
+ remaining);
+ }
delete(currentEvent_);
currentEvent_ = 0;
return remaining;
// advance the offset pointer
offset_ += readState_.bufferLen_;
readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
- if (readState_.bufferLen_) {
- T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
- }
+ // if (readState_.bufferLen_) {
+ // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
+ // }
readState_.bufferPtr_ = 0;
readState_.lastDispatchPtr_ = 0;
-
+
// read error
if (readState_.bufferLen_ == -1) {
readState_.resetAllValues();
if (readState_.eventSizeBuffPos_ == 4) {
// 0 length event indicates padding
if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
- T_DEBUG_L(1, "Got padding");
+ // T_DEBUG_L(1, "Got padding");
readState_.resetState(readState_.lastDispatchPtr_);
continue;
}
readState_.event_ = new eventInfo();
readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
- T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
-
// TODO
// make sure event is valid, an error is triggered if:
// 1. Event size is larger than user-speficied max-event size
readState_.resetState(readState_.bufferPtr_);
// exit criteria
- T_DEBUG_L(0, "Finished one event");
+ // T_DEBUG_L(0, "Finished one event");
return true;
}
}
return (f_info.st_size)/chunkSize_;
}
+uint32_t TFileTransport::getCurChunk() {
+ return offset_/chunkSize_;
+}
+
// Utility Functions
void TFileTransport::openLogFile() {
mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
// opening the file in append mode causes offset_t to be at the end
offset_ = lseek(fd_, 0, SEEK_CUR);
- T_DEBUG_L(1, "initial offset: %lu", offset_);
}
uint32_t TFileTransport::getCurrentTime() {
}
+void TFileProcessor::processChunk() {
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
+
+ uint32_t curChunk = inputTransport_->getCurChunk();
+
+ while(1) {
+ // bad form to use exceptions for flow control but there is really
+ // no other way around it
+ try {
+ processor_->process(iop.first, iop.second);
+ if (curChunk != inputTransport_->getCurChunk()) {
+ break;
+ }
+ } catch (TEOFException& teof) {
+ break;
+ } catch (TException te) {
+ cerr << te.what() << endl;
+ break;
+ }
+ }
+}
+
}}} // facebook::thrift::transport