namespace facebook { namespace thrift {
+TOutput GlobalOutput;
+
uint32_t TApplicationException::read(facebook::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
namespace facebook { namespace thrift {
+class TOutput{
+public:
+ TOutput() : f_(perror) {}
+
+ inline void setOutputFunction(void (*function)(const char *)){
+ f_ = function;
+ }
+
+ inline void operator()(const char *message){
+ f_(message);
+ }
+
+private:
+ void (*f_)(const char *);
+};
+
+extern TOutput GlobalOutput;
+
namespace protocol {
class TProtocol;
}
}
readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_);
if (readBuffer_ == NULL) {
- perror("TConnection::workSocket() realloc");
+ GlobalOutput("TConnection::workSocket() realloc");
close();
return;
}
}
if (errno != ECONNRESET) {
- perror("TConnection::workSocket() recv -1");
+ GlobalOutput("TConnection::workSocket() recv -1");
}
}
return;
}
if (errno != EPIPE) {
- perror("TConnection::workSocket() send -1");
+ GlobalOutput("TConnection::workSocket() send -1");
}
close();
return;
// Delete a previously existing event
if (eventFlags_ != 0) {
if (event_del(&event_) == -1) {
- perror("TConnection::setFlags event_del");
+ GlobalOutput("TConnection::setFlags event_del");
return;
}
}
// Add the event
if (event_add(&event_, 0) == -1) {
- perror("TConnection::setFlags(): coult not event_add");
+ GlobalOutput("TConnection::setFlags(): coult not event_add");
}
}
void TConnection::close() {
// Delete the registered libevent
if (event_del(&event_) == -1) {
- perror("TConnection::close() event_del");
+ GlobalOutput("TConnection::close() event_del");
}
// Close the socket
int flags;
if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
- perror("thriftServerEventHandler: set O_NONBLOCK");
+ GlobalOutput("thriftServerEventHandler: set O_NONBLOCK");
close(clientSocket);
return;
}
// Done looping accept, now we have to make sure the error is due to
// blocking. Any other error is a problem
if (errno != EAGAIN && errno != EWOULDBLOCK) {
- perror("thriftServerEventHandler: accept()");
+ GlobalOutput("thriftServerEventHandler: accept()");
}
}
// Create the server socket
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
- perror("TNonblockingServer::serve() socket() -1");
+ GlobalOutput("TNonblockingServer::serve() socket() -1");
return;
}
int flags;
if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
- perror("TNonblockingServer::serve() O_NONBLOCK");
+ GlobalOutput("TNonblockingServer::serve() O_NONBLOCK");
::close(serverSocket_);
return;
}
addr.sin_addr.s_addr = INADDR_ANY;
if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
- perror("TNonblockingServer::serve() bind");
+ GlobalOutput("TNonblockingServer::serve() bind");
close(serverSocket_);
return;
}
if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
- perror("TNonblockingServer::serve() listen");
+ GlobalOutput("TNonblockingServer::serve() listen");
close(serverSocket_);
return;
}
// Add the event and start up the server
if (event_add(&serverEvent, 0) == -1) {
- perror("TNonblockingServer::serve(): coult not event_add");
+ GlobalOutput("TNonblockingServer::serve(): coult not event_add");
return;
}
flush();
fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str());
if (-1 == ::close(fd_)) {
- perror("TFileTransport: error in file close");
+ GlobalOutput("TFileTransport: error in file close");
throw TTransportException("TFileTransport: error in file close");
}
}
// close logfile
if (fd_ > 0) {
if(-1 == ::close(fd_)) {
- perror("TFileTransport: error in file close");
+ GlobalOutput("TFileTransport: error in file close");
}
}
}
// empty out both the buffers
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
if (-1 == ::close(fd_)) {
- perror("TFileTransport: error in close");
+ GlobalOutput("TFileTransport: error in close");
throw TTransportException("TFileTransport: error in file close");
}
// just be safe and sync to disk
//T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)",
//padding, offset_, chunk2);
if (-1 == ::write(fd_, zeros, padding)) {
- perror("TFileTransport: error while padding zeros");
+ GlobalOutput("TFileTransport: error while padding zeros");
throw TTransportException("TFileTransport: error while padding zeros");
}
unflushed += padding;
// write the dequeued event to the file
if (outEvent->eventSize_ > 0) {
if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
- perror("TFileTransport: error while writing event");
+ GlobalOutput("TFileTransport: error while writing event");
throw TTransportException("TFileTransport: error while writing event");
}
// read error
if (readState_.bufferLen_ == -1) {
readState_.resetAllValues();
- perror("TFileTransport: error while reading from file");
+ GlobalOutput("TFileTransport: error while reading from file");
throw TTransportException("TFileTransport: error while reading from file");
} else if (readState_.bufferLen_ == 0) { // EOF
// wait indefinitely if there is no timeout
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
offset_ + readState_.lastDispatchPtr_);
- perror(errorMsg);
+ GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
}
offset_ = lseek(fd_, newOffset, SEEK_SET);
readState_.resetAllValues();
if (offset_ == -1) {
- perror("TFileTransport: lseek error in seekToChunk");
+ GlobalOutput("TFileTransport: lseek error in seekToChunk");
throw TTransportException("TFileTransport: lseek error in seekToChunk");
}
if(fd_ == -1) {
char errorMsg[1024];
sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str());
- perror(errorMsg);
+ GlobalOutput(errorMsg);
throw TTransportException(errorMsg);
}
bool TFileTransportBuffer::addEvent(eventInfo *event) {
if (bufferMode_ == READ) {
- perror("Trying to write to a buffer in read mode");
+ GlobalOutput("Trying to write to a buffer in read mode");
}
if (writePoint_ < size_) {
buffer_[writePoint_++] = event;
void setEventBufferSize(uint32_t bufferSize) {
if (bufferAndThreadInitialized_) {
- perror("Cannot change the buffer size after writer thread started");
+ GlobalOutput("Cannot change the buffer size after writer thread started");
return;
}
eventBufferSize_ = bufferSize;
void TServerSocket::listen() {
int sv[2];
if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
- perror("TServerSocket::init()");
+ GlobalOutput("TServerSocket::init()");
intSock1_ = -1;
intSock2_ = -1;
} else {
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
- perror("TServerSocket::listen() socket");
+ GlobalOutput("TServerSocket::listen() socket");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.");
}
int one = 1;
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
&one, sizeof(one))) {
- perror("TServerSocket::listen() SO_REUSEADDR");
+ GlobalOutput("TServerSocket::listen() SO_REUSEADDR");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR");
}
#ifdef TCP_DEFER_ACCEPT
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
&one, sizeof(one))) {
- perror("TServerSocket::listen() TCP_DEFER_ACCEPT");
+ GlobalOutput("TServerSocket::listen() TCP_DEFER_ACCEPT");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT");
}
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
&ling, sizeof(ling))) {
close();
- perror("TServerSocket::listen() SO_LINGER");
+ GlobalOutput("TServerSocket::listen() SO_LINGER");
throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER");
}
if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
&one, sizeof(one))) {
close();
- perror("setsockopt TCP_NODELAY");
+ GlobalOutput("setsockopt TCP_NODELAY");
throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY");
}
if (retries > retryLimit_) {
char errbuf[1024];
sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
- perror(errbuf);
+ GlobalOutput(errbuf);
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
}
// Call listen
if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
- perror("TServerSocket::listen() LISTEN");
+ GlobalOutput("TServerSocket::listen() LISTEN");
close();
throw TTransportException(TTransportException::NOT_OPEN, "Could not listen");
}
// a certain number
continue;
}
- perror("TServerSocket::acceptImpl() select -1");
+ GlobalOutput("TServerSocket::acceptImpl() select -1");
throw TTransportException(TTransportException::UNKNOWN);
} else if (ret > 0) {
// Check for an interrupt signal
if (intSock2_ >= 0 && FD_ISSET(intSock2_, &fds)) {
int8_t buf;
if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
- perror("TServerSocket::acceptImpl() interrupt receive");
+ GlobalOutput("TServerSocket::acceptImpl() interrupt receive");
}
throw TTransportException(TTransportException::INTERRUPTED);
}
break;
}
} else {
- perror("TServerSocket::acceptImpl() select 0");
+ GlobalOutput("TServerSocket::acceptImpl() select 0");
throw TTransportException(TTransportException::UNKNOWN);
}
}
(socklen_t *) &size);
if (clientSocket < 0) {
- perror("TServerSocket::accept()");
+ GlobalOutput("TServerSocket::accept()");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
// Make sure client socket is blocking
int flags = fcntl(clientSocket, F_GETFL, 0);
if (flags == -1) {
- perror("TServerSocket::select() fcntl GETFL");
+ GlobalOutput("TServerSocket::select() fcntl GETFL");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
- perror("TServerSocket::select() fcntl SETFL");
+ GlobalOutput("TServerSocket::select() fcntl SETFL");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
if (intSock1_ >= 0) {
int8_t byte = 0;
if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
- perror("TServerSocket::interrupt()");
+ GlobalOutput("TServerSocket::interrupt()");
}
}
}
uint8_t buf;
int r = recv(socket_, &buf, 1, MSG_PEEK);
if (r == -1) {
- perror("TSocket::peek()");
+ GlobalOutput("TSocket::peek()");
close();
throw TTransportException(TTransportException::UNKNOWN, "recv() ERROR:" + errno);
}
// Create socket
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == -1) {
- perror("TSocket::open() socket");
+ GlobalOutput("TSocket::open() socket");
close();
throw TTransportException(TTransportException::NOT_OPEN, "socket() ERROR:" + errno);
}
struct hostent *host_entry = gethostbyname(host_.c_str());
if (host_entry == NULL) {
- perror("TSocket: dns error: failed call to gethostbyname.");
+ GlobalOutput("TSocket: dns error: failed call to gethostbyname.");
close();
throw TTransportException(TTransportException::NOT_OPEN, "gethostbyname() failed");
}
close();
char buff[1024];
sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
- perror(buff);
+ GlobalOutput(buff);
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
if (ret2 == -1) {
close();
- perror("TSocket::open() getsockopt SO_ERROR");
+ GlobalOutput("TSocket::open() getsockopt SO_ERROR");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
if (val == 0) {
goto done;
}
close();
- perror("TSocket::open() SO_ERROR was set");
+ GlobalOutput("TSocket::open() SO_ERROR was set");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
} else if (ret == 0) {
close();
- perror("TSocket::open() timeed out");
+ GlobalOutput("TSocket::open() timeed out");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
} else {
close();
- perror("TSocket::open() select error");
+ GlobalOutput("TSocket::open() select error");
throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno);
}
}
// Now it's not a try again case, but a real probblez
- perror("TSocket::read()");
+ GlobalOutput("TSocket::read()");
// If we disconnect with no linger time
if (errno == ECONNRESET) {
throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN");
}
- perror("TSocket::write() send < 0");
+ GlobalOutput("TSocket::write() send < 0");
throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno);
}
struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
if (ret == -1) {
- perror("TSocket::setLinger()");
+ GlobalOutput("TSocket::setLinger()");
}
}
int v = noDelay_ ? 1 : 0;
int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
if (ret == -1) {
- perror("TSocket::setNoDelay()");
+ GlobalOutput("TSocket::setNoDelay()");
}
}
struct timeval r = recvTimeval_;
int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
if (ret == -1) {
- perror("TSocket::setRecvTimeout()");
+ GlobalOutput("TSocket::setRecvTimeout()");
}
}
(int)((sendTimeout_%1000)*1000)};
int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
if (ret == -1) {
- perror("TSocket::setSendTimeout()");
+ GlobalOutput("TSocket::setSendTimeout()");
}
}
alwaysTryLast_(true)
{
if (hosts.size() != ports.size()) {
- perror("TSocketPool::TSocketPool: hosts.size != ports.size");
+ GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
throw TTransportException(TTransportException::BAD_ARGS);
}
}
}
- perror("TSocketPool::open: all connections failed");
+ GlobalOutput("TSocketPool::open: all connections failed");
throw TTransportException(TTransportException::NOT_OPEN);
}