blob: 063c9fe4e543bb9c9a660303d8ada0f34892455c [file] [log] [blame]
Aditya Agarwale528c762006-10-11 02:48:43 +00001#include "TBufferedFileWriter.h"
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00002#include "TTransportUtils.h"
Aditya Agarwale528c762006-10-11 02:48:43 +00003
4#include <pthread.h>
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00005 #include <sys/time.h>
Aditya Agarwale528c762006-10-11 02:48:43 +00006#include <fcntl.h>
7#include <errno.h>
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00008#include <unistd.h>
9#include <iostream>
Aditya Agarwale528c762006-10-11 02:48:43 +000010
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000011using namespace std;
Aditya Agarwale528c762006-10-11 02:48:43 +000012
13namespace facebook { namespace thrift { namespace transport {
14
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000015TFileTransport::TFileTransport(string path) {
16 filename_ = path;
17 openLogFile();
Aditya Agarwale528c762006-10-11 02:48:43 +000018
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000019 // set initial values to default
20 readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
21 readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
22 chunkSize_ = DEFAULT_CHUNK_SIZE;
23 eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
24 flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
25 flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
26 maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
27 maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
28 eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
Aditya Agarwale528c762006-10-11 02:48:43 +000029
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000030 // initialize buffer lazily
31 buffer_ = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +000032
33 // buffer is initially empty
34 isEmpty_ = true;
35 isFull_ = false;
36
37 // both head and tail are initially at 0
38 headPos_ = 0;
39 tailPos_ = 0;
40
Aditya Agarwale528c762006-10-11 02:48:43 +000041 // initialize all the condition vars/mutexes
42 pthread_mutex_init(&mutex_, NULL);
43 pthread_cond_init(&notFull_, NULL);
44 pthread_cond_init(&notEmpty_, NULL);
45 pthread_cond_init(&flushed_, NULL);
46
47 // not closing the file during init
48 closing_ = false;
49
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000050 // create writer thread on demand
51 writerThreadId_ = 0;
52
53 // read related variables
54 // read buff initialized lazily
55 readBuff_ = 0;
56 currentEvent_ = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +000057}
58
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000059void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
Aditya Agarwale528c762006-10-11 02:48:43 +000060 filename_ = filename;
61 offset_ = offset;
62
63 // check if current file is still open
64 if (fd_ > 0) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000065 // TODO: should there be a flush here?
66 fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
Aditya Agarwal0c341a12006-12-09 00:47:03 +000067 if(-1 == ::close(fd_)) {
68 perror("TFileTransport: error in file close");
69 throw TTransportException("TFileTransport: error in file close");
70 }
Aditya Agarwale528c762006-10-11 02:48:43 +000071 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000072
73 if (fd) {
74 fd_ = fd;
75 } else {
76 // open file if the input fd is 0
77 openLogFile();
78 }
Aditya Agarwale528c762006-10-11 02:48:43 +000079}
80
81
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000082TFileTransport::~TFileTransport() {
83 // TODO: Make sure the buffer is actually flushed
84 // flush the buffer if a writer thread is active
85 if (writerThreadId_ > 0) {
86 // flush output buffer
87 flush();
Aditya Agarwale528c762006-10-11 02:48:43 +000088
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000089 // send a signal to write thread to end
90 closing_ = true;
91 pthread_join(writerThreadId_, NULL);
92 }
Aditya Agarwale528c762006-10-11 02:48:43 +000093
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000094 if (buffer_) {
95 delete[] buffer_;
96 }
Aditya Agarwale528c762006-10-11 02:48:43 +000097
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000098 if (readBuff_) {
99 delete readBuff_;
100 }
101
102 if (currentEvent_) {
103 delete currentEvent_;
104 }
105
106 // close logfile
107 if (fd_ > 0) {
Aditya Agarwal0c341a12006-12-09 00:47:03 +0000108 if(-1 == ::close(fd_)) {
109 perror("TFileTransport: error in file close");
110 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000111 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000112}
113
114
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000115void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000116 // make sure that event size is valid
117 if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000118 T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
Aditya Agarwale528c762006-10-11 02:48:43 +0000119 return;
120 }
121
122 if (eventLen == 0) {
Aditya Agarwal35ae1c72006-10-26 03:31:34 +0000123 T_ERROR("cannot enqueue an empty event");
Aditya Agarwale528c762006-10-11 02:48:43 +0000124 return;
125 }
126
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000127 eventInfo* toEnqueue = new eventInfo();
128 toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4);
129 // first 4 bytes is the event length
130 memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
131 // actual event contents
132 memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
133 toEnqueue->eventSize_ = eventLen + 4;
Aditya Agarwale528c762006-10-11 02:48:43 +0000134
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000135 // T_DEBUG_L(1, "event size: %u", eventLen);
Aditya Agarwale528c762006-10-11 02:48:43 +0000136 return enqueueEvent(toEnqueue, blockUntilFlush);
137}
138
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000139void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
140 // lock mutex
Aditya Agarwale528c762006-10-11 02:48:43 +0000141 pthread_mutex_lock(&mutex_);
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000142
143 // make sure that enqueue buffer is initialized and writer thread is running
144 if (buffer_ == 0) {
145 buffer_ = new eventInfo*[eventBufferSize_];
146 }
147 if (writerThreadId_ == 0) {
148 if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
149 T_ERROR("Error creating write thread");
150 return;
151 }
152 }
153
Aditya Agarwale528c762006-10-11 02:48:43 +0000154 // Can't enqueue while buffer is full
155 while(isFull_) {
156 pthread_cond_wait(&notFull_, &mutex_);
157 }
158
159 // make a copy and enqueue at tail of buffer
160 buffer_[tailPos_] = toEnqueue;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000161 tailPos_ = (tailPos_+1) % eventBufferSize_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000162
163 // mark the buffer as non-empty
164 isEmpty_ = false;
165
166 // circular buffer has wrapped around (and is full)
167 if(tailPos_ == headPos_) {
Aditya Agarwal35ae1c72006-10-26 03:31:34 +0000168 // T_DEBUG("queue is full");
Aditya Agarwale528c762006-10-11 02:48:43 +0000169 isFull_ = true;
170 }
171
172 // signal anybody who's waiting for the buffer to be non-empty
173 pthread_cond_signal(&notEmpty_);
174 if(blockUntilFlush) {
175 pthread_cond_wait(&flushed_, &mutex_);
176 }
177
178 // TODO: don't return until flushed to disk
179 // this really should be a loop where it makes sure it got flushed
180 // because condition variables can get triggered by the os for no reason
181 // it is probably a non-factor for the time being
182 pthread_mutex_unlock(&mutex_);
183
184}
185
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000186eventInfo* TFileTransport::dequeueEvent(long long deadline) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000187 //deadline time struc
188 struct timespec ts;
189 if(deadline) {
190 ts.tv_sec = deadline/(1000*1000);
191 ts.tv_nsec = (deadline%(1000*1000))*1000;
192 }
193
194 // wait for the queue to fill up
195 pthread_mutex_lock(&mutex_);
196 while(isEmpty_) {
197 // do a timed wait on the condition variable
198 if(deadline) {
199 int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
200 if(e == ETIMEDOUT) {
201 break;
202 }
203 }
204 else {
205 // just wait until the buffer gets an item
206 pthread_cond_wait(&notEmpty_, &mutex_);
207 }
208 }
209
210 string ret;
211 bool doSignal = false;
212
213 // could be empty if we timed out
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000214 eventInfo* retEvent = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +0000215 if(!isEmpty_) {
216 retEvent = buffer_[headPos_];
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000217 headPos_ = (headPos_+1) % eventBufferSize_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000218
219 isFull_ = false;
220 doSignal = true;
221
222 // check if this is the last item in the buffer
223 if(headPos_ == tailPos_) {
224 isEmpty_ = true;
225 }
226 }
227
228 // unlock the mutex and signal if required
229 pthread_mutex_unlock(&mutex_);
230 if(doSignal) {
231 pthread_cond_signal(&notFull_);
232 }
233
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000234 if (!retEvent) {
235 retEvent = new eventInfo();
236 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000237 return retEvent;
238}
239
240
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000241void TFileTransport::writerThread() {
242 // open file if it is not open
243 if(!fd_) {
244 openLogFile();
245 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000246
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000247 // set the offset to the correct value (EOF)
248 offset_ = lseek(fd_, 0, SEEK_END);
249
250 // Figure out the next time by which a flush must take place
251 long long nextFlush = getCurrentTime() + flushMaxUs_;
252 uint32_t unflushed = 0;
253
254 while(1) {
255 // this will only be true when the destructor is being invoked
256 if(closing_) {
257 if(-1 == ::close(fd_)) {
258 perror("TFileTransport: error in close");
Aditya Agarwal0c341a12006-12-09 00:47:03 +0000259 throw TTransportException("TFileTransport: error in file close");
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000260 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000261 fd_ = 0;
262 return;
263 }
264
265 //long long start = now();
266 eventInfo* outEvent = dequeueEvent(nextFlush);
267 if (!outEvent) {
268 T_DEBUG_L(1, "Got an empty event");
269 return;
270 }
271
272 // sanity check on event
273 if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
274 T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
275 delete(outEvent);
276 continue;
277 }
278 //long long diff = now()-start;
279 //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
280
281 // If chunking is required, then make sure that msg does not cross chunk boundary
282 if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
283
284 // event size must be less than chunk size
285 if(outEvent->eventSize_ > chunkSize_) {
286 T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
287 outEvent->eventSize_, chunkSize_);
288 delete(outEvent);
289 continue;
290 }
291
292 long long chunk1 = offset_/chunkSize_;
293 long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
294
295 // if adding this event will cross a chunk boundary, pad the chunk with zeros
296 if(chunk1 != chunk2) {
297 int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
298
299 // sanity check
300 if (padding <= 0) {
301 T_DEBUG("Padding is empty, skipping event");
302 continue;
303 }
304 if (padding > (int32_t)chunkSize_) {
305 T_DEBUG("padding is larger than chunk size, skipping event");
306 continue;
307 }
308 // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
309 uint8_t zeros[padding];
310 bzero(zeros, padding);
311 T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
312 if(-1 == ::write(fd_, zeros, padding)) {
313 perror("TFileTransport: error while padding zeros");
314 throw TTransportException("TFileTransport: error while padding zeros");
315 }
316 unflushed += padding;
317 offset_ += padding;
318 }
319 }
320
321 // write the dequeued event to the file
322 if(outEvent->eventSize_ > 0) {
323 if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
324 perror("TFileTransport: error while writing event");
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000325 throw TTransportException("TFileTransport: error while writing event");
326 }
327
328 unflushed += outEvent->eventSize_;
329 offset_ += outEvent->eventSize_;
330 }
331
332 // couple of cases from which a flush could be triggered
333 if((getCurrentTime() >= nextFlush && unflushed > 0) ||
334 unflushed > flushMaxBytes_ ||
335 (outEvent && (outEvent->eventSize_== 0)) ) {
336 //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
337
338 // sync (force flush) file to disk
339 fsync(fd_);
340 nextFlush = getCurrentTime() + flushMaxUs_;
341 unflushed = 0;
342
343 // notify anybody(thing?) waiting for flush completion
344 pthread_mutex_lock(&mutex_);
345 notFlushed_ = false;
346 pthread_mutex_unlock(&mutex_);
347 pthread_cond_broadcast(&flushed_);
348 }
349 // deallocate dequeued event
350 delete(outEvent);
351 }
352}
353
354void TFileTransport::flush() {
355 eventInfo* flushEvent = new eventInfo();
Aditya Agarwale528c762006-10-11 02:48:43 +0000356 notFlushed_ = true;
357
358 enqueueEvent(flushEvent, false);
359
360 // wait for flush to take place
361 pthread_mutex_lock(&mutex_);
362
363 while(notFlushed_) {
364 pthread_cond_wait(&flushed_, &mutex_);
365 }
366
367 pthread_mutex_unlock(&mutex_);
368}
369
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000370
371uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
372 uint32_t have = 0;
373 uint32_t get = 0;
374
375 while (have < len) {
376 get = read(buf+have, len-have);
377 if (get <= 0) {
378 throw TEOFException();
379 }
380 have += get;
381 }
382
383 return have;
384}
385
386uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
387 // check if there an event is ready to be read
388 if (!currentEvent_) {
389 readEvent();
390 }
391
392 // did not manage to read an event from the file. This could have happened
393 // if the timeout expired or there was some other error
394 if (!currentEvent_) {
395 return 0;
396 }
397
398 // read as much of the current event as possible
399 int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
400 if (remaining <= (int32_t)len) {
401 memcpy(buf,
402 currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
403 remaining);
404 delete(currentEvent_);
405 currentEvent_ = 0;
406 return remaining;
407 }
408
409 // read as much as possible
410 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
411 currentEvent_->eventBuffPos_ += len;
412 return len;
413}
414
415bool TFileTransport::readEvent() {
416 int readTries = 0;
417
418 if (!readBuff_) {
419 readBuff_ = new uint8_t[readBuffSize_];
420 }
421
422 while (1) {
423 // check if there is anything in the read buffer
424 if (readState_.bufferPtr_ == readState_.bufferLen_) {
425 // advance the offset pointer
426 offset_ += readState_.bufferLen_;
427 readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
428 if (readState_.bufferLen_) {
429 T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
430 }
431 readState_.bufferPtr_ = 0;
432 readState_.lastDispatchPtr_ = 0;
433
434 // read error
435 if (readState_.bufferLen_ == -1) {
436 readState_.resetAllValues();
437 perror("TFileTransport: error while reading from file");
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000438 throw TTransportException("TFileTransport: error while reading from file");
439 } else if (readState_.bufferLen_ == 0) { // EOF
440 // wait indefinitely if there is no timeout
441 if (readTimeout_ == -1) {
442 usleep(eofSleepTime_);
443 continue;
444 } else if (readTimeout_ == 0) {
445 // reset state
446 readState_.resetState(0);
447 return false;
448 } else if (readTimeout_ > 0) {
449 // timeout already expired once
450 if (readTries > 0) {
451 readState_.resetState(0);
452 return false;
453 } else {
454 usleep(readTimeout_ * 1000);
455 readTries++;
456 continue;
457 }
458 }
459 }
460 }
461
462 readTries = 0;
463
464 // attempt to read an event from the buffer
465 while(readState_.bufferPtr_ < readState_.bufferLen_) {
466 if (readState_.readingSize_) {
467 if(readState_.eventSizeBuffPos_ == 0) {
468 if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
469 ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
470 // skip one byte towards chunk boundary
471 // T_DEBUG_L(1, "Skipping a byte");
472 readState_.bufferPtr_++;
473 continue;
474 }
475 }
476
477 readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
478 readBuff_[readState_.bufferPtr_++];
479 bool eventCorruption = false;
480 if (readState_.eventSizeBuffPos_ == 4) {
481 // 0 length event indicates padding
482 if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
483 T_DEBUG_L(1, "Got padding");
484 readState_.resetState(readState_.lastDispatchPtr_);
485 continue;
486 }
487 // got a valid event
488 readState_.readingSize_ = false;
489 if (readState_.event_) {
490 delete(readState_.event_);
491 }
492 readState_.event_ = new eventInfo();
493 readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
494
495 T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
496
497 // TODO
498 // make sure event is valid, an error is triggered if:
499 // 1. Event size is larger than user-speficied max-event size
500
501 // 2. Event size is larger than chunk size
502
503 // 3. size indicates that event crosses chunk boundary
504
505 }
506
507 if (eventCorruption) {
508 // perform some kickass recovery
509 }
510 } else {
511 if (!readState_.event_->eventBuff_) {
512 readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
513 readState_.event_->eventBuffPos_ = 0;
514 }
515 // take either the entire event or the remaining bytes in the buffer
516 int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
517 readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
518
519 // copy data from read buffer into event buffer
520 memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
521 readBuff_ + readState_.bufferPtr_,
522 reclaimBuffer);
523
524 // increment position ptrs
525 readState_.event_->eventBuffPos_ += reclaimBuffer;
526 readState_.bufferPtr_ += reclaimBuffer;
527
528 // if (reclaimBuffer > 0) {
529 // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
530 // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
531 // }
532
533 // check if the event has been read in full
534 if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
535 // set the completed event to the current event
536 currentEvent_ = readState_.event_;
537 currentEvent_->eventBuffPos_ = 0;
538
539 readState_.event_ = 0;
540 readState_.resetState(readState_.bufferPtr_);
541
542 // exit criteria
543 T_DEBUG_L(0, "Finished one event");
544 return true;
545 }
546 }
547 }
548
549
550 }
551}
552
553void TFileTransport::seekToChunk(int32_t chunk) {
554 if (fd_ <= 0) {
555 throw TTransportException("File not open");
556 }
557
558 int32_t lastChunk = getNumChunks();
559
560 // negative indicates reverse seek (from the end)
561 if (chunk < 0) {
562 chunk += lastChunk;
563 }
564
565 // cannot seek past EOF
566 if (chunk > lastChunk) {
567 T_DEBUG("Trying to seek past EOF. Seeking to EOF instead");
568 chunk = lastChunk;
569 }
570
571 uint32_t minEndOffset = 0;
572 if (chunk == lastChunk) {
573 minEndOffset = lseek(fd_, 0, SEEK_END);
574 }
575
576 offset_ = lseek(fd_, chunk * chunkSize_, SEEK_SET);
577 readState_.resetAllValues();
578 if (offset_ == -1) {
579 perror("TFileTransport: lseek error in seekToChunk");
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000580 throw TTransportException("TFileTransport: lseek error in seekToChunk");
581 }
582
583 // seek to EOF if user wanted to go to last chunk
584 uint32_t oldReadTimeout = getReadTimeout();
585 setReadTimeout(0);
586 if (chunk == lastChunk) {
587 // keep on reading unti the last event at point of seekChunk call
588 while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
589 }
590 setReadTimeout(oldReadTimeout);
591
592}
593
594void TFileTransport::seekToEnd() {
595 seekToChunk(getNumChunks());
596}
597
598uint32_t TFileTransport::getNumChunks() {
599 if (fd_ <= 0) {
600 return 0;
601 }
602 struct stat f_info;
603 fstat(fd_, &f_info);
604 return (f_info.st_size)/chunkSize_;
605}
606
607// Utility Functions
608void TFileTransport::openLogFile() {
Aditya Agarwale528c762006-10-11 02:48:43 +0000609 mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000610 fd_ = ::open(filename_.c_str(), O_RDWR | O_CREAT | O_APPEND, mode);
Aditya Agarwale528c762006-10-11 02:48:43 +0000611
612 // make sure open call was successful
613 if(fd_ == -1) {
614 char errorMsg[1024];
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000615 sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
Aditya Agarwale528c762006-10-11 02:48:43 +0000616 perror(errorMsg);
617 throw TTransportException(errorMsg);
618 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000619
620 // opening the file in append mode causes offset_t to be at the end
621 offset_ = lseek(fd_, 0, SEEK_CUR);
622 T_DEBUG_L(1, "initial offset: %lu", offset_);
Aditya Agarwale528c762006-10-11 02:48:43 +0000623}
624
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000625uint32_t TFileTransport::getCurrentTime() {
Aditya Agarwale528c762006-10-11 02:48:43 +0000626 long long ret;
627 struct timeval tv;
628 gettimeofday(&tv, NULL);
629 ret = tv.tv_sec;
630 ret = ret*1000*1000 + tv.tv_usec;
631 return ret;
632}
633
634
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000635TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
636 shared_ptr<TProtocolFactory> protocolFactory,
637 shared_ptr<TFileTransport> inputTransport):
638 processor_(processor), protocolFactory_(protocolFactory),
639 inputTransport_(inputTransport) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000640
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000641 // default the output transport to a null transport (common case)
642 outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
643}
Aditya Agarwale528c762006-10-11 02:48:43 +0000644
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000645TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
646 shared_ptr<TProtocolFactory> protocolFactory,
647 shared_ptr<TFileTransport> inputTransport,
648 shared_ptr<TTransport> outputTransport):
649 processor_(processor), protocolFactory_(protocolFactory),
650 inputTransport_(inputTransport), outputTransport_(outputTransport) {
651};
652
653void TFileProcessor::process(uint32_t numEvents, bool tail) {
654 pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
655 iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
656
657 // set the read timeout to 0 if tailing is required
658 int32_t oldReadTimeout = inputTransport_->getReadTimeout();
659 if (tail) {
660 // save old read timeout so it can be restored
661 inputTransport_->setReadTimeout(0);
662 }
663
664 uint32_t numProcessed = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +0000665 while(1) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000666 // bad form to use exceptions for flow control but there is really
667 // no other way around it
668 try {
669 processor_->process(iop.first, iop.second);
670 numProcessed++;
671 if ( (numEvents > 0) && (numProcessed == numEvents)) {
672 return;
Aditya Agarwale528c762006-10-11 02:48:43 +0000673 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000674 } catch (TEOFException& teof) {
675 if (!tail) {
676 break;
Aditya Agarwale528c762006-10-11 02:48:43 +0000677 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000678 } catch (TException te) {
679 cerr << te.what() << endl;
680 break;
Aditya Agarwale528c762006-10-11 02:48:43 +0000681 }
682 }
683
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000684 // restore old read timeout
685 if (tail) {
686 inputTransport_->setReadTimeout(oldReadTimeout);
687 }
688
Aditya Agarwale528c762006-10-11 02:48:43 +0000689}
690
691}}} // facebook::thrift::transport