From: boz Date: Tue, 5 Jun 2007 22:41:18 +0000 (+0000) Subject: THRIFT: generic output handler X-Git-Tag: 0.2.0~1341 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=6ded775196d10f86ac5b99843f018aa2b29772b8;p=common%2Fthrift.git THRIFT: generic output handler Summary: I'm tired of getting output from thrift via perror AND exceptions, so this class allows the client to set an alternate (or empty) handler for error output Reviewed By: mcslee Test Plan: I ran on the worker with the default, got output via perror, then overloaded with my own function and got output via syslog and then NULL git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665131 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/Thrift.cpp b/lib/cpp/src/Thrift.cpp index 0f3ad881..186c5af4 100644 --- a/lib/cpp/src/Thrift.cpp +++ b/lib/cpp/src/Thrift.cpp @@ -9,6 +9,8 @@ namespace facebook { namespace thrift { +TOutput GlobalOutput; + uint32_t TApplicationException::read(facebook::thrift::protocol::TProtocol* iprot) { uint32_t xfer = 0; std::string fname; diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h index 2cb656b2..07ccb97e 100644 --- a/lib/cpp/src/Thrift.h +++ b/lib/cpp/src/Thrift.h @@ -26,6 +26,24 @@ namespace facebook { namespace thrift { +class TOutput{ +public: + TOutput() : f_(perror) {} + + inline void setOutputFunction(void (*function)(const char *)){ + f_ = function; + } + + inline void operator()(const char *message){ + f_(message); + } + +private: + void (*f_)(const char *); +}; + +extern TOutput GlobalOutput; + namespace protocol { class TProtocol; } diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 2b910a58..63378061 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -62,7 +62,7 @@ void TConnection::workSocket() { } readBuffer_ = (uint8_t*)realloc(readBuffer_, readBufferSize_); if (readBuffer_ == NULL) { - perror("TConnection::workSocket() realloc"); + GlobalOutput("TConnection::workSocket() realloc"); close(); return; } @@ -91,7 +91,7 @@ void TConnection::workSocket() { } if (errno != ECONNRESET) { - perror("TConnection::workSocket() recv -1"); + GlobalOutput("TConnection::workSocket() recv -1"); } } @@ -127,7 +127,7 @@ void TConnection::workSocket() { return; } if (errno != EPIPE) { - perror("TConnection::workSocket() send -1"); + GlobalOutput("TConnection::workSocket() send -1"); } close(); return; @@ -302,7 +302,7 @@ void TConnection::setFlags(short eventFlags) { // Delete a previously existing event if (eventFlags_ != 0) { if (event_del(&event_) == -1) { - perror("TConnection::setFlags event_del"); + GlobalOutput("TConnection::setFlags event_del"); return; } } @@ -341,7 +341,7 @@ void TConnection::setFlags(short eventFlags) { // Add the event if (event_add(&event_, 0) == -1) { - perror("TConnection::setFlags(): coult not event_add"); + GlobalOutput("TConnection::setFlags(): coult not event_add"); } } @@ -351,7 +351,7 @@ void TConnection::setFlags(short eventFlags) { void TConnection::close() { // Delete the registered libevent if (event_del(&event_) == -1) { - perror("TConnection::close() event_del"); + GlobalOutput("TConnection::close() event_del"); } // Close the socket @@ -415,7 +415,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { int flags; if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 || fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) { - perror("thriftServerEventHandler: set O_NONBLOCK"); + GlobalOutput("thriftServerEventHandler: set O_NONBLOCK"); close(clientSocket); return; } @@ -438,7 +438,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Done looping accept, now we have to make sure the error is due to // blocking. Any other error is a problem if (errno != EAGAIN && errno != EWOULDBLOCK) { - perror("thriftServerEventHandler: accept()"); + GlobalOutput("thriftServerEventHandler: accept()"); } } @@ -459,7 +459,7 @@ void TNonblockingServer::serve() { // Create the server socket serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); if (serverSocket_ == -1) { - perror("TNonblockingServer::serve() socket() -1"); + GlobalOutput("TNonblockingServer::serve() socket() -1"); return; } @@ -467,7 +467,7 @@ void TNonblockingServer::serve() { int flags; if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 || fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) { - perror("TNonblockingServer::serve() O_NONBLOCK"); + GlobalOutput("TNonblockingServer::serve() O_NONBLOCK"); ::close(serverSocket_); return; } @@ -496,13 +496,13 @@ void TNonblockingServer::serve() { addr.sin_addr.s_addr = INADDR_ANY; if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) { - perror("TNonblockingServer::serve() bind"); + GlobalOutput("TNonblockingServer::serve() bind"); close(serverSocket_); return; } if (listen(serverSocket_, LISTEN_BACKLOG) == -1) { - perror("TNonblockingServer::serve() listen"); + GlobalOutput("TNonblockingServer::serve() listen"); close(serverSocket_); return; } @@ -517,7 +517,7 @@ void TNonblockingServer::serve() { // Add the event and start up the server if (event_add(&serverEvent, 0) == -1) { - perror("TNonblockingServer::serve(): coult not event_add"); + GlobalOutput("TNonblockingServer::serve(): coult not event_add"); return; } diff --git a/lib/cpp/src/transport/TFileTransport.cpp b/lib/cpp/src/transport/TFileTransport.cpp index 5039564f..7a1ca4f7 100644 --- a/lib/cpp/src/transport/TFileTransport.cpp +++ b/lib/cpp/src/transport/TFileTransport.cpp @@ -100,7 +100,7 @@ void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) { flush(); fprintf(stderr, "error, current file (%s) not closed\n", filename_.c_str()); if (-1 == ::close(fd_)) { - perror("TFileTransport: error in file close"); + GlobalOutput("TFileTransport: error in file close"); throw TTransportException("TFileTransport: error in file close"); } } @@ -158,7 +158,7 @@ TFileTransport::~TFileTransport() { // close logfile if (fd_ > 0) { if(-1 == ::close(fd_)) { - perror("TFileTransport: error in file close"); + GlobalOutput("TFileTransport: error in file close"); } } } @@ -306,7 +306,7 @@ void TFileTransport::writerThread() { // empty out both the buffers if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { if (-1 == ::close(fd_)) { - perror("TFileTransport: error in close"); + GlobalOutput("TFileTransport: error in close"); throw TTransportException("TFileTransport: error in file close"); } // just be safe and sync to disk @@ -364,7 +364,7 @@ void TFileTransport::writerThread() { //T_DEBUG_L(1, "Adding padding of %u bytes at %lu (to reach chunk %lld)", //padding, offset_, chunk2); if (-1 == ::write(fd_, zeros, padding)) { - perror("TFileTransport: error while padding zeros"); + GlobalOutput("TFileTransport: error while padding zeros"); throw TTransportException("TFileTransport: error while padding zeros"); } unflushed += padding; @@ -375,7 +375,7 @@ void TFileTransport::writerThread() { // write the dequeued event to the file if (outEvent->eventSize_ > 0) { if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { - perror("TFileTransport: error while writing event"); + GlobalOutput("TFileTransport: error while writing event"); throw TTransportException("TFileTransport: error while writing event"); } @@ -499,7 +499,7 @@ bool TFileTransport::readEvent() { // read error if (readState_.bufferLen_ == -1) { readState_.resetAllValues(); - perror("TFileTransport: error while reading from file"); + GlobalOutput("TFileTransport: error while reading from file"); throw TTransportException("TFileTransport: error while reading from file"); } else if (readState_.bufferLen_ == 0) { // EOF // wait indefinitely if there is no timeout @@ -655,7 +655,7 @@ void TFileTransport::performRecovery() { char errorMsg[1024]; sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", offset_ + readState_.lastDispatchPtr_); - perror(errorMsg); + GlobalOutput(errorMsg); throw TTransportException(errorMsg); } } @@ -700,7 +700,7 @@ void TFileTransport::seekToChunk(int32_t chunk) { offset_ = lseek(fd_, newOffset, SEEK_SET); readState_.resetAllValues(); if (offset_ == -1) { - perror("TFileTransport: lseek error in seekToChunk"); + GlobalOutput("TFileTransport: lseek error in seekToChunk"); throw TTransportException("TFileTransport: lseek error in seekToChunk"); } @@ -747,7 +747,7 @@ void TFileTransport::openLogFile() { if(fd_ == -1) { char errorMsg[1024]; sprintf(errorMsg, "TFileTransport: Could not open file: %s", filename_.c_str()); - perror(errorMsg); + GlobalOutput(errorMsg); throw TTransportException(errorMsg); } @@ -786,7 +786,7 @@ TFileTransportBuffer::~TFileTransportBuffer() { bool TFileTransportBuffer::addEvent(eventInfo *event) { if (bufferMode_ == READ) { - perror("Trying to write to a buffer in read mode"); + GlobalOutput("Trying to write to a buffer in read mode"); } if (writePoint_ < size_) { buffer_[writePoint_++] = event; diff --git a/lib/cpp/src/transport/TFileTransport.h b/lib/cpp/src/transport/TFileTransport.h index 2cb50b49..abd8928a 100644 --- a/lib/cpp/src/transport/TFileTransport.h +++ b/lib/cpp/src/transport/TFileTransport.h @@ -207,7 +207,7 @@ class TFileTransport : public TFileReaderTransport, void setEventBufferSize(uint32_t bufferSize) { if (bufferAndThreadInitialized_) { - perror("Cannot change the buffer size after writer thread started"); + GlobalOutput("Cannot change the buffer size after writer thread started"); return; } eventBufferSize_ = bufferSize; diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp index f341a2bb..0d25bf02 100644 --- a/lib/cpp/src/transport/TServerSocket.cpp +++ b/lib/cpp/src/transport/TServerSocket.cpp @@ -64,7 +64,7 @@ void TServerSocket::setRetryDelay(int retryDelay) { void TServerSocket::listen() { int sv[2]; if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) { - perror("TServerSocket::init()"); + GlobalOutput("TServerSocket::init()"); intSock1_ = -1; intSock2_ = -1; } else { @@ -74,7 +74,7 @@ void TServerSocket::listen() { serverSocket_ = socket(AF_INET, SOCK_STREAM, 0); if (serverSocket_ == -1) { - perror("TServerSocket::listen() socket"); + GlobalOutput("TServerSocket::listen() socket"); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket."); } @@ -83,7 +83,7 @@ void TServerSocket::listen() { int one = 1; if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { - perror("TServerSocket::listen() SO_REUSEADDR"); + GlobalOutput("TServerSocket::listen() SO_REUSEADDR"); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR"); } @@ -92,7 +92,7 @@ void TServerSocket::listen() { #ifdef TCP_DEFER_ACCEPT if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT, &one, sizeof(one))) { - perror("TServerSocket::listen() TCP_DEFER_ACCEPT"); + GlobalOutput("TServerSocket::listen() TCP_DEFER_ACCEPT"); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT"); } @@ -103,7 +103,7 @@ void TServerSocket::listen() { if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) { close(); - perror("TServerSocket::listen() SO_LINGER"); + GlobalOutput("TServerSocket::listen() SO_LINGER"); throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER"); } @@ -111,7 +111,7 @@ void TServerSocket::listen() { if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { close(); - perror("setsockopt TCP_NODELAY"); + GlobalOutput("setsockopt TCP_NODELAY"); throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY"); } @@ -147,14 +147,14 @@ void TServerSocket::listen() { if (retries > retryLimit_) { char errbuf[1024]; sprintf(errbuf, "TServerSocket::listen() BIND %d", port_); - perror(errbuf); + GlobalOutput(errbuf); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not bind"); } // Call listen if (-1 == ::listen(serverSocket_, acceptBacklog_)) { - perror("TServerSocket::listen() LISTEN"); + GlobalOutput("TServerSocket::listen() LISTEN"); close(); throw TTransportException(TTransportException::NOT_OPEN, "Could not listen"); } @@ -187,14 +187,14 @@ shared_ptr TServerSocket::acceptImpl() { // a certain number continue; } - perror("TServerSocket::acceptImpl() select -1"); + GlobalOutput("TServerSocket::acceptImpl() select -1"); throw TTransportException(TTransportException::UNKNOWN); } else if (ret > 0) { // Check for an interrupt signal if (intSock2_ >= 0 && FD_ISSET(intSock2_, &fds)) { int8_t buf; if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) { - perror("TServerSocket::acceptImpl() interrupt receive"); + GlobalOutput("TServerSocket::acceptImpl() interrupt receive"); } throw TTransportException(TTransportException::INTERRUPTED); } @@ -203,7 +203,7 @@ shared_ptr TServerSocket::acceptImpl() { break; } } else { - perror("TServerSocket::acceptImpl() select 0"); + GlobalOutput("TServerSocket::acceptImpl() select 0"); throw TTransportException(TTransportException::UNKNOWN); } } @@ -215,18 +215,18 @@ shared_ptr TServerSocket::acceptImpl() { (socklen_t *) &size); if (clientSocket < 0) { - perror("TServerSocket::accept()"); + GlobalOutput("TServerSocket::accept()"); throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno); } // Make sure client socket is blocking int flags = fcntl(clientSocket, F_GETFL, 0); if (flags == -1) { - perror("TServerSocket::select() fcntl GETFL"); + GlobalOutput("TServerSocket::select() fcntl GETFL"); throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno); } if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) { - perror("TServerSocket::select() fcntl SETFL"); + GlobalOutput("TServerSocket::select() fcntl SETFL"); throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno); } @@ -245,7 +245,7 @@ void TServerSocket::interrupt() { if (intSock1_ >= 0) { int8_t byte = 0; if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) { - perror("TServerSocket::interrupt()"); + GlobalOutput("TServerSocket::interrupt()"); } } } diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index 0777807b..42364fa7 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -96,7 +96,7 @@ bool TSocket::peek() { uint8_t buf; int r = recv(socket_, &buf, 1, MSG_PEEK); if (r == -1) { - perror("TSocket::peek()"); + GlobalOutput("TSocket::peek()"); close(); throw TTransportException(TTransportException::UNKNOWN, "recv() ERROR:" + errno); } @@ -111,7 +111,7 @@ void TSocket::open() { // Create socket socket_ = socket(AF_INET, SOCK_STREAM, 0); if (socket_ == -1) { - perror("TSocket::open() socket"); + GlobalOutput("TSocket::open() socket"); close(); throw TTransportException(TTransportException::NOT_OPEN, "socket() ERROR:" + errno); } @@ -143,7 +143,7 @@ void TSocket::open() { struct hostent *host_entry = gethostbyname(host_.c_str()); if (host_entry == NULL) { - perror("TSocket: dns error: failed call to gethostbyname."); + GlobalOutput("TSocket: dns error: failed call to gethostbyname."); close(); throw TTransportException(TTransportException::NOT_OPEN, "gethostbyname() failed"); } @@ -181,7 +181,7 @@ void TSocket::open() { close(); char buff[1024]; sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_); - perror(buff); + GlobalOutput(buff); throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno); } @@ -198,22 +198,22 @@ void TSocket::open() { int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon); if (ret2 == -1) { close(); - perror("TSocket::open() getsockopt SO_ERROR"); + GlobalOutput("TSocket::open() getsockopt SO_ERROR"); throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno); } if (val == 0) { goto done; } close(); - perror("TSocket::open() SO_ERROR was set"); + GlobalOutput("TSocket::open() SO_ERROR was set"); throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno); } else if (ret == 0) { close(); - perror("TSocket::open() timeed out"); + GlobalOutput("TSocket::open() timeed out"); throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno); } else { close(); - perror("TSocket::open() select error"); + GlobalOutput("TSocket::open() select error"); throw TTransportException(TTransportException::NOT_OPEN, "open() ERROR: " + errno); } @@ -285,7 +285,7 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { } // Now it's not a try again case, but a real probblez - perror("TSocket::read()"); + GlobalOutput("TSocket::read()"); // If we disconnect with no linger time if (errno == ECONNRESET) { @@ -354,7 +354,7 @@ void TSocket::write(const uint8_t* buf, uint32_t len) { throw TTransportException(TTransportException::NOT_OPEN, "ENOTCONN"); } - perror("TSocket::write() send < 0"); + GlobalOutput("TSocket::write() send < 0"); throw TTransportException(TTransportException::UNKNOWN, "ERROR:" + errno); } @@ -384,7 +384,7 @@ void TSocket::setLinger(bool on, int linger) { struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_}; int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l)); if (ret == -1) { - perror("TSocket::setLinger()"); + GlobalOutput("TSocket::setLinger()"); } } @@ -398,7 +398,7 @@ void TSocket::setNoDelay(bool noDelay) { int v = noDelay_ ? 1 : 0; int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)); if (ret == -1) { - perror("TSocket::setNoDelay()"); + GlobalOutput("TSocket::setNoDelay()"); } } @@ -418,7 +418,7 @@ void TSocket::setRecvTimeout(int ms) { struct timeval r = recvTimeval_; int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r)); if (ret == -1) { - perror("TSocket::setRecvTimeout()"); + GlobalOutput("TSocket::setRecvTimeout()"); } } @@ -432,7 +432,7 @@ void TSocket::setSendTimeout(int ms) { (int)((sendTimeout_%1000)*1000)}; int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s)); if (ret == -1) { - perror("TSocket::setSendTimeout()"); + GlobalOutput("TSocket::setSendTimeout()"); } } diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp index 1450d1c9..1293e5b0 100644 --- a/lib/cpp/src/transport/TSocketPool.cpp +++ b/lib/cpp/src/transport/TSocketPool.cpp @@ -28,7 +28,7 @@ TSocketPool::TSocketPool(const vector &hosts, alwaysTryLast_(true) { if (hosts.size() != ports.size()) { - perror("TSocketPool::TSocketPool: hosts.size != ports.size"); + GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size"); throw TTransportException(TTransportException::BAD_ARGS); } @@ -94,7 +94,7 @@ void TSocketPool::open() { } } - perror("TSocketPool::open: all connections failed"); + GlobalOutput("TSocketPool::open: all connections failed"); throw TTransportException(TTransportException::NOT_OPEN); }