blob: 39e207451d127f665aad94efe5038803fe880244 [file] [log] [blame]
Aditya Agarwale528c762006-10-11 02:48:43 +00001#include "TBufferedFileWriter.h"
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00002#include "TTransportUtils.h"
Aditya Agarwale528c762006-10-11 02:48:43 +00003
4#include <pthread.h>
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00005 #include <sys/time.h>
Aditya Agarwale528c762006-10-11 02:48:43 +00006#include <fcntl.h>
7#include <errno.h>
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00008#include <unistd.h>
9#include <iostream>
Aditya Agarwale528c762006-10-11 02:48:43 +000010
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000011using namespace std;
Aditya Agarwale528c762006-10-11 02:48:43 +000012
13namespace facebook { namespace thrift { namespace transport {
14
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000015TFileTransport::TFileTransport(string path) {
16 filename_ = path;
17 openLogFile();
Aditya Agarwale528c762006-10-11 02:48:43 +000018
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000019 // set initial values to default
20 readBuffSize_ = DEFAULT_READ_BUFF_SIZE;
21 readTimeout_ = DEFAULT_READ_TIMEOUT_MS;
22 chunkSize_ = DEFAULT_CHUNK_SIZE;
23 eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
24 flushMaxUs_ = DEFAULT_FLUSH_MAX_US;
25 flushMaxBytes_ = DEFAULT_FLUSH_MAX_BYTES;
26 maxEventSize_ = DEFAULT_MAX_EVENT_SIZE;
27 maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
28 eofSleepTime_ = DEFAULT_EOF_SLEEP_TIME_US;
Aditya Agarwale528c762006-10-11 02:48:43 +000029
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000030 // initialize buffer lazily
31 buffer_ = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +000032
33 // buffer is initially empty
34 isEmpty_ = true;
35 isFull_ = false;
36
37 // both head and tail are initially at 0
38 headPos_ = 0;
39 tailPos_ = 0;
40
Aditya Agarwale528c762006-10-11 02:48:43 +000041 // initialize all the condition vars/mutexes
42 pthread_mutex_init(&mutex_, NULL);
43 pthread_cond_init(&notFull_, NULL);
44 pthread_cond_init(&notEmpty_, NULL);
45 pthread_cond_init(&flushed_, NULL);
46
47 // not closing the file during init
48 closing_ = false;
49
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000050 // create writer thread on demand
51 writerThreadId_ = 0;
52
53 // read related variables
54 // read buff initialized lazily
55 readBuff_ = 0;
56 currentEvent_ = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +000057}
58
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000059void TFileTransport::resetOutputFile(int fd, string filename, long long offset) {
Aditya Agarwale528c762006-10-11 02:48:43 +000060 filename_ = filename;
61 offset_ = offset;
62
63 // check if current file is still open
64 if (fd_ > 0) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000065 // TODO: should there be a flush here?
66 fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
Aditya Agarwale528c762006-10-11 02:48:43 +000067 ::close(fd_);
68 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000069
70 if (fd) {
71 fd_ = fd;
72 } else {
73 // open file if the input fd is 0
74 openLogFile();
75 }
Aditya Agarwale528c762006-10-11 02:48:43 +000076}
77
78
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000079TFileTransport::~TFileTransport() {
80 // TODO: Make sure the buffer is actually flushed
81 // flush the buffer if a writer thread is active
82 if (writerThreadId_ > 0) {
83 // flush output buffer
84 flush();
Aditya Agarwale528c762006-10-11 02:48:43 +000085
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000086 // send a signal to write thread to end
87 closing_ = true;
88 pthread_join(writerThreadId_, NULL);
89 }
Aditya Agarwale528c762006-10-11 02:48:43 +000090
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000091 if (buffer_) {
92 delete[] buffer_;
93 }
Aditya Agarwale528c762006-10-11 02:48:43 +000094
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000095 if (readBuff_) {
96 delete readBuff_;
97 }
98
99 if (currentEvent_) {
100 delete currentEvent_;
101 }
102
103 // close logfile
104 if (fd_ > 0) {
105 ::close(fd_);
106 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000107}
108
109
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000110void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000111 // make sure that event size is valid
112 if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000113 T_DEBUG("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
Aditya Agarwale528c762006-10-11 02:48:43 +0000114 return;
115 }
116
117 if (eventLen == 0) {
Aditya Agarwal35ae1c72006-10-26 03:31:34 +0000118 T_ERROR("cannot enqueue an empty event");
Aditya Agarwale528c762006-10-11 02:48:43 +0000119 return;
120 }
121
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000122 eventInfo* toEnqueue = new eventInfo();
123 toEnqueue->eventBuff_ = (uint8_t *)malloc((sizeof(uint8_t) * eventLen) + 4);
124 // first 4 bytes is the event length
125 memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
126 // actual event contents
127 memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
128 toEnqueue->eventSize_ = eventLen + 4;
Aditya Agarwale528c762006-10-11 02:48:43 +0000129
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000130 // T_DEBUG_L(1, "event size: %u", eventLen);
Aditya Agarwale528c762006-10-11 02:48:43 +0000131 return enqueueEvent(toEnqueue, blockUntilFlush);
132}
133
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000134void TFileTransport::enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush) {
135 // lock mutex
Aditya Agarwale528c762006-10-11 02:48:43 +0000136 pthread_mutex_lock(&mutex_);
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000137
138 // make sure that enqueue buffer is initialized and writer thread is running
139 if (buffer_ == 0) {
140 buffer_ = new eventInfo*[eventBufferSize_];
141 }
142 if (writerThreadId_ == 0) {
143 if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
144 T_ERROR("Error creating write thread");
145 return;
146 }
147 }
148
Aditya Agarwale528c762006-10-11 02:48:43 +0000149 // Can't enqueue while buffer is full
150 while(isFull_) {
151 pthread_cond_wait(&notFull_, &mutex_);
152 }
153
154 // make a copy and enqueue at tail of buffer
155 buffer_[tailPos_] = toEnqueue;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000156 tailPos_ = (tailPos_+1) % eventBufferSize_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000157
158 // mark the buffer as non-empty
159 isEmpty_ = false;
160
161 // circular buffer has wrapped around (and is full)
162 if(tailPos_ == headPos_) {
Aditya Agarwal35ae1c72006-10-26 03:31:34 +0000163 // T_DEBUG("queue is full");
Aditya Agarwale528c762006-10-11 02:48:43 +0000164 isFull_ = true;
165 }
166
167 // signal anybody who's waiting for the buffer to be non-empty
168 pthread_cond_signal(&notEmpty_);
169 if(blockUntilFlush) {
170 pthread_cond_wait(&flushed_, &mutex_);
171 }
172
173 // TODO: don't return until flushed to disk
174 // this really should be a loop where it makes sure it got flushed
175 // because condition variables can get triggered by the os for no reason
176 // it is probably a non-factor for the time being
177 pthread_mutex_unlock(&mutex_);
178
179}
180
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000181eventInfo* TFileTransport::dequeueEvent(long long deadline) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000182 //deadline time struc
183 struct timespec ts;
184 if(deadline) {
185 ts.tv_sec = deadline/(1000*1000);
186 ts.tv_nsec = (deadline%(1000*1000))*1000;
187 }
188
189 // wait for the queue to fill up
190 pthread_mutex_lock(&mutex_);
191 while(isEmpty_) {
192 // do a timed wait on the condition variable
193 if(deadline) {
194 int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
195 if(e == ETIMEDOUT) {
196 break;
197 }
198 }
199 else {
200 // just wait until the buffer gets an item
201 pthread_cond_wait(&notEmpty_, &mutex_);
202 }
203 }
204
205 string ret;
206 bool doSignal = false;
207
208 // could be empty if we timed out
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000209 eventInfo* retEvent = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +0000210 if(!isEmpty_) {
211 retEvent = buffer_[headPos_];
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000212 headPos_ = (headPos_+1) % eventBufferSize_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000213
214 isFull_ = false;
215 doSignal = true;
216
217 // check if this is the last item in the buffer
218 if(headPos_ == tailPos_) {
219 isEmpty_ = true;
220 }
221 }
222
223 // unlock the mutex and signal if required
224 pthread_mutex_unlock(&mutex_);
225 if(doSignal) {
226 pthread_cond_signal(&notFull_);
227 }
228
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000229 if (!retEvent) {
230 retEvent = new eventInfo();
231 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000232 return retEvent;
233}
234
235
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000236void TFileTransport::writerThread() {
237 // open file if it is not open
238 if(!fd_) {
239 openLogFile();
240 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000241
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000242 // set the offset to the correct value (EOF)
243 offset_ = lseek(fd_, 0, SEEK_END);
244
245 // Figure out the next time by which a flush must take place
246 long long nextFlush = getCurrentTime() + flushMaxUs_;
247 uint32_t unflushed = 0;
248
249 while(1) {
250 // this will only be true when the destructor is being invoked
251 if(closing_) {
252 if(-1 == ::close(fd_)) {
253 perror("TFileTransport: error in close");
254 }
255 throw TTransportException("error in file close");
256 fd_ = 0;
257 return;
258 }
259
260 //long long start = now();
261 eventInfo* outEvent = dequeueEvent(nextFlush);
262 if (!outEvent) {
263 T_DEBUG_L(1, "Got an empty event");
264 return;
265 }
266
267 // sanity check on event
268 if ( (maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
269 T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
270 delete(outEvent);
271 continue;
272 }
273 //long long diff = now()-start;
274 //T_DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
275
276 // If chunking is required, then make sure that msg does not cross chunk boundary
277 if( (outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
278
279 // event size must be less than chunk size
280 if(outEvent->eventSize_ > chunkSize_) {
281 T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
282 outEvent->eventSize_, chunkSize_);
283 delete(outEvent);
284 continue;
285 }
286
287 long long chunk1 = offset_/chunkSize_;
288 long long chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
289
290 // if adding this event will cross a chunk boundary, pad the chunk with zeros
291 if(chunk1 != chunk2) {
292 int32_t padding = (int32_t)(chunk2*chunkSize_ - offset_);
293
294 // sanity check
295 if (padding <= 0) {
296 T_DEBUG("Padding is empty, skipping event");
297 continue;
298 }
299 if (padding > (int32_t)chunkSize_) {
300 T_DEBUG("padding is larger than chunk size, skipping event");
301 continue;
302 }
303 // T_DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
304 uint8_t zeros[padding];
305 bzero(zeros, padding);
306 T_DEBUG_L(1, "Adding padding of %u bytes at %lu", padding, offset_);
307 if(-1 == ::write(fd_, zeros, padding)) {
308 perror("TFileTransport: error while padding zeros");
309 throw TTransportException("TFileTransport: error while padding zeros");
310 }
311 unflushed += padding;
312 offset_ += padding;
313 }
314 }
315
316 // write the dequeued event to the file
317 if(outEvent->eventSize_ > 0) {
318 if(-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
319 perror("TFileTransport: error while writing event");
320 // TODO: should this trigger an exception or simply continue?
321 throw TTransportException("TFileTransport: error while writing event");
322 }
323
324 unflushed += outEvent->eventSize_;
325 offset_ += outEvent->eventSize_;
326 }
327
328 // couple of cases from which a flush could be triggered
329 if((getCurrentTime() >= nextFlush && unflushed > 0) ||
330 unflushed > flushMaxBytes_ ||
331 (outEvent && (outEvent->eventSize_== 0)) ) {
332 //T_DEBUG("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
333
334 // sync (force flush) file to disk
335 fsync(fd_);
336 nextFlush = getCurrentTime() + flushMaxUs_;
337 unflushed = 0;
338
339 // notify anybody(thing?) waiting for flush completion
340 pthread_mutex_lock(&mutex_);
341 notFlushed_ = false;
342 pthread_mutex_unlock(&mutex_);
343 pthread_cond_broadcast(&flushed_);
344 }
345 // deallocate dequeued event
346 delete(outEvent);
347 }
348}
349
350void TFileTransport::flush() {
351 eventInfo* flushEvent = new eventInfo();
Aditya Agarwale528c762006-10-11 02:48:43 +0000352 notFlushed_ = true;
353
354 enqueueEvent(flushEvent, false);
355
356 // wait for flush to take place
357 pthread_mutex_lock(&mutex_);
358
359 while(notFlushed_) {
360 pthread_cond_wait(&flushed_, &mutex_);
361 }
362
363 pthread_mutex_unlock(&mutex_);
364}
365
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000366
367uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
368 uint32_t have = 0;
369 uint32_t get = 0;
370
371 while (have < len) {
372 get = read(buf+have, len-have);
373 if (get <= 0) {
374 throw TEOFException();
375 }
376 have += get;
377 }
378
379 return have;
380}
381
382uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
383 // check if there an event is ready to be read
384 if (!currentEvent_) {
385 readEvent();
386 }
387
388 // did not manage to read an event from the file. This could have happened
389 // if the timeout expired or there was some other error
390 if (!currentEvent_) {
391 return 0;
392 }
393
394 // read as much of the current event as possible
395 int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
396 if (remaining <= (int32_t)len) {
397 memcpy(buf,
398 currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
399 remaining);
400 delete(currentEvent_);
401 currentEvent_ = 0;
402 return remaining;
403 }
404
405 // read as much as possible
406 memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
407 currentEvent_->eventBuffPos_ += len;
408 return len;
409}
410
411bool TFileTransport::readEvent() {
412 int readTries = 0;
413
414 if (!readBuff_) {
415 readBuff_ = new uint8_t[readBuffSize_];
416 }
417
418 while (1) {
419 // check if there is anything in the read buffer
420 if (readState_.bufferPtr_ == readState_.bufferLen_) {
421 // advance the offset pointer
422 offset_ += readState_.bufferLen_;
423 readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
424 if (readState_.bufferLen_) {
425 T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
426 }
427 readState_.bufferPtr_ = 0;
428 readState_.lastDispatchPtr_ = 0;
429
430 // read error
431 if (readState_.bufferLen_ == -1) {
432 readState_.resetAllValues();
433 perror("TFileTransport: error while reading from file");
434 // TODO: should this trigger an exception or simply continue?
435 throw TTransportException("TFileTransport: error while reading from file");
436 } else if (readState_.bufferLen_ == 0) { // EOF
437 // wait indefinitely if there is no timeout
438 if (readTimeout_ == -1) {
439 usleep(eofSleepTime_);
440 continue;
441 } else if (readTimeout_ == 0) {
442 // reset state
443 readState_.resetState(0);
444 return false;
445 } else if (readTimeout_ > 0) {
446 // timeout already expired once
447 if (readTries > 0) {
448 readState_.resetState(0);
449 return false;
450 } else {
451 usleep(readTimeout_ * 1000);
452 readTries++;
453 continue;
454 }
455 }
456 }
457 }
458
459 readTries = 0;
460
461 // attempt to read an event from the buffer
462 while(readState_.bufferPtr_ < readState_.bufferLen_) {
463 if (readState_.readingSize_) {
464 if(readState_.eventSizeBuffPos_ == 0) {
465 if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
466 ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
467 // skip one byte towards chunk boundary
468 // T_DEBUG_L(1, "Skipping a byte");
469 readState_.bufferPtr_++;
470 continue;
471 }
472 }
473
474 readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
475 readBuff_[readState_.bufferPtr_++];
476 bool eventCorruption = false;
477 if (readState_.eventSizeBuffPos_ == 4) {
478 // 0 length event indicates padding
479 if (*((uint32_t *)(readState_.eventSizeBuff_)) == 0) {
480 T_DEBUG_L(1, "Got padding");
481 readState_.resetState(readState_.lastDispatchPtr_);
482 continue;
483 }
484 // got a valid event
485 readState_.readingSize_ = false;
486 if (readState_.event_) {
487 delete(readState_.event_);
488 }
489 readState_.event_ = new eventInfo();
490 readState_.event_->eventSize_ = *((uint32_t *)(readState_.eventSizeBuff_));
491
492 T_DEBUG_L(0, "Event size: %u", readState_.event_->eventSize_);
493
494 // TODO
495 // make sure event is valid, an error is triggered if:
496 // 1. Event size is larger than user-speficied max-event size
497
498 // 2. Event size is larger than chunk size
499
500 // 3. size indicates that event crosses chunk boundary
501
502 }
503
504 if (eventCorruption) {
505 // perform some kickass recovery
506 }
507 } else {
508 if (!readState_.event_->eventBuff_) {
509 readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
510 readState_.event_->eventBuffPos_ = 0;
511 }
512 // take either the entire event or the remaining bytes in the buffer
513 int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
514 readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);
515
516 // copy data from read buffer into event buffer
517 memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
518 readBuff_ + readState_.bufferPtr_,
519 reclaimBuffer);
520
521 // increment position ptrs
522 readState_.event_->eventBuffPos_ += reclaimBuffer;
523 readState_.bufferPtr_ += reclaimBuffer;
524
525 // if (reclaimBuffer > 0) {
526 // T_DEBUG_L(0, "eventBuffPost: %u", readState_.event_->eventBuffPos_);
527 // T_DEBUG_L(0, "eventSize: %u", readState_.event_->eventSize_);
528 // }
529
530 // check if the event has been read in full
531 if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
532 // set the completed event to the current event
533 currentEvent_ = readState_.event_;
534 currentEvent_->eventBuffPos_ = 0;
535
536 readState_.event_ = 0;
537 readState_.resetState(readState_.bufferPtr_);
538
539 // exit criteria
540 T_DEBUG_L(0, "Finished one event");
541 return true;
542 }
543 }
544 }
545
546
547 }
548}
549
550void TFileTransport::seekToChunk(int32_t chunk) {
551 if (fd_ <= 0) {
552 throw TTransportException("File not open");
553 }
554
555 int32_t lastChunk = getNumChunks();
556
557 // negative indicates reverse seek (from the end)
558 if (chunk < 0) {
559 chunk += lastChunk;
560 }
561
562 // cannot seek past EOF
563 if (chunk > lastChunk) {
564 T_DEBUG("Trying to seek past EOF. Seeking to EOF instead");
565 chunk = lastChunk;
566 }
567
568 uint32_t minEndOffset = 0;
569 if (chunk == lastChunk) {
570 minEndOffset = lseek(fd_, 0, SEEK_END);
571 }
572
573 offset_ = lseek(fd_, chunk * chunkSize_, SEEK_SET);
574 readState_.resetAllValues();
575 if (offset_ == -1) {
576 perror("TFileTransport: lseek error in seekToChunk");
577 // TODO: should this trigger an exception or simply continue?
578 throw TTransportException("TFileTransport: lseek error in seekToChunk");
579 }
580
581 // seek to EOF if user wanted to go to last chunk
582 uint32_t oldReadTimeout = getReadTimeout();
583 setReadTimeout(0);
584 if (chunk == lastChunk) {
585 // keep on reading unti the last event at point of seekChunk call
586 while( readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
587 }
588 setReadTimeout(oldReadTimeout);
589
590}
591
592void TFileTransport::seekToEnd() {
593 seekToChunk(getNumChunks());
594}
595
596uint32_t TFileTransport::getNumChunks() {
597 if (fd_ <= 0) {
598 return 0;
599 }
600 struct stat f_info;
601 fstat(fd_, &f_info);
602 return (f_info.st_size)/chunkSize_;
603}
604
605// Utility Functions
606void TFileTransport::openLogFile() {
Aditya Agarwale528c762006-10-11 02:48:43 +0000607 mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000608 fd_ = ::open(filename_.c_str(), O_RDWR | O_CREAT | O_APPEND, mode);
Aditya Agarwale528c762006-10-11 02:48:43 +0000609
610 // make sure open call was successful
611 if(fd_ == -1) {
612 char errorMsg[1024];
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000613 sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
Aditya Agarwale528c762006-10-11 02:48:43 +0000614 perror(errorMsg);
615 throw TTransportException(errorMsg);
616 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000617
618 // opening the file in append mode causes offset_t to be at the end
619 offset_ = lseek(fd_, 0, SEEK_CUR);
620 T_DEBUG_L(1, "initial offset: %lu", offset_);
Aditya Agarwale528c762006-10-11 02:48:43 +0000621}
622
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000623uint32_t TFileTransport::getCurrentTime() {
Aditya Agarwale528c762006-10-11 02:48:43 +0000624 long long ret;
625 struct timeval tv;
626 gettimeofday(&tv, NULL);
627 ret = tv.tv_sec;
628 ret = ret*1000*1000 + tv.tv_usec;
629 return ret;
630}
631
632
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000633TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
634 shared_ptr<TProtocolFactory> protocolFactory,
635 shared_ptr<TFileTransport> inputTransport):
636 processor_(processor), protocolFactory_(protocolFactory),
637 inputTransport_(inputTransport) {
Aditya Agarwale528c762006-10-11 02:48:43 +0000638
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000639 // default the output transport to a null transport (common case)
640 outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
641}
Aditya Agarwale528c762006-10-11 02:48:43 +0000642
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000643TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
644 shared_ptr<TProtocolFactory> protocolFactory,
645 shared_ptr<TFileTransport> inputTransport,
646 shared_ptr<TTransport> outputTransport):
647 processor_(processor), protocolFactory_(protocolFactory),
648 inputTransport_(inputTransport), outputTransport_(outputTransport) {
649};
650
651void TFileProcessor::process(uint32_t numEvents, bool tail) {
652 pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
653 iop = protocolFactory_->getIOProtocols(inputTransport_, outputTransport_);
654
655 // set the read timeout to 0 if tailing is required
656 int32_t oldReadTimeout = inputTransport_->getReadTimeout();
657 if (tail) {
658 // save old read timeout so it can be restored
659 inputTransport_->setReadTimeout(0);
660 }
661
662 uint32_t numProcessed = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +0000663 while(1) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000664 // bad form to use exceptions for flow control but there is really
665 // no other way around it
666 try {
667 processor_->process(iop.first, iop.second);
668 numProcessed++;
669 if ( (numEvents > 0) && (numProcessed == numEvents)) {
670 return;
Aditya Agarwale528c762006-10-11 02:48:43 +0000671 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000672 } catch (TEOFException& teof) {
673 if (!tail) {
674 break;
Aditya Agarwale528c762006-10-11 02:48:43 +0000675 }
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000676 } catch (TException te) {
677 cerr << te.what() << endl;
678 break;
Aditya Agarwale528c762006-10-11 02:48:43 +0000679 }
680 }
681
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000682 // restore old read timeout
683 if (tail) {
684 inputTransport_->setReadTimeout(oldReadTimeout);
685 }
686
Aditya Agarwale528c762006-10-11 02:48:43 +0000687}
688
689}}} // facebook::thrift::transport