From 105961de04cf064981b70e8ec8458c196e0ee17e Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 6 Oct 2010 17:10:17 +0000 Subject: [PATCH] THRIFT-928. cpp: TNonblockingServer: use TSocket and support TClientInfo Modify TNonblockingServer to use TSocket for I/O and support server event handlers; this enables TClientInfo to function with a minor change to the processing loop. git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005145 13f79535-47bb-0310-9956-ffa450edef68 --- lib/cpp/src/server/TNonblockingServer.cpp | 111 ++++++++++++---------- lib/cpp/src/server/TNonblockingServer.h | 46 +++++++-- lib/cpp/src/transport/TSocket.cpp | 94 ++++++++++++------ lib/cpp/src/transport/TSocket.h | 24 ++++- 4 files changed, 184 insertions(+), 91 deletions(-) diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index 4245d5e9..62970308 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -52,12 +52,18 @@ class TConnection::Task: public Runnable { processor_(processor), input_(input), output_(output), - connection_(connection) {} + connection_(connection), + serverEventHandler_(connection_->getServerEventHandler()), + connectionContext_(connection_->getConnectionContext()) {} void run() { try { - while (processor_->process(input_, output_, NULL)) { - if (!input_->getTransport()->peek()) { + for (;;) { + if (serverEventHandler_ != NULL) { + serverEventHandler_->processContext(connectionContext_, connection_->getTSocket()); + } + if (!processor_->process(input_, output_, connectionContext_) || + !input_->getTransport()->peek()) { break; } } @@ -87,10 +93,15 @@ class TConnection::Task: public Runnable { boost::shared_ptr input_; boost::shared_ptr output_; TConnection* connection_; + boost::shared_ptr serverEventHandler_; + void* connectionContext_; }; -void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { - socket_ = socket; +void TConnection::init(int socket, short eventFlags, TNonblockingServer* s, + const sockaddr* addr, socklen_t addrLen) { + tSocket_->setSocketFD(socket); + tSocket_->setCachedAddress(addr, addrLen); + server_ = s; appState_ = APP_INIT; eventFlags_ = 0; @@ -115,10 +126,18 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) { // Create protocol inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_); outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); + + // Set up for any server event handler + serverEventHandler_ = server_->getEventHandler(); + if (serverEventHandler_ != NULL) { + connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_); + } else { + connectionContext_ = NULL; + } } void TConnection::workSocket() { - int flags=0, got=0, left=0, sent=0; + int got=0, left=0, sent=0; uint32_t fetch = 0; switch (socketState_) { @@ -142,10 +161,18 @@ void TConnection::workSocket() { readBufferSize_ = newSize; } - // Read from the socket - fetch = readWant_ - readBufferPos_; - got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0); + try { + // Read from the socket + fetch = readWant_ - readBufferPos_; + got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); + } + catch (TTransportException& te) { + GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); + close(); + return; + } + if (got > 0) { // Move along in the buffer readBufferPos_ += got; @@ -158,15 +185,6 @@ void TConnection::workSocket() { transition(); } return; - } else if (got == -1) { - // Blocking errors are okay, just move on - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return; - } - - if (errno != ECONNRESET) { - GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno); - } } // Whenever we get down here it means a remote disconnect @@ -185,24 +203,12 @@ void TConnection::workSocket() { return; } - flags = 0; - #ifdef MSG_NOSIGNAL - // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we - // check for the EPIPE return condition and close the socket in that case - flags |= MSG_NOSIGNAL; - #endif // ifdef MSG_NOSIGNAL - - left = writeBufferSize_ - writeBufferPos_; - sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags); - - if (sent <= 0) { - // Blocking errors are okay, just move on - if (errno == EAGAIN || errno == EWOULDBLOCK) { - return; - } - if (errno != EPIPE) { - GlobalOutput.perror("TConnection::workSocket() send -1 ", errno); - } + try { + left = writeBufferSize_ - writeBufferPos_; + sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left); + } + catch (TTransportException& te) { + GlobalOutput.printf("TConnection::workSocket(): %s ", te.what()); close(); return; } @@ -478,7 +484,8 @@ void TConnection::setFlags(short eventFlags) { * ev structure for multiple monitored descriptors; each descriptor needs * its own ev. */ - event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this); + event_set(&event_, tSocket_->getSocketFD(), eventFlags_, + TConnection::eventHandler, this); event_base_set(server_->getEventBase(), &event_); // Add the event @@ -493,14 +500,15 @@ void TConnection::setFlags(short eventFlags) { void TConnection::close() { // Delete the registered libevent if (event_del(&event_) == -1) { - GlobalOutput("TConnection::close() event_del"); + GlobalOutput.perror("TConnection::close() event_del", errno); } - // Close the socket - if (socket_ >= 0) { - ::close(socket_); + if (serverEventHandler_ != NULL) { + serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_); } - socket_ = -1; + + // Close the socket + tSocket_->close(); // close any factory produced transports factoryInputTransport_->close(); @@ -548,14 +556,16 @@ TNonblockingServer::~TNonblockingServer() { * Creates a new connection either by reusing an object off the stack or * by allocating a new one entirely */ -TConnection* TNonblockingServer::createConnection(int socket, short flags) { +TConnection* TNonblockingServer::createConnection(int socket, short flags, + const sockaddr* addr, + socklen_t addrLen) { // Check the stack if (connectionStack_.empty()) { - return new TConnection(socket, flags, this); + return new TConnection(socket, flags, this, addr, addrLen); } else { TConnection* result = connectionStack_.top(); connectionStack_.pop(); - result->init(socket, flags, this); + result->init(socket, flags, this, addr, addrLen); return result; } } @@ -583,8 +593,9 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Server socket accepted a new connection socklen_t addrLen; - struct sockaddr addr; - addrLen = sizeof(addr); + sockaddr_storage addrStorage; + sockaddr* addrp = (sockaddr*)&addrStorage; + addrLen = sizeof(addrStorage); // Going to accept a new client socket int clientSocket; @@ -592,7 +603,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Accept as many new clients as possible, even though libevent signaled only // one, this helps us to avoid having to go back into the libevent engine so // many times - while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) { + while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) { // If we're overloaded, take action here if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) { nConnectionsDropped_++; @@ -619,7 +630,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { // Create a new TConnection for this client socket. TConnection* clientConnection = - createConnection(clientSocket, EV_READ | EV_PERSIST); + createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen); // Fail fast if we could not create a TConnection object if (clientConnection == NULL) { @@ -632,7 +643,7 @@ void TNonblockingServer::handleEvent(int fd, short which) { clientConnection->transition(); // addrLen is written by the accept() call, so needs to be set before the next call. - addrLen = sizeof(addr); + addrLen = sizeof(addrStorage); } // Done looping accept, now we have to make sure the error is due to diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 2dd53629..ac0e3450 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,7 @@ namespace apache { namespace thrift { namespace server { using apache::thrift::transport::TMemoryBuffer; +using apache::thrift::transport::TSocket; using apache::thrift::protocol::TProtocol; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; @@ -470,9 +472,12 @@ class TNonblockingServer : public TServer { * * @param socket FD of socket associated with this connection. * @param flags initial lib_event flags for this connection. + * @param addr the sockaddr of the client + * @param addrLen the length of addr * @return pointer to initialized TConnection object. */ - TConnection* createConnection(int socket, short flags); + TConnection* createConnection(int socket, short flags, + const sockaddr* addr, socklen_t addrLen); /** * Returns a connection to pool or deletion. If the connection pool @@ -576,7 +581,7 @@ enum TAppState { * Represents a connection that is handled via libevent. This connection * essentially encapsulates a socket that has some associated libevent state. */ -class TConnection { + class TConnection { private: /// Starting size for new connection buffer @@ -585,8 +590,8 @@ class TConnection { /// Server handle TNonblockingServer* server_; - /// Socket handle - int socket_; + /// Object wrapping network socket + boost::shared_ptr tSocket_; /// Libevent object struct event event_; @@ -649,6 +654,12 @@ class TConnection { /// Protocol encoder boost::shared_ptr outputProtocol_; + /// Server event handler, if any + boost::shared_ptr serverEventHandler_; + + /// Thrift call context, if any + void *connectionContext_; + /// Go into read mode void setRead() { setFlags(EV_READ | EV_PERSIST); @@ -687,7 +698,8 @@ class TConnection { class Task; /// Constructor - TConnection(int socket, short eventFlags, TNonblockingServer *s) { + TConnection(int socket, short eventFlags, TNonblockingServer *s, + const sockaddr* addr, socklen_t addrLen) { readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE); if (readBuffer_ == NULL) { throw new apache::thrift::TException("Out of memory."); @@ -702,8 +714,9 @@ class TConnection { // reallocated on init() call) inputTransport_ = boost::shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); outputTransport_ = boost::shared_ptr(new TMemoryBuffer()); + tSocket_.reset(new TSocket()); - init(socket, eventFlags, s); + init(socket, eventFlags, s, addr, addrLen); server_->incrementNumConnections(); } @@ -720,7 +733,8 @@ class TConnection { void checkIdleBufferMemLimit(size_t limit); /// Initialize - void init(int socket, short eventFlags, TNonblockingServer *s); + void init(int socket, short eventFlags, TNonblockingServer *s, + const sockaddr* addr, socklen_t addrLen); /** * This is called when the application transitions from one state into @@ -738,7 +752,7 @@ class TConnection { * @param v void* callback arg where we placed TConnection's "this". */ static void eventHandler(int fd, short /* which */, void* v) { - assert(fd == ((TConnection*)v)->socket_); + assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD()); ((TConnection*)v)->workSocket(); } @@ -799,6 +813,22 @@ class TConnection { TAppState getState() { return appState_; } + + /// return the TSocket transport wrapping this network connection + boost::shared_ptr getTSocket() const { + return tSocket_; + } + + /// return the server event handler if any + boost::shared_ptr getServerEventHandler() { + return serverEventHandler_; + } + + /// return the Thrift connection context if any + void* getConnectionContext() { + return connectionContext_; + } + }; }}} // apache::thrift::server diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp index ee76c3fe..7a48505a 100644 --- a/lib/cpp/src/transport/TSocket.cpp +++ b/lib/cpp/src/transport/TSocket.cpp @@ -357,6 +357,13 @@ void TSocket::close() { socket_ = -1; } +void TSocket::setSocketFD(int socket) { + if (socket_ >= 0) { + close(); + } + socket_ = socket; +} + uint32_t TSocket::read(uint8_t* buf, uint32_t len) { if (socket_ < 0) { throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket"); @@ -379,7 +386,13 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { try_again: // Read from the socket struct timeval begin; - gettimeofday(&begin, NULL); + if (recvTimeout_ > 0) { + gettimeofday(&begin, NULL); + } else { + // if there is no read timeout we don't need the TOD to determine whether + // an EAGAIN is due to a timeout or an out-of-resource condition. + begin.tv_sec = begin.tv_usec = 0; + } int got = recv(socket_, buf, len, 0); int errno_copy = errno; //gettimeofday can change errno ++g_socket_syscalls; @@ -387,6 +400,11 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { // Check for error on read if (got < 0) { if (errno_copy == EAGAIN) { + // if no timeout we can assume that resource exhaustion has occurred. + if (recvTimeout_ == 0) { + throw TTransportException(TTransportException::TIMED_OUT, + "EAGAIN (unavailable resources)"); + } // check if this is the lack of resources or timeout case struct timeval end; gettimeofday(&end, NULL); @@ -417,8 +435,8 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { if (errno_copy == ECONNRESET) { /* shigin: freebsd doesn't follow POSIX semantic of recv and fails with * ECONNRESET if peer performed shutdown + * edhall: eliminated close() since we do that in the destructor. */ - close(); return 0; } #endif @@ -447,7 +465,8 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { // The remote host has closed the socket if (got == 0) { - close(); + // edhall: we used to call close() here, but our caller may want to deal + // with the socket fd and we'll close() in our destructor in any case. return 0; } @@ -456,43 +475,57 @@ uint32_t TSocket::read(uint8_t* buf, uint32_t len) { } void TSocket::write(const uint8_t* buf, uint32_t len) { + uint32_t sent = 0; + + while (sent < len) { + uint32_t b = write_partial(buf + sent, len - sent); + if (b == 0) { + // We assume that we got 0 because send() errored with EAGAIN due to + // lack of system resources; release the CPU for a bit. + usleep(50); + } + sent += b; + } +} + +uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) { if (socket_ < 0) { throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket"); } uint32_t sent = 0; - while (sent < len) { + int flags = 0; +#ifdef MSG_NOSIGNAL + // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we + // check for the EPIPE return condition and close the socket in that case + flags |= MSG_NOSIGNAL; +#endif // ifdef MSG_NOSIGNAL - int flags = 0; - #ifdef MSG_NOSIGNAL - // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we - // check for the EPIPE return condition and close the socket in that case - flags |= MSG_NOSIGNAL; - #endif // ifdef MSG_NOSIGNAL - - int b = send(socket_, buf + sent, len - sent, flags); - ++g_socket_syscalls; + int b = send(socket_, buf + sent, len - sent, flags); + ++g_socket_syscalls; + if (b < 0) { + if (errno == EWOULDBLOCK || errno == EAGAIN) { + return 0; + } // Fail on a send error - if (b < 0) { - int errno_copy = errno; - GlobalOutput.perror("TSocket::write() send() " + getSocketInfo(), errno_copy); - - if (errno == EPIPE || errno == ECONNRESET || errno == ENOTCONN) { - close(); - throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy); - } + int errno_copy = errno; + GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy); + if (errno_copy == EPIPE || errno_copy == ECONNRESET || errno_copy == ENOTCONN) { + close(); + throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy); } - // Fail on blocked send - if (b == 0) { - throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0."); - } - sent += b; + throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy); + } + + // Fail on blocked send + if (b == 0) { + throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0."); } + return b; } std::string TSocket::getHost() { @@ -598,7 +631,12 @@ void TSocket::setMaxRecvRetries(int maxRecvRetries) { string TSocket::getSocketInfo() { std::ostringstream oss; - oss << ""; + if (host_.empty() || port_ == 0) { + oss << ""; + } else { + oss << ""; + } return oss.str(); } diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h index 97562c24..e89059f3 100644 --- a/lib/cpp/src/transport/TSocket.h +++ b/lib/cpp/src/transport/TSocket.h @@ -95,10 +95,15 @@ class TSocket : public TVirtualTransport { uint32_t read(uint8_t* buf, uint32_t len); /** - * Writes to the underlying socket. + * Writes to the underlying socket. Loops until done or fail. */ void write(const uint8_t* buf, uint32_t len); + /** + * Writes to the underlying socket. Does single send() and returns result. + */ + uint32_t write_partial(const uint8_t* buf, uint32_t len); + /** * Get the host that the socket is connected to * @@ -191,6 +196,15 @@ class TSocket : public TVirtualTransport { return socket_; } + /** + * (Re-)initialize a TSocket for the supplied descriptor. This is only + * intended for use by TNonblockingServer -- other use may result in + * unfortunate surprises. + * + * @param fd the descriptor for an already-connected socket + */ + void setSocketFD(int fd); + /* * Returns a cached copy of the peer address. */ @@ -211,16 +225,16 @@ class TSocket : public TVirtualTransport { */ TSocket(int socket); - protected: - /** connect, called by open */ - void openConnection(struct addrinfo *res); - /** * Set a cache of the peer address (used when trivially available: e.g. * accept() or connect()). Only caches IPV4 and IPV6; unset for others. */ void setCachedAddress(const sockaddr* addr, socklen_t len); + protected: + /** connect, called by open */ + void openConnection(struct addrinfo *res); + /** Host to connect to */ std::string host_; -- 2.17.1