Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 1 | #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ |
| 2 | #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 3 | |
| 4 | #include "TTransport.h" |
| 5 | #include "Thrift.h" |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 6 | #include "TProcessor.h" |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 7 | |
| 8 | #include <string> |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 9 | #include <stdio.h> |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 10 | |
| 11 | #include <boost/shared_ptr.hpp> |
| 12 | |
| 13 | namespace facebook { namespace thrift { namespace transport { |
| 14 | |
| 15 | using namespace boost; |
| 16 | using std::string; |
| 17 | |
/**
 * Data pertaining to a single event.
 *
 * The struct owns eventBuff_: the destructor releases it with delete[], so
 * the pointer must either be NULL or come from new[].
 */
typedef struct eventInfo {
  uint8_t* eventBuff_;     // owned buffer; NULL until one is assigned
  uint32_t eventSize_;     // presumably the event length in bytes -- confirm against the implementation
  uint32_t eventBuffPos_;  // presumably the current offset into eventBuff_ -- confirm against the implementation

  // Starts out empty: no buffer, zero size, zero position.
  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

  ~eventInfo() {
    // delete[] on a null pointer is a no-op, so no guard is required
    delete[] eventBuff_;
  }

 private:
  // Copying is disabled: a member-wise copy would leave two objects that
  // both delete[] the same buffer. Declared but intentionally not defined.
  eventInfo(const eventInfo&);
  eventInfo& operator=(const eventInfo&);
} eventInfo;
| 31 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 32 | // information about current read state |
| 33 | typedef struct readState { |
| 34 | eventInfo* event_; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 35 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 36 | // keep track of event size |
| 37 | uint8_t eventSizeBuff_[4]; |
| 38 | uint8_t eventSizeBuffPos_; |
| 39 | bool readingSize_; |
| 40 | |
| 41 | // read buffer variables |
| 42 | int32_t bufferPtr_; |
| 43 | int32_t bufferLen_; |
| 44 | |
| 45 | // last successful dispatch point |
| 46 | int32_t lastDispatchPtr_; |
| 47 | |
| 48 | void resetState(uint32_t lastDispatchPtr) { |
| 49 | readingSize_ = true; |
| 50 | eventSizeBuffPos_ = 0; |
| 51 | lastDispatchPtr_ = lastDispatchPtr; |
| 52 | } |
| 53 | |
| 54 | void resetAllValues() { |
| 55 | resetState(0); |
| 56 | bufferPtr_ = 0; |
| 57 | bufferLen_ = 0; |
| 58 | if (event_) { |
| 59 | delete(event_); |
| 60 | } |
| 61 | event_ = 0; |
| 62 | } |
| 63 | |
| 64 | readState() { |
| 65 | event_ = 0; |
| 66 | resetAllValues(); |
| 67 | } |
| 68 | |
| 69 | ~readState() { |
| 70 | if (event_) { |
| 71 | delete(event_); |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | } readState; |
| 76 | |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 77 | /** |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 78 | * File implementation of a transport. Reads and writes are done to a |
| 79 | * file on disk. |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 80 | * |
| 81 | * @author Aditya Agarwal <aditya@facebook.com> |
| 82 | */ |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 83 | class TFileTransport : public TTransport { |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 84 | public: |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 85 | TFileTransport(string path); |
| 86 | ~TFileTransport(); |
| 87 | |
| 88 | // TODO: what is the correct behaviour for this? |
| 89 | // the log file is generally always open |
| 90 | bool isOpen() { |
| 91 | return true; |
| 92 | } |
| 93 | |
| 94 | void write(const uint8_t* buf, uint32_t len) { |
| 95 | enqueueEvent(buf, len, false); |
| 96 | } |
| 97 | |
| 98 | void flush(); |
| 99 | |
| 100 | uint32_t readAll(uint8_t* buf, uint32_t len); |
| 101 | uint32_t read(uint8_t* buf, uint32_t len); |
| 102 | |
| 103 | // log-file specific functions |
| 104 | void seekToChunk(int chunk); |
| 105 | void seekToEnd(); |
| 106 | uint32_t getNumChunks(); |
| 107 | |
| 108 | // for changing the output file |
| 109 | void resetOutputFile(int fd, string filename, long long offset); |
| 110 | |
| 111 | // Setter/Getter functions for user-controllable options |
| 112 | void setReadBuffSize(uint32_t readBuffSize) { |
| 113 | if (readBuffSize) { |
| 114 | readBuffSize_ = readBuffSize; |
| 115 | } |
| 116 | } |
| 117 | uint32_t getReadBuffSize() { |
| 118 | return readBuffSize_; |
| 119 | } |
| 120 | |
| 121 | void setReadTimeout(int32_t readTimeout) { |
| 122 | readTimeout_ = readTimeout; |
| 123 | } |
| 124 | int32_t getReadTimeout() { |
| 125 | return readTimeout_; |
| 126 | } |
| 127 | |
| 128 | void setChunkSize(uint32_t chunkSize) { |
| 129 | if (chunkSize) { |
| 130 | chunkSize_ = chunkSize; |
| 131 | } |
| 132 | } |
| 133 | uint32_t getChunkSize() { |
| 134 | return chunkSize_; |
| 135 | } |
| 136 | |
| 137 | void setEventBufferSize(uint32_t bufferSize) { |
| 138 | if (bufferSize) { |
| 139 | if (buffer_) { |
| 140 | delete[] buffer_; |
| 141 | } |
| 142 | eventBufferSize_ = bufferSize; |
| 143 | buffer_ = new eventInfo*[eventBufferSize_]; |
| 144 | } |
| 145 | } |
| 146 | uint32_t getEventBufferSize() { |
| 147 | return eventBufferSize_; |
| 148 | } |
| 149 | |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 150 | void setFlushMaxUs(uint32_t flushMaxUs) { |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 151 | if (flushMaxUs) { |
| 152 | flushMaxUs_ = flushMaxUs; |
| 153 | } |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 154 | } |
| 155 | uint32_t getFlushMaxUs() { |
| 156 | return flushMaxUs_; |
| 157 | } |
| 158 | |
| 159 | void setFlushMaxBytes(uint32_t flushMaxBytes) { |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 160 | if (flushMaxBytes) { |
| 161 | flushMaxBytes_ = flushMaxBytes; |
| 162 | } |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 163 | } |
| 164 | uint32_t getFlushMaxBytes() { |
| 165 | return flushMaxBytes_; |
| 166 | } |
| 167 | |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 168 | void setMaxEventSize(uint32_t maxEventSize) { |
| 169 | maxEventSize_ = maxEventSize; |
| 170 | } |
| 171 | uint32_t getMaxEventSize() { |
| 172 | return maxEventSize_; |
| 173 | } |
| 174 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 175 | void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { |
| 176 | maxCorruptedEvents_ = maxCorruptedEvents; |
| 177 | } |
| 178 | uint32_t getMaxCorruptedEvents() { |
| 179 | return maxCorruptedEvents_; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 180 | } |
| 181 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 182 | void setEofSleepTimeUs(uint32_t eofSleepTime) { |
| 183 | if (eofSleepTime) { |
| 184 | eofSleepTime_ = eofSleepTime; |
| 185 | } |
| 186 | } |
| 187 | uint32_t getEofSleepTimeUs() { |
| 188 | return eofSleepTime_; |
| 189 | } |
| 190 | |
| 191 | private: |
| 192 | // helper functions for writing to a file |
| 193 | void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush); |
| 194 | void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush); |
| 195 | eventInfo* dequeueEvent(long long deadline); |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 196 | |
| 197 | // control for writer thread |
| 198 | static void* startWriterThread(void* ptr) { |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 199 | (((TFileTransport*)ptr)->writerThread()); |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 200 | return 0; |
| 201 | } |
| 202 | void writerThread(); |
| 203 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 204 | // helper functions for reading from a file |
| 205 | bool readEvent(); |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 206 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 207 | // Utility functions |
| 208 | void openLogFile(); |
| 209 | uint32_t getCurrentTime(); |
| 210 | |
| 211 | // Class variables |
| 212 | readState readState_; |
| 213 | uint8_t* readBuff_; |
| 214 | |
| 215 | eventInfo* currentEvent_; |
| 216 | |
| 217 | uint32_t readBuffSize_; |
| 218 | static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; |
| 219 | |
| 220 | int32_t readTimeout_; |
| 221 | static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 222 | |
| 223 | // size of chunks that file will be split up into |
| 224 | uint32_t chunkSize_; |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 225 | static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 226 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 227 | // size of string buffer |
| 228 | uint32_t eventBufferSize_; |
| 229 | static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024; |
| 230 | |
| 231 | // circular buffer to hold data in before it is flushed. This is an array of strings. Each |
| 232 | // element of the array stores a msg that needs to be written to the file |
| 233 | eventInfo** buffer_; |
| 234 | |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 235 | // max number of microseconds that can pass without flushing |
| 236 | uint32_t flushMaxUs_; |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 237 | static const uint32_t DEFAULT_FLUSH_MAX_US = 20000; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 238 | |
| 239 | // max number of bytes that can be written without flushing |
| 240 | uint32_t flushMaxBytes_; |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 241 | static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 242 | |
| 243 | // max event size |
| 244 | uint32_t maxEventSize_; |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 245 | static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; |
| 246 | |
| 247 | // max number of corrupted events per chunk |
| 248 | uint32_t maxCorruptedEvents_; |
| 249 | static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 250 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 251 | // sleep duration when EOF is hit |
| 252 | uint32_t eofSleepTime_; |
| 253 | static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; |
| 254 | |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 255 | // writer thread id |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 256 | pthread_t writerThreadId_; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 257 | |
| 258 | // variables that determine position of head/tail of circular buffer |
| 259 | int headPos_, tailPos_; |
| 260 | |
| 261 | // variables indicating whether the buffer is full or empty |
| 262 | bool isFull_, isEmpty_; |
| 263 | pthread_cond_t notFull_, notEmpty_; |
| 264 | bool closing_; |
| 265 | |
| 266 | // To keep track of whether the buffer has been flushed |
| 267 | pthread_cond_t flushed_; |
| 268 | bool notFlushed_; |
| 269 | |
| 270 | // Mutex that is grabbed when enqueueing, dequeueing and flushing |
| 271 | // from the circular buffer |
| 272 | pthread_mutex_t mutex_; |
| 273 | |
| 274 | // File information |
| 275 | string filename_; |
| 276 | int fd_; |
| 277 | |
| 278 | // Offset within the file |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 279 | off_t offset_; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 280 | |
| 281 | }; |
| 282 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 283 | // Exception thrown when EOF is hit |
| 284 | class TEOFException : public facebook::thrift::TTransportException { |
| 285 | public: |
| 286 | TEOFException(): |
| 287 | facebook::thrift::TTransportException(TTX_EOF) {}; |
| 288 | }; |
Aditya Agarwal | e528c76 | 2006-10-11 02:48:43 +0000 | [diff] [blame] | 289 | |
Aditya Agarwal | e9ef8d7 | 2006-12-08 23:52:57 +0000 | [diff] [blame] | 290 | |
| 291 | // wrapper class to process events from a file containing thrift events |
| 292 | class TFileProcessor { |
| 293 | public: |
| 294 | /** |
| 295 | * Constructor that defaults output transport to null transport |
| 296 | * |
| 297 | * @param processor processes log-file events |
| 298 | * @param protocolFactory protocol factory |
| 299 | * @param inputTransport file transport |
| 300 | */ |
| 301 | TFileProcessor(shared_ptr<TProcessor> processor, |
| 302 | shared_ptr<TProtocolFactory> protocolFactory, |
| 303 | shared_ptr<TFileTransport> inputTransport); |
| 304 | |
| 305 | /** |
| 306 | * Constructor |
| 307 | * |
| 308 | * @param processor processes log-file events |
| 309 | * @param protocolFactory protocol factory |
| 310 | * @param inputTransport input file transport |
| 311 | * @param output output transport |
| 312 | */ |
| 313 | TFileProcessor(shared_ptr<TProcessor> processor, |
| 314 | shared_ptr<TProtocolFactory> protocolFactory, |
| 315 | shared_ptr<TFileTransport> inputTransport, |
| 316 | shared_ptr<TTransport> outputTransport); |
| 317 | |
| 318 | /** |
| 319 | * processes events from the file |
| 320 | * |
| 321 | * @param numEvents number of events to process (0 for unlimited) |
| 322 | * @param tail tails the file if true |
| 323 | */ |
| 324 | void process(uint32_t numEvents, bool tail); |
| 325 | |
| 326 | private: |
| 327 | shared_ptr<TProcessor> processor_; |
| 328 | shared_ptr<TProtocolFactory> protocolFactory_; |
| 329 | shared_ptr<TFileTransport> inputTransport_; |
| 330 | shared_ptr<TTransport> outputTransport_; |
| 331 | }; |
| 332 | |
| 333 | |
| 334 | }}} // facebook::thrift::transport |
| 335 | |
| 336 | #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ |