}
// set the offset to the correct value (EOF)
- seekToEnd();
+ try {
+ seekToEnd();
+ } catch (TException &te) {
+ }
+
+ // throw away any partial events
+ offset_ += readState_.lastDispatchPtr_;
+ ftruncate(fd_, offset_);
+ readState_.resetAllValues();
// Figure out the next time by which a flush must take place
if (chunk1 != chunk2) {
// refetch the offset to keep in sync
offset_ = lseek(fd_, 0, SEEK_CUR);
- int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
+ int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
- // sanity check
- if (padding <= 0) {
- T_DEBUG("Padding is empty, skipping event");
- continue;
- }
- if (padding > (int32_t)chunkSize_) {
- T_DEBUG("padding is larger than chunk size, skipping event");
- continue;
- }
uint8_t zeros[padding];
bzero(zeros, padding);
- //T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
- //padding, offset_, chunk2);
if (-1 == ::write(fd_, zeros, padding)) {
GlobalOutput("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
uint32_t oldReadTimeout = getReadTimeout();
setReadTimeout(NO_TAIL_READ_TIMEOUT);
// keep on reading unti the last event at point of seekChunk call
- while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
+ while (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
setReadTimeout(oldReadTimeout);
}