blob: 819233282520bf915f235cace1b52472ede55f96 [file] [log] [blame]
#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
#include "TTransport.h"
#include "Thrift.h"
#include "TProcessor.h"
#include <string>
#include <stdio.h>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
using namespace boost;
using std::string;
// Data pertaining to a single event.
//
// eventInfo owns eventBuff_: the destructor releases it with delete[], so the
// buffer must be allocated with new[] (or left NULL). Copying is disabled
// because a member-wise copy would share eventBuff_ and free it twice.
struct eventInfo {
  uint8_t* eventBuff_;     // event payload (owned; new[]-allocated or NULL)
  uint32_t eventSize_;     // presumably the payload length in bytes
  uint32_t eventBuffPos_;  // presumably the current offset into eventBuff_

  eventInfo() : eventBuff_(NULL), eventSize_(0), eventBuffPos_(0) {}

  ~eventInfo() {
    // delete[] on a null pointer is a no-op, so no check is needed.
    delete[] eventBuff_;
  }

 private:
  // Non-copyable: declared but intentionally not defined (pre-C++11 idiom,
  // matching the rest of this header).
  eventInfo(const eventInfo&);
  eventInfo& operator=(const eventInfo&);
};
// information about current read state
typedef struct readState {
eventInfo* event_;
// keep track of event size
uint8_t eventSizeBuff_[4];
uint8_t eventSizeBuffPos_;
bool readingSize_;
// read buffer variables
int32_t bufferPtr_;
int32_t bufferLen_;
// last successful dispatch point
int32_t lastDispatchPtr_;
void resetState(uint32_t lastDispatchPtr) {
readingSize_ = true;
eventSizeBuffPos_ = 0;
lastDispatchPtr_ = lastDispatchPtr;
}
void resetAllValues() {
resetState(0);
bufferPtr_ = 0;
bufferLen_ = 0;
if (event_) {
delete(event_);
}
event_ = 0;
}
readState() {
event_ = 0;
resetAllValues();
}
~readState() {
if (event_) {
delete(event_);
}
}
} readState;
/**
* File implementation of a transport. Reads and writes are done to a
* file on disk.
*
* @author Aditya Agarwal <aditya@facebook.com>
*/
class TFileTransport : public TTransport {
public:
TFileTransport(string path);
~TFileTransport();
// TODO: what is the correct behaviour for this?
// the log file is generally always open
bool isOpen() {
return true;
}
void write(const uint8_t* buf, uint32_t len) {
enqueueEvent(buf, len, false);
}
void flush();
uint32_t readAll(uint8_t* buf, uint32_t len);
uint32_t read(uint8_t* buf, uint32_t len);
// log-file specific functions
void seekToChunk(int chunk);
void seekToEnd();
uint32_t getNumChunks();
// for changing the output file
void resetOutputFile(int fd, string filename, long long offset);
// Setter/Getter functions for user-controllable options
void setReadBuffSize(uint32_t readBuffSize) {
if (readBuffSize) {
readBuffSize_ = readBuffSize;
}
}
uint32_t getReadBuffSize() {
return readBuffSize_;
}
void setReadTimeout(int32_t readTimeout) {
readTimeout_ = readTimeout;
}
int32_t getReadTimeout() {
return readTimeout_;
}
void setChunkSize(uint32_t chunkSize) {
if (chunkSize) {
chunkSize_ = chunkSize;
}
}
uint32_t getChunkSize() {
return chunkSize_;
}
void setEventBufferSize(uint32_t bufferSize) {
if (bufferSize) {
if (buffer_) {
delete[] buffer_;
}
eventBufferSize_ = bufferSize;
buffer_ = new eventInfo*[eventBufferSize_];
}
}
uint32_t getEventBufferSize() {
return eventBufferSize_;
}
void setFlushMaxUs(uint32_t flushMaxUs) {
if (flushMaxUs) {
flushMaxUs_ = flushMaxUs;
}
}
uint32_t getFlushMaxUs() {
return flushMaxUs_;
}
void setFlushMaxBytes(uint32_t flushMaxBytes) {
if (flushMaxBytes) {
flushMaxBytes_ = flushMaxBytes;
}
}
uint32_t getFlushMaxBytes() {
return flushMaxBytes_;
}
void setMaxEventSize(uint32_t maxEventSize) {
maxEventSize_ = maxEventSize;
}
uint32_t getMaxEventSize() {
return maxEventSize_;
}
void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
maxCorruptedEvents_ = maxCorruptedEvents;
}
uint32_t getMaxCorruptedEvents() {
return maxCorruptedEvents_;
}
void setEofSleepTimeUs(uint32_t eofSleepTime) {
if (eofSleepTime) {
eofSleepTime_ = eofSleepTime;
}
}
uint32_t getEofSleepTimeUs() {
return eofSleepTime_;
}
private:
// helper functions for writing to a file
void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
void enqueueEvent(eventInfo* toEnqueue, bool blockUntilFlush);
eventInfo* dequeueEvent(long long deadline);
// control for writer thread
static void* startWriterThread(void* ptr) {
(((TFileTransport*)ptr)->writerThread());
return 0;
}
void writerThread();
// helper functions for reading from a file
bool readEvent();
// Utility functions
void openLogFile();
uint32_t getCurrentTime();
// Class variables
readState readState_;
uint8_t* readBuff_;
eventInfo* currentEvent_;
uint32_t readBuffSize_;
static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
int32_t readTimeout_;
static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
// size of chunks that file will be split up into
uint32_t chunkSize_;
static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
// size of string buffer
uint32_t eventBufferSize_;
static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 1024;
// circular buffer to hold data in before it is flushed. This is an array of strings. Each
// element of the array stores a msg that needs to be written to the file
eventInfo** buffer_;
// max number of microseconds that can pass without flushing
uint32_t flushMaxUs_;
static const uint32_t DEFAULT_FLUSH_MAX_US = 20000;
// max number of bytes that can be written without flushing
uint32_t flushMaxBytes_;
static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
// max event size
uint32_t maxEventSize_;
static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
// max number of corrupted events per chunk
uint32_t maxCorruptedEvents_;
static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
// sleep duration when EOF is hit
uint32_t eofSleepTime_;
static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
// writer thread id
pthread_t writerThreadId_;
// variables that determine position of head/tail of circular buffer
int headPos_, tailPos_;
// variables indicating whether the buffer is full or empty
bool isFull_, isEmpty_;
pthread_cond_t notFull_, notEmpty_;
bool closing_;
// To keep track of whether the buffer has been flushed
pthread_cond_t flushed_;
bool notFlushed_;
// Mutex that is grabbed when enqueueing, dequeueing and flushing
// from the circular buffer
pthread_mutex_t mutex_;
// File information
string filename_;
int fd_;
// Offset within the file
off_t offset_;
};
/**
 * Exception thrown when EOF is hit.
 *
 * A TTransportException that is always constructed with the TTX_EOF type.
 */
