blob: c327aabdaaf162c84eb1196f75ab39800aa56826 [file] [log] [blame]
Aditya Agarwale528c762006-10-11 02:48:43 +00001#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
2#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
3
4#include "TTransport.h"
5#include "Thrift.h"
6
7#include <string>
8
9#include <boost/shared_ptr.hpp>
10
11namespace facebook { namespace thrift { namespace transport {
12
13using namespace boost;
14using std::string;
15
/**
 * Data pertaining to a single event.
 *
 * A plain (pointer, length) pair. The struct declares no destructor and no
 * copy operations, so copies are shallow and the struct never releases
 * payLoad_ itself; freeing the bytes is left to the code that uses it.
 */
struct eventInfo {
  // Start of the event's data; NULL for a default-constructed event
  uint8_t* payLoad_;

  // Size of the event; 0 for a default-constructed event
  // NOTE(review): presumably the number of bytes at payLoad_ -- confirm
  // against the enqueue/dequeue code.
  uint32_t eventSize_;

  // Creates an empty event: no payload, zero size
  eventInfo() : payLoad_(NULL), eventSize_(0) {}
};
23
24
25/**
26 * Class that stores a circular in-memory event/message buffer and writes
27 * elements to disk when the buffer becomes full or a flush is triggered.
28 *
29 * @author Aditya Agarwal <aditya@facebook.com>
30 */
31class TBufferedFileWriter : public TTransport {
32 public:
33 void setFlushMaxUs(uint32_t flushMaxUs) {
34 flushMaxUs_ = flushMaxUs;
35 }
36 uint32_t getFlushMaxUs() {
37 return flushMaxUs_;
38 }
39
40 void setFlushMaxBytes(uint32_t flushMaxBytes) {
41 flushMaxBytes_ = flushMaxBytes;
42 }
43 uint32_t getFlushMaxBytes() {
44 return flushMaxBytes_;
45 }
46
47 void setChunkSize(uint32_t chunkSize) {
48 chunkSize_ = chunkSize;
49 }
50 uint32_t getChunkSize() {
51 return chunkSize_;
52 }
53
54 void setMaxEventSize(uint32_t maxEventSize) {
55 maxEventSize_ = maxEventSize;
56 }
57 uint32_t getMaxEventSize() {
58 return maxEventSize_;
59 }
60
61 TBufferedFileWriter(string filename, uint32_t sz);
62 TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
63 void init(string filename, uint32_t sz, int fd, long long offset);
64 ~TBufferedFileWriter();
65
66 void resetOutputFile(int fd, string filename, long long offset);
67
68 void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
69 void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);
70 void write(const uint8_t* buf, uint32_t len) {
71 enqueueEvent(buf, len, false);
72 }
73
74 eventInfo dequeueEvent(long long deadline);
75 void flush();
76
77 // control for writer thread
78 static void* startWriterThread(void* ptr) {
79 (((TBufferedFileWriter*)ptr)->writerThread());
80 return 0;
81 }
82 void writerThread();
83
84
85 private:
86 // circular buffer to hold data in before it is flushed. This is an array of strings. Each
87 // element of the array stores a msg that needs to be written to the file
88 eventInfo* buffer_;
89
90 // size of string buffer
91 uint32_t sz_;
92
93 // size of chunks that file will be split up into
94 uint32_t chunkSize_;
95
96 // max number of microseconds that can pass without flushing
97 uint32_t flushMaxUs_;
98
99 // max number of bytes that can be written without flushing
100 uint32_t flushMaxBytes_;
101
102 // max event size
103 uint32_t maxEventSize_;
104
105 // writer thread id
106 pthread_t writer_;
107
108 // variables that determine position of head/tail of circular buffer
109 int headPos_, tailPos_;
110
111 // variables indicating whether the buffer is full or empty
112 bool isFull_, isEmpty_;
113 pthread_cond_t notFull_, notEmpty_;
114 bool closing_;
115
116 // To keep track of whether the buffer has been flushed
117 pthread_cond_t flushed_;
118 bool notFlushed_;
119
120 // Mutex that is grabbed when enqueueing, dequeueing and flushing
121 // from the circular buffer
122 pthread_mutex_t mutex_;
123
124 // File information
125 string filename_;
126 int fd_;
127
128 // Offset within the file
129 long long offset_;
130
131 void openOutputFile();
132 uint32_t getCurrentTime();
133
134};
135
136}}}
137
138#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_