THRIFT-1305. cpp: make TConnection a private inner class of
TNonblockingServer

Patch: Adam Simpkins

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1162987 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 21b8d95..e4e0e64 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -41,9 +41,6 @@
 using apache::thrift::concurrency::Runnable;
 using apache::thrift::concurrency::ThreadManager;
 
-// Forward declaration of class
-class TConnection;
-
 #ifdef LIBEVENT_VERSION_NUMBER
 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
 #define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
@@ -94,6 +91,8 @@
 
 class TNonblockingServer : public TServer {
  private:
+  class TConnection;
+
   /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
@@ -698,276 +697,6 @@
   void stop();
 };
 
-/// Three states for sockets: recv frame size, recv data, and send mode
-enum TSocketState {
-  SOCKET_RECV_FRAMING,
-  SOCKET_RECV,
-  SOCKET_SEND
-};
-
-/**
- * Five states for the nonblocking servr:
- *  1) initialize
- *  2) read 4 byte frame size
- *  3) read frame of data
- *  4) send back data (if any)
- *  5) force immediate connection close
- */
-enum TAppState {
-  APP_INIT,
-  APP_READ_FRAME_SIZE,
-  APP_READ_REQUEST,
-  APP_WAIT_TASK,
-  APP_SEND_RESULT,
-  APP_CLOSE_CONNECTION
-};
-
-/**
- * Represents a connection that is handled via libevent. This connection
- * essentially encapsulates a socket that has some associated libevent state.
- */
-class TConnection {
- private:
-
-  /// Server handle
-  TNonblockingServer* server_;
-
-  /// Object wrapping network socket
-  boost::shared_ptr<TSocket> tSocket_;
-
-  /// Libevent object
-  struct event event_;
-
-  /// Libevent flags
-  short eventFlags_;
-
-  /// Socket mode
-  TSocketState socketState_;
-
-  /// Application state
-  TAppState appState_;
-
-  /// How much data needed to read
-  uint32_t readWant_;
-
-  /// Where in the read buffer are we
-  uint32_t readBufferPos_;
-
-  /// Read buffer
-  uint8_t* readBuffer_;
-
-  /// Read buffer size
-  uint32_t readBufferSize_;
-
-  /// Write buffer
-  uint8_t* writeBuffer_;
-
-  /// Write buffer size
-  uint32_t writeBufferSize_;
-
-  /// How far through writing are we?
-  uint32_t writeBufferPos_;
-
-  /// Largest size of write buffer seen since buffer was constructed
-  size_t largestWriteBufferSize_;
-
-  /// Count of the number of calls for use with getResizeBufferEveryN().
-  int32_t callsForResize_;
-
-  /// Task handle
-  int taskHandle_;
-
-  /// Task event
-  struct event taskEvent_;
-
-  /// Transport to read from
-  boost::shared_ptr<TMemoryBuffer> inputTransport_;
-
-  /// Transport that processor writes to
-  boost::shared_ptr<TMemoryBuffer> outputTransport_;
-
-  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
-  boost::shared_ptr<TTransport> factoryInputTransport_;
-  boost::shared_ptr<TTransport> factoryOutputTransport_;
-
-  /// Protocol decoder
-  boost::shared_ptr<TProtocol> inputProtocol_;
-
-  /// Protocol encoder
-  boost::shared_ptr<TProtocol> outputProtocol_;
-
-  /// Server event handler, if any
-  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
-
-  /// Thrift call context, if any
-  void *connectionContext_;
-
-  /// Go into read mode
-  void setRead() {
-    setFlags(EV_READ | EV_PERSIST);
-  }
-
-  /// Go into write mode
-  void setWrite() {
-    setFlags(EV_WRITE | EV_PERSIST);
-  }
-
-  /// Set socket idle
-  void setIdle() {
-    setFlags(0);
-  }
-
-  /**
-   * Set event flags for this connection.
-   *
-   * @param eventFlags flags we pass to libevent for the connection.
-   */
-  void setFlags(short eventFlags);
-
-  /**
-   * Libevent handler called (via our static wrapper) when the connection
-   * socket had something happen.  Rather than use the flags libevent passed,
-   * we use the connection state to determine whether we need to read or
-   * write the socket.
-   */
-  void workSocket();
-
-  /// Close this connection and free or reset its resources.
-  void close();
-
- public:
-
-  class Task;
-
-  /// Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s,
-              const sockaddr* addr, socklen_t addrLen) {
-    readBuffer_ = NULL;
-    readBufferSize_ = 0;
-
-    // Allocate input and output tranpsorts
-    // these only need to be allocated once per TConnection (they don't need to be
-    // reallocated on init() call)
-    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
-    tSocket_.reset(new TSocket());
-
-    init(socket, eventFlags, s, addr, addrLen);
-    server_->incrementNumConnections();
-  }
-
-  ~TConnection() {
-    std::free(readBuffer_);
-    server_->decrementNumConnections();
-  }
-
- /**
-   * Check buffers against any size limits and shrink it if exceeded.
-   *
-   * @param readLimit we reduce read buffer size to this (if nonzero).
-   * @param writeLimit if nonzero and write buffer is larger, replace it.
-   */
-  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
-
-  /// Initialize
-  void init(int socket, short eventFlags, TNonblockingServer *s,
-            const sockaddr* addr, socklen_t addrLen);
-
-  /**
-   * This is called when the application transitions from one state into
-   * another. This means that it has finished writing the data that it needed
-   * to, or finished receiving the data that it needed to.
-   */
-  void transition();
-
-  /**
-   * C-callable event handler for connection events.  Provides a callback
-   * that libevent can understand which invokes connection_->workSocket().
-   *
-   * @param fd the descriptor the event occured on.
-   * @param which the flags associated with the event.
-   * @param v void* callback arg where we placed TConnection's "this".
-   */
-  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
-    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
-    ((TConnection*)v)->workSocket();
-  }
-
-  /**
-   * C-callable event handler for signaling task completion.  Provides a
-   * callback that libevent can understand that will read a connection
-   * object's address from a pipe and call connection->transition() for
-   * that object.
-   *
-   * @param fd the descriptor the event occured on.
-   */
-  static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
-    TConnection* connection;
-    ssize_t nBytes;
-    while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
-        == sizeof(TConnection*)) {
-      connection->transition();
-    }
-    if (nBytes > 0) {
-      throw TException("TConnection::taskHandler unexpected partial read");
-    }
-    if (errno && errno != EWOULDBLOCK && errno != EAGAIN) {
-      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
-    }
-  }
-
-  /**
-   * Notification to server that processing has ended on this request.
-   * Can be called either when processing is completed or when a waiting
-   * task has been preemptively terminated (on overload).
-   *
-   * @return true if successful, false if unable to notify (check errno).
-   */
-  bool notifyServer() {
-    TConnection* connection = this;
-    if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
-             sizeof(TConnection*), 0) != sizeof(TConnection*)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /// Force connection shutdown for this connection.
-  void forceClose() {
-    appState_ = APP_CLOSE_CONNECTION;
-    if (!notifyServer()) {
-      throw TException("TConnection::forceClose: failed write on notify pipe");
-    }
-  }
-
-  /// return the server this connection was initialized for.
-  TNonblockingServer* getServer() {
-    return server_;
-  }
-
-  /// get state of connection.
-  TAppState getState() {
-    return appState_;
-  }
-
-  /// return the TSocket transport wrapping this network connection
-  boost::shared_ptr<TSocket> getTSocket() const {
-    return tSocket_;
-  }
-
-  /// return the server event handler if any
-  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
-    return serverEventHandler_;
-  }
-
-  /// return the Thrift connection context if any
-  void* getConnectionContext() {
-    return connectionContext_;
-  }
-
-};
-
 }}} // apache::thrift::server
 
 #endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_