blob: 819233282520bf915f235cace1b52472ede55f96 [file] [log] [blame]
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00001#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
2#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
Aditya Agarwale528c762006-10-11 02:48:43 +00003
4#include "TTransport.h"
5#include "Thrift.h"
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00006#include "TProcessor.h"
Aditya Agarwale528c762006-10-11 02:48:43 +00007
8#include <string>
Aditya Agarwale9ef8d72006-12-08 23:52:57 +00009#include <stdio.h>
Aditya Agarwale528c762006-10-11 02:48:43 +000010
11#include <boost/shared_ptr.hpp>
12
13namespace facebook { namespace thrift { namespace transport {
14
15using namespace boost;
16using std::string;
17
/**
 * Data pertaining to a single event.
 *
 * Owns the heap buffer pointed to by eventBuff_: the destructor releases it
 * with delete[], so anything stored there must come from new uint8_t[...].
 *
 * Copying is disabled. The implicitly generated copy constructor and copy
 * assignment would copy the raw eventBuff_ pointer, and both objects would
 * later delete[] the same buffer (double free).
 */
struct eventInfo {
  uint8_t* eventBuff_;     // event payload; owned, released with delete[]
  uint32_t eventSize_;     // size of the event (presumably bytes in eventBuff_)
  uint32_t eventBuffPos_;  // position within eventBuff_ (presumably a cursor)

  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

  ~eventInfo() {
    // delete[] on a NULL pointer is a no-op, so no explicit check is needed.
    delete[] eventBuff_;
  }

 private:
  // Non-copyable (C++03 idiom): declared but intentionally never defined.
  eventInfo(const eventInfo&);
  eventInfo& operator=(const eventInfo&);
};
31
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000032// information about current read state
33typedef struct readState {
34 eventInfo* event_;
Aditya Agarwale528c762006-10-11 02:48:43 +000035
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000036 // keep track of event size
37 uint8_t eventSizeBuff_[4];
38 uint8_t eventSizeBuffPos_;
39 bool readingSize_;
40
41 // read buffer variables
42 int32_t bufferPtr_;
43 int32_t bufferLen_;
44
45 // last successful dispatch point
46 int32_t lastDispatchPtr_;
47
48 void resetState(uint32_t lastDispatchPtr) {
49 readingSize_ = true;
50 eventSizeBuffPos_ = 0;
51 lastDispatchPtr_ = lastDispatchPtr;
52 }
53
54 void resetAllValues() {
55 resetState(0);
56 bufferPtr_ = 0;
57 bufferLen_ = 0;
58 if (event_) {
59 delete(event_);
60 }
61 event_ = 0;
62 }
63
64 readState() {
65 event_ = 0;
66 resetAllValues();
67 }
68
69 ~readState() {
70 if (event_) {
71 delete(event_);
72 }
73 }
74
75} readState;
76
Aditya Agarwale528c762006-10-11 02:48:43 +000077/**
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000078 * File implementation of a transport. Reads and writes are done to a
79 * file on disk.
Aditya Agarwale528c762006-10-11 02:48:43 +000080 *
81 * @author Aditya Agarwal <aditya@facebook.com>
82 */
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000083class TFileTransport : public TTransport {
Aditya Agarwale528c762006-10-11 02:48:43 +000084 public:
Aditya Agarwale9ef8d72006-12-08 23:52:57 +000085 TFileTransport(string path);
86 ~TFileTransport();
87
88 // TODO: what is the correct behaviour for this?
89 // the log file is generally always open
90 bool isOpen() {
91 return true;
92 }
93
94 void write(const uint8_t* buf, uint32_t len) {
95 enqueueEvent(buf, len, false);
96 }
97
98 void flush();
99
100 uint32_t readAll(uint8_t* buf, uint32_t len);
101 uint32_t read(uint8_t* buf, uint32_t len);
102
103 // log-file specific functions
104 void seekToChunk(int chunk);
105 void seekToEnd();
106 uint32_t getNumChunks();
107
108 // for changing the output file
109 void resetOutputFile(int fd, string filename, long long offset);
110
111 // Setter/Getter functions for user-controllable options
112 void setReadBuffSize(uint32_t readBuffSize) {
113 if (readBuffSize) {
114 readBuffSize_ = readBuffSize;
115 }
116 }
117 uint32_t getReadBuffSize() {
118 return readBuffSize_;
119 }
120
121 void setReadTimeout(int32_t readTimeout) {
122 readTimeout_ = readTimeout;
123 }
124 int32_t getReadTimeout() {
125 return readTimeout_;
126 }
127
128 void setChunkSize(uint32_t chunkSize) {
129 if (chunkSize) {
130 chunkSize_ = chunkSize;
131 }
132 }
133 uint32_t getChunkSize() {
134 return chunkSize_;
135 }
136
137 void setEventBufferSize(uint32_t bufferSize) {
138 if (bufferSize) {
139 if (buffer_) {
140 delete[] buffer_;
141 }
142 eventBufferSize_ = bufferSize;
143 buffer_ = new eventInfo*[eventBufferSize_];
144 }
145 }
146 uint32_t getEventBufferSize() {
147 return eventBufferSize_;
148 }
149
Aditya Agarwale528c762006-10-11 02:48:43 +0000150 void setFlushMaxUs(uint32_t flushMaxUs) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000151 if (flushMaxUs) {
152 flushMaxUs_ = flushMaxUs;
153 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000154 }
155 uint32_t getFlushMaxUs() {
156 return flushMaxUs_;
157 }
158
159 void setFlushMaxBytes(uint32_t flushMaxBytes) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000160 if (flushMaxBytes) {
161 flushMaxBytes_ = flushMaxBytes;
162 }
Aditya Agarwale528c762006-10-11 02:48:43 +0000163 }
164 uint32_t getFlushMaxBytes() {
165 return flushMaxBytes_;
166 }
167
Aditya Agarwale528c762006-10-11 02:48:43 +0000168 void setMaxEventSize(uint32_t maxEventSize) {
169 maxEventSize_ = maxEventSize;
170 }
171 uint32_t getMaxEventSize() {
172 return maxEventSize_;
173 }
174
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000175 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
176 maxCorruptedEvents_ = maxCorruptedEvents;
177 }
178 uint32_t getMaxCorruptedEvents() {
179 return maxCorruptedEvents_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000180 }
181
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000182 void setEofSleepTimeUs(uint32_t eofSleepTime) {
183 if (eofSleepTime) {
184 eofSleepTime_ = eofSleepTime;
185 }
186 }
187 uint32_t getEofSleepTimeUs() {
188 return eofSleepTime_;
189 }
190
191 private:
192 // helper functions for writing to a file
193 void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
194 void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
195 eventInfo* dequeueEvent(long long deadline);
Aditya Agarwale528c762006-10-11 02:48:43 +0000196
197 // control for writer thread
198 static void* startWriterThread(void* ptr) {
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000199 (((TFileTransport*)ptr)->writerThread());
Aditya Agarwale528c762006-10-11 02:48:43 +0000200 return 0;
201 }
202 void writerThread();
203
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000204 // helper functions for reading from a file
205 bool readEvent();
Aditya Agarwale528c762006-10-11 02:48:43 +0000206
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000207 // Utility functions
208 void openLogFile();
209 uint32_t getCurrentTime();
210
211 // Class variables
212 readState readState_;
213 uint8_t* readBuff_;
214
215 eventInfo* currentEvent_;
216
217 uint32_t readBuffSize_;
218 static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
219
220 int32_t readTimeout_;
221 static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
Aditya Agarwale528c762006-10-11 02:48:43 +0000222
223 // size of chunks that file will be split up into
224 uint32_t chunkSize_;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000225 static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
Aditya Agarwale528c762006-10-11 02:48:43 +0000226
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000227 // size of string buffer
228 uint32_t eventBufferSize_;
229 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
230
231 // circular buffer to hold data in before it is flushed. This is an array of strings. Each
232 // element of the array stores a msg that needs to be written to the file
233 eventInfo** buffer_;
234
Aditya Agarwale528c762006-10-11 02:48:43 +0000235 // max number of microseconds that can pass without flushing
236 uint32_t flushMaxUs_;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000237 static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
Aditya Agarwale528c762006-10-11 02:48:43 +0000238
239 // max number of bytes that can be written without flushing
240 uint32_t flushMaxBytes_;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000241 static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
Aditya Agarwale528c762006-10-11 02:48:43 +0000242
243 // max event size
244 uint32_t maxEventSize_;
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000245 static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
246
247 // max number of corrupted events per chunk
248 uint32_t maxCorruptedEvents_;
249 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
Aditya Agarwale528c762006-10-11 02:48:43 +0000250
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000251 // sleep duration when EOF is hit
252 uint32_t eofSleepTime_;
253 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
254
Aditya Agarwale528c762006-10-11 02:48:43 +0000255 // writer thread id
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000256 pthread_t writerThreadId_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000257
258 // variables that determine position of head/tail of circular buffer
259 int headPos_, tailPos_;
260
261 // variables indicating whether the buffer is full or empty
262 bool isFull_, isEmpty_;
263 pthread_cond_t notFull_, notEmpty_;
264 bool closing_;
265
266 // To keep track of whether the buffer has been flushed
267 pthread_cond_t flushed_;
268 bool notFlushed_;
269
270 // Mutex that is grabbed when enqueueing, dequeueing and flushing
271 // from the circular buffer
272 pthread_mutex_t mutex_;
273
274 // File information
275 string filename_;
276 int fd_;
277
278 // Offset within the file
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000279 off_t offset_;
Aditya Agarwale528c762006-10-11 02:48:43 +0000280
281};
282
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000283// Exception thrown when EOF is hit
284class TEOFException : public facebook::thrift::TTransportException {
285 public:
286 TEOFException():
287 facebook::thrift::TTransportException(TTX_EOF) {};
288};
Aditya Agarwale528c762006-10-11 02:48:43 +0000289
Aditya Agarwale9ef8d72006-12-08 23:52:57 +0000290
291// wrapper class to process events from a file containing thrift events
292class TFileProcessor {
293 public:
294 /**
295 * Constructor that defaults output transport to null transport
296 *
297 * @param processor processes log-file events
298 * @param protocolFactory protocol factory
299 * @param inputTransport file transport
300 */
301 TFileProcessor(shared_ptr<TProcessor> processor,
302 shared_ptr<TProtocolFactory> protocolFactory,
303 shared_ptr<TFileTransport> inputTransport);
304
305 /**
306 * Constructor
307 *
308 * @param processor processes log-file events
309 * @param protocolFactory protocol factory
310 * @param inputTransport input file transport
311 * @param output output transport
312 */
313 TFileProcessor(shared_ptr<TProcessor> processor,
314 shared_ptr<TProtocolFactory> protocolFactory,
315 shared_ptr<TFileTransport> inputTransport,
316 shared_ptr<TTransport> outputTransport);
317
318 /**
319 * processes events from the file
320 *
321 * @param numEvents number of events to process (0 for unlimited)
322 * @param tail tails the file if true
323 */
324 void process(uint32_t numEvents, bool tail);
325
326 private:
327 shared_ptr<TProcessor> processor_;
328 shared_ptr<TProtocolFactory> protocolFactory_;
329 shared_ptr<TFileTransport> inputTransport_;
330 shared_ptr<TTransport> outputTransport_;
331};
332
333
334}}} // facebook::thrift::transport
335
336#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_