From: Bryan Duxbury Date: Mon, 29 Aug 2011 20:28:23 +0000 (+0000) Subject: THRIFT-1305. cpp: make TConnection a private inner class of X-Git-Tag: 0.8.0~110 X-Git-Url: https://source.supwisdom.com/gerrit/gitweb?a=commitdiff_plain;h=526fa8e47f55bb74c87ff8a801204bbb91d50ba1;p=common%2Fthrift.git THRIFT-1305. cpp: make TConnection a private inner class of TNonblockingServer Patch: Adam Simpkins git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1162987 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp index f43a1c9c..ed1001ff 100644 --- a/lib/cpp/src/server/TNonblockingServer.cpp +++ b/lib/cpp/src/server/TNonblockingServer.cpp @@ -54,7 +54,277 @@ using namespace std; using apache::thrift::transport::TSocket; using apache::thrift::transport::TTransportException; -class TConnection::Task: public Runnable { +/// Three states for sockets: recv frame size, recv data, and send mode +enum TSocketState { + SOCKET_RECV_FRAMING, + SOCKET_RECV, + SOCKET_SEND +}; + +/** + * Five states for the nonblocking server: + * 1) initialize + * 2) read 4 byte frame size + * 3) read frame of data + * 4) send back data (if any) + * 5) force immediate connection close + */ +enum TAppState { + APP_INIT, + APP_READ_FRAME_SIZE, + APP_READ_REQUEST, + APP_WAIT_TASK, + APP_SEND_RESULT, + APP_CLOSE_CONNECTION +}; + +/** + * Represents a connection that is handled via libevent. This connection + * essentially encapsulates a socket that has some associated libevent state. + */ +class TNonblockingServer::TConnection { + private: + + /// Server handle + TNonblockingServer* server_; + + /// Object wrapping network socket + boost::shared_ptr tSocket_; + + /// Libevent object + struct event event_; + + /// Libevent flags + short eventFlags_; + + /// Socket mode + TSocketState socketState_; + + /// Application state + TAppState appState_; + + /// How much data needed to read + uint32_t readWant_; + + /// Where in the read buffer are we + uint32_t readBufferPos_; + + /// Read buffer + uint8_t* readBuffer_; + + /// Read buffer size + uint32_t readBufferSize_; + + /// Write buffer + uint8_t* writeBuffer_; + + /// Write buffer size + uint32_t writeBufferSize_; + + /// How far through writing are we? + uint32_t writeBufferPos_; + + /// Largest size of write buffer seen since buffer was constructed + size_t largestWriteBufferSize_; + + /// Count of the number of calls for use with getResizeBufferEveryN(). + int32_t callsForResize_; + + /// Task handle + int taskHandle_; + + /// Task event + struct event taskEvent_; + + /// Transport to read from + boost::shared_ptr inputTransport_; + + /// Transport that processor writes to + boost::shared_ptr outputTransport_; + + /// extra transport generated by transport factory (e.g. BufferedRouterTransport) + boost::shared_ptr factoryInputTransport_; + boost::shared_ptr factoryOutputTransport_; + + /// Protocol decoder + boost::shared_ptr inputProtocol_; + + /// Protocol encoder + boost::shared_ptr outputProtocol_; + + /// Server event handler, if any + boost::shared_ptr serverEventHandler_; + + /// Thrift call context, if any + void *connectionContext_; + + /// Go into read mode + void setRead() { + setFlags(EV_READ | EV_PERSIST); + } + + /// Go into write mode + void setWrite() { + setFlags(EV_WRITE | EV_PERSIST); + } + + /// Set socket idle + void setIdle() { + setFlags(0); + } + + /** + * Set event flags for this connection. + * + * @param eventFlags flags we pass to libevent for the connection. + */ + void setFlags(short eventFlags); + + /** + * Libevent handler called (via our static wrapper) when the connection + * socket had something happen. Rather than use the flags libevent passed, + * we use the connection state to determine whether we need to read or + * write the socket. + */ + void workSocket(); + + /// Close this connection and free or reset its resources. + void close(); + + public: + + class Task; + + /// Constructor + TConnection(int socket, short eventFlags, TNonblockingServer *s, + const sockaddr* addr, socklen_t addrLen) { + readBuffer_ = NULL; + readBufferSize_ = 0; + + // Allocate input and output transports + // these only need to be allocated once per TConnection (they don't need to be + // reallocated on init() call) + inputTransport_ = boost::shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); + outputTransport_ = boost::shared_ptr(new TMemoryBuffer(s->getWriteBufferDefaultSize())); + tSocket_.reset(new TSocket()); + + init(socket, eventFlags, s, addr, addrLen); + server_->incrementNumConnections(); + } + + ~TConnection() { + std::free(readBuffer_); + server_->decrementNumConnections(); + } + + /** + * Check buffers against any size limits and shrink it if exceeded. + * + * @param readLimit we reduce read buffer size to this (if nonzero). + * @param writeLimit if nonzero and write buffer is larger, replace it. + */ + void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); + + /// Initialize + void init(int socket, short eventFlags, TNonblockingServer *s, + const sockaddr* addr, socklen_t addrLen); + + /** + * This is called when the application transitions from one state into + * another. This means that it has finished writing the data that it needed + * to, or finished receiving the data that it needed to. + */ + void transition(); + + /** + * C-callable event handler for connection events. Provides a callback + * that libevent can understand which invokes connection_->workSocket(). + * + * @param fd the descriptor the event occurred on. + * @param which the flags associated with the event. + * @param v void* callback arg where we placed TConnection's "this". + */ + static void eventHandler(int fd, short /* which */, void* v) { + assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD()); + ((TConnection*)v)->workSocket(); + } + + /** + * C-callable event handler for signaling task completion. Provides a + * callback that libevent can understand that will read a connection + * object's address from a pipe and call connection->transition() for + * that object. + * + * @param fd the descriptor the event occurred on. + */ + static void taskHandler(int fd, short /* which */, void* /* v */) { + TConnection* connection; + ssize_t nBytes; + while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*))) + == sizeof(TConnection*)) { + connection->transition(); + } + if (nBytes > 0) { + throw TException("TConnection::taskHandler unexpected partial read"); + } + if (errno != EWOULDBLOCK && errno != EAGAIN) { + GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno); + } + } + + /** + * Notification to server that processing has ended on this request. + * Can be called either when processing is completed or when a waiting + * task has been preemptively terminated (on overload). + * + * @return true if successful, false if unable to notify (check errno). + */ + bool notifyServer() { + TConnection* connection = this; + if (write(server_->getNotificationSendFD(), (const void*)&connection, + sizeof(TConnection*)) != sizeof(TConnection*)) { + return false; + } + + return true; + } + + /// Force connection shutdown for this connection. + void forceClose() { + appState_ = APP_CLOSE_CONNECTION; + if (!notifyServer()) { + throw TException("TConnection::forceClose: failed write on notify pipe"); + } + } + + /// return the server this connection was initialized for. + TNonblockingServer* getServer() { + return server_; + } + + /// get state of connection. + TAppState getState() { + return appState_; + } + + /// return the TSocket transport wrapping this network connection + boost::shared_ptr getTSocket() const { + return tSocket_; + } + + /// return the server event handler if any + boost::shared_ptr getServerEventHandler() { + return serverEventHandler_; + } + + /// return the Thrift connection context if any + void* getConnectionContext() { + return connectionContext_; + } + +}; + +class TNonblockingServer::TConnection::Task: public Runnable { public: Task(boost::shared_ptr processor, boost::shared_ptr input, @@ -109,8 +379,10 @@ class TConnection::Task: public Runnable { void* connectionContext_; }; -void TConnection::init(int socket, short eventFlags, TNonblockingServer* s, - const sockaddr* addr, socklen_t addrLen) { +void TNonblockingServer::TConnection::init(int socket, short eventFlags, + TNonblockingServer* s, + const sockaddr* addr, + socklen_t addrLen) { tSocket_->setSocketFD(socket); tSocket_->setCachedAddress(addr, addrLen); @@ -150,7 +422,7 @@ void TConnection::init(int socket, short eventFlags, TNonblockingServer* s, } } -void TConnection::workSocket() { +void TNonblockingServer::TConnection::workSocket() { int got=0, left=0, sent=0; uint32_t fetch = 0; @@ -276,7 +548,10 @@ void TConnection::workSocket() { * another. This means that it has finished writing the data that it needed * to, or finished receiving the data that it needed to. */ -void TConnection::transition() { +void TNonblockingServer::TConnection::transition() { + + int sz = 0; + // Switch upon the state that we are currently in and move to a new state switch (appState_) { @@ -460,7 +735,7 @@ void TConnection::transition() { } } -void TConnection::setFlags(short eventFlags) { +void TNonblockingServer::TConnection::setFlags(short eventFlags) { // Catch the do nothing case if (eventFlags_ == eventFlags) { return; @@ -522,7 +797,7 @@ void TConnection::setFlags(short eventFlags) { /** * Closes a connection */ -void TConnection::close() { +void TNonblockingServer::TConnection::close() { // Delete the registered libevent if (event_del(&event_) == -1) { GlobalOutput.perror("TConnection::close() event_del", errno); @@ -543,8 +818,9 @@ void TConnection::close() { server_->returnConnection(this); } -void TConnection::checkIdleBufferMemLimit(size_t readLimit, - size_t writeLimit) { +void TNonblockingServer::TConnection::checkIdleBufferMemLimit( + size_t readLimit, + size_t writeLimit) { if (readLimit > 0 && readBufferSize_ > readLimit) { free(readBuffer_); readBuffer_ = NULL; @@ -585,9 +861,10 @@ TNonblockingServer::~TNonblockingServer() { * Creates a new connection either by reusing an object off the stack or * by allocating a new one entirely */ -TConnection* TNonblockingServer::createConnection(int socket, short flags, - const sockaddr* addr, - socklen_t addrLen) { +TNonblockingServer::TConnection* TNonblockingServer::createConnection( + int socket, short flags, + const sockaddr* addr, + socklen_t addrLen) { // Check the stack if (connectionStack_.empty()) { return new TConnection(socket, flags, this, addr, addrLen); diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h index 21b8d953..e4e0e648 100644 --- a/lib/cpp/src/server/TNonblockingServer.h +++ b/lib/cpp/src/server/TNonblockingServer.h @@ -41,9 +41,6 @@ using apache::thrift::protocol::TProtocol; using apache::thrift::concurrency::Runnable; using apache::thrift::concurrency::ThreadManager; -// Forward declaration of class -class TConnection; - #ifdef LIBEVENT_VERSION_NUMBER #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24) #define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF) @@ -94,6 +91,8 @@ enum TOverloadAction { class TNonblockingServer : public TServer { private: + class TConnection; + /// Listen backlog static const int LISTEN_BACKLOG = 1024; @@ -698,276 +697,6 @@ class TNonblockingServer : public TServer { void stop(); }; -/// Three states for sockets: recv frame size, recv data, and send mode -enum TSocketState { - SOCKET_RECV_FRAMING, - SOCKET_RECV, - SOCKET_SEND -}; - -/** - * Five states for the nonblocking servr: - * 1) initialize - * 2) read 4 byte frame size - * 3) read frame of data - * 4) send back data (if any) - * 5) force immediate connection close - */ -enum TAppState { - APP_INIT, - APP_READ_FRAME_SIZE, - APP_READ_REQUEST, - APP_WAIT_TASK, - APP_SEND_RESULT, - APP_CLOSE_CONNECTION -}; - -/** - * Represents a connection that is handled via libevent. This connection - * essentially encapsulates a socket that has some associated libevent state. - */ -class TConnection { - private: - - /// Server handle - TNonblockingServer* server_; - - /// Object wrapping network socket - boost::shared_ptr tSocket_; - - /// Libevent object - struct event event_; - - /// Libevent flags - short eventFlags_; - - /// Socket mode - TSocketState socketState_; - - /// Application state - TAppState appState_; - - /// How much data needed to read - uint32_t readWant_; - - /// Where in the read buffer are we - uint32_t readBufferPos_; - - /// Read buffer - uint8_t* readBuffer_; - - /// Read buffer size - uint32_t readBufferSize_; - - /// Write buffer - uint8_t* writeBuffer_; - - /// Write buffer size - uint32_t writeBufferSize_; - - /// How far through writing are we? - uint32_t writeBufferPos_; - - /// Largest size of write buffer seen since buffer was constructed - size_t largestWriteBufferSize_; - - /// Count of the number of calls for use with getResizeBufferEveryN(). - int32_t callsForResize_; - - /// Task handle - int taskHandle_; - - /// Task event - struct event taskEvent_; - - /// Transport to read from - boost::shared_ptr inputTransport_; - - /// Transport that processor writes to - boost::shared_ptr outputTransport_; - - /// extra transport generated by transport factory (e.g. BufferedRouterTransport) - boost::shared_ptr factoryInputTransport_; - boost::shared_ptr factoryOutputTransport_; - - /// Protocol decoder - boost::shared_ptr inputProtocol_; - - /// Protocol encoder - boost::shared_ptr outputProtocol_; - - /// Server event handler, if any - boost::shared_ptr serverEventHandler_; - - /// Thrift call context, if any - void *connectionContext_; - - /// Go into read mode - void setRead() { - setFlags(EV_READ | EV_PERSIST); - } - - /// Go into write mode - void setWrite() { - setFlags(EV_WRITE | EV_PERSIST); - } - - /// Set socket idle - void setIdle() { - setFlags(0); - } - - /** - * Set event flags for this connection. - * - * @param eventFlags flags we pass to libevent for the connection. - */ - void setFlags(short eventFlags); - - /** - * Libevent handler called (via our static wrapper) when the connection - * socket had something happen. Rather than use the flags libevent passed, - * we use the connection state to determine whether we need to read or - * write the socket. - */ - void workSocket(); - - /// Close this connection and free or reset its resources. - void close(); - - public: - - class Task; - - /// Constructor - TConnection(int socket, short eventFlags, TNonblockingServer *s, - const sockaddr* addr, socklen_t addrLen) { - readBuffer_ = NULL; - readBufferSize_ = 0; - - // Allocate input and output tranpsorts - // these only need to be allocated once per TConnection (they don't need to be - // reallocated on init() call) - inputTransport_ = boost::shared_ptr(new TMemoryBuffer(readBuffer_, readBufferSize_)); - outputTransport_ = boost::shared_ptr(new TMemoryBuffer(s->getWriteBufferDefaultSize())); - tSocket_.reset(new TSocket()); - - init(socket, eventFlags, s, addr, addrLen); - server_->incrementNumConnections(); - } - - ~TConnection() { - std::free(readBuffer_); - server_->decrementNumConnections(); - } - - /** - * Check buffers against any size limits and shrink it if exceeded. - * - * @param readLimit we reduce read buffer size to this (if nonzero). - * @param writeLimit if nonzero and write buffer is larger, replace it. - */ - void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit); - - /// Initialize - void init(int socket, short eventFlags, TNonblockingServer *s, - const sockaddr* addr, socklen_t addrLen); - - /** - * This is called when the application transitions from one state into - * another. This means that it has finished writing the data that it needed - * to, or finished receiving the data that it needed to. - */ - void transition(); - - /** - * C-callable event handler for connection events. Provides a callback - * that libevent can understand which invokes connection_->workSocket(). - * - * @param fd the descriptor the event occured on. - * @param which the flags associated with the event. - * @param v void* callback arg where we placed TConnection's "this". - */ - static void eventHandler(evutil_socket_t fd, short /* which */, void* v) { - assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD()); - ((TConnection*)v)->workSocket(); - } - - /** - * C-callable event handler for signaling task completion. Provides a - * callback that libevent can understand that will read a connection - * object's address from a pipe and call connection->transition() for - * that object. - * - * @param fd the descriptor the event occured on. - */ - static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) { - TConnection* connection; - ssize_t nBytes; - while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0)) - == sizeof(TConnection*)) { - connection->transition(); - } - if (nBytes > 0) { - throw TException("TConnection::taskHandler unexpected partial read"); - } - if (errno && errno != EWOULDBLOCK && errno != EAGAIN) { - GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno); - } - } - - /** - * Notification to server that processing has ended on this request. - * Can be called either when processing is completed or when a waiting - * task has been preemptively terminated (on overload). - * - * @return true if successful, false if unable to notify (check errno). - */ - bool notifyServer() { - TConnection* connection = this; - if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection), - sizeof(TConnection*), 0) != sizeof(TConnection*)) { - return false; - } - - return true; - } - - /// Force connection shutdown for this connection. - void forceClose() { - appState_ = APP_CLOSE_CONNECTION; - if (!notifyServer()) { - throw TException("TConnection::forceClose: failed write on notify pipe"); - } - } - - /// return the server this connection was initialized for. - TNonblockingServer* getServer() { - return server_; - } - - /// get state of connection. - TAppState getState() { - return appState_; - } - - /// return the TSocket transport wrapping this network connection - boost::shared_ptr getTSocket() const { - return tSocket_; - } - - /// return the server event handler if any - boost::shared_ptr getServerEventHandler() { - return serverEventHandler_; - } - - /// return the Thrift connection context if any - void* getConnectionContext() { - return connectionContext_; - } - -}; - }}} // apache::thrift::server #endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_