class TEOFException : public facebook::thrift::TTransportException {
 public:
  TEOFException()
    : facebook::thrift::TTransportException(TTX_EOF) {
  }
};
// Wrapper class that processes events from a file containing thrift events.
class TFileProcessor {
 public:
  /**
   * Constructor that defaults output transport to null transport
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport file transport
   */
  TFileProcessor(shared_ptr<TProcessor> processor,
                 shared_ptr<TProtocolFactory> protocolFactory,
                 shared_ptr<TFileTransport> inputTransport);

  /**
   * Constructor
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport input file transport
   * @param outputTransport output transport
   */
  TFileProcessor(shared_ptr<TProcessor> processor,
                 shared_ptr<TProtocolFactory> protocolFactory,
                 shared_ptr<TFileTransport> inputTransport,
                 shared_ptr<TTransport> outputTransport);

  /**
   * processes events from the file
   *
   * @param numEvents number of events to process (0 for unlimited)
   * @param tail tails the file if true
   */
  void process(uint32_t numEvents, bool tail);

 private:
  shared_ptr<TProcessor> processor_;
  shared_ptr<TProtocolFactory> protocolFactory_;
  shared_ptr<TFileTransport> inputTransport_;
  shared_ptr<TTransport> outputTransport_;
};
}}} // facebook::thrift::transport
#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_