THRIFT-1305. cpp: make TConnection a private inner class of
TNonblockingServer
Patch: Adam Simpkins
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1162987 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 21b8d95..e4e0e64 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -41,9 +41,6 @@
using apache::thrift::concurrency::Runnable;
using apache::thrift::concurrency::ThreadManager;
-// Forward declaration of class
-class TConnection;
-
#ifdef LIBEVENT_VERSION_NUMBER
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
@@ -94,6 +91,8 @@
class TNonblockingServer : public TServer {
private:
+ class TConnection;
+
/// Listen backlog
static const int LISTEN_BACKLOG = 1024;
@@ -698,276 +697,6 @@
void stop();
};
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
- SOCKET_RECV_FRAMING,
- SOCKET_RECV,
- SOCKET_SEND
-};
-
-/**
- * Five states for the nonblocking servr:
- * 1) initialize
- * 2) read 4 byte frame size
- * 3) read frame of data
- * 4) send back data (if any)
- * 5) force immediate connection close
- */
-enum TAppState {
- APP_INIT,
- APP_READ_FRAME_SIZE,
- APP_READ_REQUEST,
- APP_WAIT_TASK,
- APP_SEND_RESULT,
- APP_CLOSE_CONNECTION
-};
-
-/**
- * Represents a connection that is handled via libevent. This connection
- * essentially encapsulates a socket that has some associated libevent state.
- */
-class TConnection {
- private:
-
- /// Server handle
- TNonblockingServer* server_;
-
- /// Object wrapping network socket
- boost::shared_ptr<TSocket> tSocket_;
-
- /// Libevent object
- struct event event_;
-
- /// Libevent flags
- short eventFlags_;
-
- /// Socket mode
- TSocketState socketState_;
-
- /// Application state
- TAppState appState_;
-
- /// How much data needed to read
- uint32_t readWant_;
-
- /// Where in the read buffer are we
- uint32_t readBufferPos_;
-
- /// Read buffer
- uint8_t* readBuffer_;
-
- /// Read buffer size
- uint32_t readBufferSize_;
-
- /// Write buffer
- uint8_t* writeBuffer_;
-
- /// Write buffer size
- uint32_t writeBufferSize_;
-
- /// How far through writing are we?
- uint32_t writeBufferPos_;
-
- /// Largest size of write buffer seen since buffer was constructed
- size_t largestWriteBufferSize_;
-
- /// Count of the number of calls for use with getResizeBufferEveryN().
- int32_t callsForResize_;
-
- /// Task handle
- int taskHandle_;
-
- /// Task event
- struct event taskEvent_;
-
- /// Transport to read from
- boost::shared_ptr<TMemoryBuffer> inputTransport_;
-
- /// Transport that processor writes to
- boost::shared_ptr<TMemoryBuffer> outputTransport_;
-
- /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
- boost::shared_ptr<TTransport> factoryInputTransport_;
- boost::shared_ptr<TTransport> factoryOutputTransport_;
-
- /// Protocol decoder
- boost::shared_ptr<TProtocol> inputProtocol_;
-
- /// Protocol encoder
- boost::shared_ptr<TProtocol> outputProtocol_;
-
- /// Server event handler, if any
- boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-
- /// Thrift call context, if any
- void *connectionContext_;
-
- /// Go into read mode
- void setRead() {
- setFlags(EV_READ | EV_PERSIST);
- }
-
- /// Go into write mode
- void setWrite() {
- setFlags(EV_WRITE | EV_PERSIST);
- }
-
- /// Set socket idle
- void setIdle() {
- setFlags(0);
- }
-
- /**
- * Set event flags for this connection.
- *
- * @param eventFlags flags we pass to libevent for the connection.
- */
- void setFlags(short eventFlags);
-
- /**
- * Libevent handler called (via our static wrapper) when the connection
- * socket had something happen. Rather than use the flags libevent passed,
- * we use the connection state to determine whether we need to read or
- * write the socket.
- */
- void workSocket();
-
- /// Close this connection and free or reset its resources.
- void close();
-
- public:
-
- class Task;
-
- /// Constructor
- TConnection(int socket, short eventFlags, TNonblockingServer *s,
- const sockaddr* addr, socklen_t addrLen) {
- readBuffer_ = NULL;
- readBufferSize_ = 0;
-
- // Allocate input and output tranpsorts
- // these only need to be allocated once per TConnection (they don't need to be
- // reallocated on init() call)
- inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
- outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
- tSocket_.reset(new TSocket());
-
- init(socket, eventFlags, s, addr, addrLen);
- server_->incrementNumConnections();
- }
-
- ~TConnection() {
- std::free(readBuffer_);
- server_->decrementNumConnections();
- }
-
- /**
- * Check buffers against any size limits and shrink it if exceeded.
- *
- * @param readLimit we reduce read buffer size to this (if nonzero).
- * @param writeLimit if nonzero and write buffer is larger, replace it.
- */
- void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
-
- /// Initialize
- void init(int socket, short eventFlags, TNonblockingServer *s,
- const sockaddr* addr, socklen_t addrLen);
-
- /**
- * This is called when the application transitions from one state into
- * another. This means that it has finished writing the data that it needed
- * to, or finished receiving the data that it needed to.
- */
- void transition();
-
- /**
- * C-callable event handler for connection events. Provides a callback
- * that libevent can understand which invokes connection_->workSocket().
- *
- * @param fd the descriptor the event occured on.
- * @param which the flags associated with the event.
- * @param v void* callback arg where we placed TConnection's "this".
- */
- static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
- assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
- ((TConnection*)v)->workSocket();
- }
-
- /**
- * C-callable event handler for signaling task completion. Provides a
- * callback that libevent can understand that will read a connection
- * object's address from a pipe and call connection->transition() for
- * that object.
- *
- * @param fd the descriptor the event occured on.
- */
- static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
- TConnection* connection;
- ssize_t nBytes;
- while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
- == sizeof(TConnection*)) {
- connection->transition();
- }
- if (nBytes > 0) {
- throw TException("TConnection::taskHandler unexpected partial read");
- }
- if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
- GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
- }
- }
-
- /**
- * Notification to server that processing has ended on this request.
- * Can be called either when processing is completed or when a waiting
- * task has been preemptively terminated (on overload).
- *
- * @return true if successful, false if unable to notify (check errno).
- */
- bool notifyServer() {
- TConnection* connection = this;
- if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
- sizeof(TConnection*), 0) != sizeof(TConnection*)) {
- return false;
- }
-
- return true;
- }
-
- /// Force connection shutdown for this connection.
- void forceClose() {
- appState_ = APP_CLOSE_CONNECTION;
- if (!notifyServer()) {
- throw TException("TConnection::forceClose: failed write on notify pipe");
- }
- }
-
- /// return the server this connection was initialized for.
- TNonblockingServer* getServer() {
- return server_;
- }
-
- /// get state of connection.
- TAppState getState() {
- return appState_;
- }
-
- /// return the TSocket transport wrapping this network connection
- boost::shared_ptr<TSocket> getTSocket() const {
- return tSocket_;
- }
-
- /// return the server event handler if any
- boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
- return serverEventHandler_;
- }
-
- /// return the Thrift connection context if any
- void* getConnectionContext() {
- return connectionContext_;
- }
-
-};
-
}}} // apache::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_