C++ Thrift coding style changes

Summary: Make underscore for class members consistent


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664818 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cc b/lib/cpp/src/server/TNonblockingServer.cc
new file mode 100644
index 0000000..14fb5bc
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.cc
@@ -0,0 +1,476 @@
+#include "TNonblockingServer.h"
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace server { 
+
+// (Re)initializes this connection for a client socket; called by the constructor and on reuse from the pool
+void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
+  server_ = s;
+  socket_ = socket;
+  // The state machine starts out receiving, in the INIT application state
+  socketState_ = SOCKET_RECV;
+  appState_ = APP_INIT;
+
+  // Nothing has been read or written yet
+  readWant_ = 0;
+  readBufferPos_ = 0;
+  writeBuffer_ = NULL;
+  writeBufferPos_ = 0;
+  writeBufferSize_ = 0;
+
+  // Clear the recorded flags first so setFlags() does not try to delete an event
+  eventFlags_ = 0;
+  setFlags(eventFlags);
+}
+
+// Performs one non-blocking recv() or send() step according to socketState_,
+// and calls transition() once the wanted bytes are fully read or written.
+void TConnection::workSocket() {
+  switch (socketState_) {
+  case SOCKET_RECV: {  // braces keep these locals from crossing later case labels
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    // How many bytes are still missing from the current read
+    uint32_t fetch = readWant_ - readBufferPos_;
+    // Double the buffer size until the whole read fits
+    if (readWant_ > readBufferSize_) {
+      uint32_t newSize = readBufferSize_;
+      while (newSize < readWant_) {
+        newSize *= 2;
+      }
+      // Keep the old pointer until realloc succeeds, so a failure neither
+      // leaks the buffer nor leaves this connection with a NULL buffer
+      uint8_t* grown = (uint8_t*)realloc(readBuffer_, newSize);
+      if (grown == NULL) {
+        perror("TConnection::workSocket() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = grown;
+      readBufferSize_ = newSize;
+    }
+
+    // Read from the socket
+    int got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    } else if (got == -1) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != ECONNRESET) {
+        perror("TConnection::workSocket() recv -1");
+      }
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+    return;
+  }
+
+  case SOCKET_SEND: {
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      fprintf(stderr, "WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    int flags = 0;
+    #ifdef MSG_NOSIGNAL
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    flags |= MSG_NOSIGNAL;
+    #endif // ifdef MSG_NOSIGNAL
+
+    int left = writeBufferSize_ - writeBufferPos_;
+    int sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);
+
+    if (sent <= 0) {
+      // Blocking errors are okay, just move on
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return;
+      }
+      if (errno != EPIPE) {
+        perror("TConnection::workSocket() send -1");
+      }
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+    return;
+  }
+
+  default:
+    fprintf(stderr, "Shit Got Ill. Socket State %d\n", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TConnection::transition() {
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    outputTransport_->resetBuffer();
+
+    try {
+      // Invoke the processor
+      server_->getProcessor()->process(inputTransport_, outputTransport_);
+    } catch (TTransportException &x) {
+      fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
+      close();
+      return;
+    } catch (...) {
+      fprintf(stderr, "Server::process() unknown exception\n");
+      close();
+      return;
+    }
+
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    if (writeBufferSize_ > 0) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+      appState_ = APP_SEND_RESULT;
+
+      // Socket into write mode
+      setWrite();
+
+      // Try to work the socket immediately
+      workSocket();
+
+      return;
+    }
+
+    // In this case, the request was asynchronous and we should fall through
+    // right back into the read frame header state
+
+  case APP_SEND_RESULT:
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  case APP_INIT:
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Set up read buffer for getting 4 bytes
+    readBufferPos_ = 0;
+    readWant_ = 4;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    // Register read event
+    setRead();
+
+    // Try to work the socket right away
+    workSocket();
+
+    return;
+
+  case APP_READ_FRAME_SIZE: {
+    // We just read the request length, deserialize it (braces keep sz out of the default label's scope)
+    int sz = *(int32_t*)readBuffer_;
+    sz = (int32_t)ntohl(sz);
+
+    if (sz <= 0) {
+      fprintf(stderr, "TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
+      close();
+      return;
+    }
+
+    // Reset the read buffer
+    readWant_ = (uint32_t)sz;
+    readBufferPos_= 0;
+
+    // Move into read request state
+    appState_ = APP_READ_REQUEST;
+
+    // Work the socket right away
+    workSocket();
+
+    return;
+  }
+
+  default:
+    fprintf(stderr, "Totally Fucked. Application State %d\n", appState_);
+    assert(0);
+  }
+}
+
+// Switches this connection's libevent registration to eventFlags; a no-op
+// when those flags are already the recorded ones
+void TConnection::setFlags(short eventFlags) {
+  // Nothing to do when the requested flags are already in effect
+  if (eventFlags == eventFlags_) {
+    return;
+  }
+
+  // Non-zero flags were recorded by an earlier call, so delete that event first
+  if (eventFlags_ != 0 && event_del(&event_) == -1) {
+    perror("TConnection::setFlags event_del");
+    return;
+  }
+
+  // Remember the new flags
+  eventFlags_ = eventFlags;
+
+  /**
+   * event_set:
+   *
+   * Prepares the event structure &event to be used in future calls to
+   * event_add() and event_del().  The event will be prepared to call the
+   * event_handler using the 'sock' file descriptor to monitor events.
+   *
+   * The events can be either EV_READ, EV_WRITE, or both, indicating
+   * that an application can read or write from the file respectively without
+   * blocking.
+   *
+   * The event_handler will be called with the file descriptor that triggered
+   * the event and the type of event which will be one of: EV_TIMEOUT,
+   * EV_SIGNAL, EV_READ, EV_WRITE.
+   *
+   * The additional flag EV_PERSIST makes an event_add() persistent until
+   * event_del() has been called.
+   *
+   * Once initialized, the &event struct can be used repeatedly with
+   * event_add() and event_del() and does not need to be reinitialized unless
+   * the event_handler and/or the argument to it are to be changed.  However,
+   * when an ev structure has been added to libevent using event_add() the
+   * structure must persist until the event occurs (assuming EV_PERSIST
+   * is not set) or is removed using event_del().  You may not reuse the same
+   * ev structure for multiple monitored descriptors; each descriptor needs
+   * its own ev.
+   */
+  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
+
+  // Register the freshly prepared event
+  if (event_add(&event_, 0) == -1) {
+    perror("TConnection::setFlags(): coult not event_add");
+  }
+}
+
+/**
+ * Closes a connection: removes its libevent event, closes the socket and gives the object back to the server
+ */
+void TConnection::close() {
+  // Delete the registered libevent; a failure is only reported, the close still proceeds
+  if (event_del(&event_) == -1) {
+    perror("TConnection::close() event_del");
+  }
+
+  // Close the socket; the :: prefix selects the global close() rather than this method
+  if (socket_ > 0) {
+    ::close(socket_);
+  }
+  socket_ = 0;
+
+  // Give this object back to the server that owns it
+  server_->returnConnection(this);
+}
+
+/**
+ * Hands out a connection for the given socket: a pooled object from the
+ * stack is re-initialized when one is available, otherwise a new one is made
+ */
+TConnection* TNonblockingServer::createConnection(int socket, short flags) {
+  // Nothing pooled, allocate a brand new connection
+  if (connectionStack_.empty()) {
+    return new TConnection(socket, flags, this);
+  }
+  // Recycle the connection on top of the stack
+  TConnection* recycled = connectionStack_.top();
+  connectionStack_.pop();
+  recycled->init(socket, flags, this);
+  return recycled;
+}
+
+/**
+ * Returns a connection to the stack, where createConnection() can pick it up for reuse
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  connectionStack_.push(connection);
+}
+
+/**
+ * Server socket had something happen: accept every pending client, switch
+ * it to non-blocking mode and start its TConnection state machine
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  // Make sure that libevent handed us the listening socket
+  assert(fd == serverSocket_);
+
+  struct sockaddr addr;
+  socklen_t addrLen;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one; this avoids going back into the libevent engine so many times
+  for (;;) {
+    // accept() overwrites addrLen (value-result), so reset it before every call
+    addrLen = sizeof(addr);
+    int clientSocket = accept(fd, &addr, &addrLen);
+    if (clientSocket == -1) {
+      break;
+    }
+
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      perror("thriftServerEventHandler: set O_NONBLOCK");
+      ::close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, EV_READ | EV_PERSIST);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      fprintf(stderr, "thriftServerEventHandler: failed TConnection factory");
+      ::close(clientSocket);
+      return;
+    }
+
+    // Put this client connection into the proper state
+    clientConnection->transition();
+  }
+
+  // The loop only ends after accept() failed; anything but a would-block error is a problem
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    perror("thriftServerEventHandler: accept()");
+  }
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+  // Initialize libevent
+  event_init();
+
+  // Print some libevent stats
+  fprintf(stderr,
+          "libevent %s method %s\n",
+          event_get_version(),
+          event_get_method());
+
+  // Create the server socket
+  serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
+  if (serverSocket_ == -1) {
+    perror("TNonblockingServer::serve() socket() -1");
+    return;
+  }
+
+  // Set socket to nonblocking mode
+  int flags;
+  if ((flags = fcntl(serverSocket_, F_GETFL, 0)) < 0 ||
+      fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK) < 0) {
+    perror("TNonblockingServer::serve() O_NONBLOCK");
+    ::close(serverSocket_);
+    return;
+  }
+
+  int one = 1;
+  struct linger ling = {0, 0};
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+
+  // Keepalive to ensure full result flushing
+  setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));
+
+  // Turn linger off to avoid hung sockets
+  setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
+  #endif
+
+  struct sockaddr_in addr = {};  // zero every field that is not set explicitly below
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons(port_);
+  addr.sin_addr.s_addr = INADDR_ANY;
+
+  if (bind(serverSocket_, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
+    perror("TNonblockingServer::serve() bind");
+    ::close(serverSocket_);
+    return;
+  }
+
+  if (listen(serverSocket_, LISTEN_BACKLOG) == -1) {
+    perror("TNonblockingServer::serve() listen");
+    ::close(serverSocket_);
+    return;
+  }
+
+  // Register the server event
+  struct event serverEvent;
+  event_set(&serverEvent,
+            serverSocket_,
+            EV_READ | EV_PERSIST,
+            TNonblockingServer::eventHandler,
+            this);
+
+  // Add the event and start up the server; do not leak the socket on failure
+  if (event_add(&serverEvent, 0) == -1) {
+    perror("TNonblockingServer::serve(): coult not event_add");
+    ::close(serverSocket_);
+    return;
+  }
+
+  // Run libevent engine, which dispatches to the registered event handlers
+  event_loop(0);
+}
+
+}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
new file mode 100644
index 0000000..565486c
--- /dev/null
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -0,0 +1,195 @@
+#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
+#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
+
+#include "Thrift.h"
+#include "server/TServer.h"
+#include "transport/TMemoryBuffer.h"
+#include <stack>
+#include <event.h>
+
+#
+
+namespace facebook { namespace thrift { namespace server { 
+
+using boost::shared_ptr;
+
+// Forward declaration of class
+class TConnection;
+
+/**
+ * This is a non-blocking server in C++ for high performance that operates a
+ * single IO thread. It assumes that all incoming requests are framed with a
+ * 4 byte length indicator and writes out responses using the same framing.
+ *
+ * It does not use the TServerTransport framework, but rather has socket
+ * operations hardcoded for use with libevent.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNonblockingServer : public TServer {
+ private:
+
+  // Listen backlog passed to listen()
+  static const int LISTEN_BACKLOG = 1024;
+
+  // Server socket file descriptor
+  int serverSocket_;
+
+  // Port server runs on
+  int port_;
+
+  /**
+   * This is a stack of all the objects that have been created but that
+   * are NOT currently in use. When we close a connection, we place it on this
+   * stack so that the object can be reused later, rather than freeing the
+   * memory and reallocating a new object later.
+   */
+  std::stack<TConnection*> connectionStack_;
+
+  void handleEvent(int fd, short which);
+
+ public:
+  TNonblockingServer(shared_ptr<TProcessor> processor,
+                     shared_ptr<TServerOptions> options,
+                     int port) :
+    TServer(processor, options), serverSocket_(0), port_(port) {}
+    
+  ~TNonblockingServer() {}
+
+  TConnection* createConnection(int socket, short flags);
+
+  void returnConnection(TConnection* connection);
+
+  static void eventHandler(int fd, short which, void* v) {
+    ((TNonblockingServer*)v)->handleEvent(fd, which);
+  }
+
+  void serve();
+};
+
+/**
+ * Two states for sockets, recv and send mode; TConnection::workSocket() switches on this
+ */
+enum TSocketState {
+  SOCKET_RECV,
+  SOCKET_SEND
+};
+
+/**
+ * Four states for the nonblocking server:
+ *  1) initialize
+ *  2) read 4 byte frame size
+ *  3) read frame of data
+ *  4) send back data (if any)
+ */
+enum TAppState {
+  APP_INIT,
+  APP_READ_FRAME_SIZE,
+  APP_READ_REQUEST,
+  APP_SEND_RESULT
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TConnection {
+ private:
+
+  // Server handle
+  TNonblockingServer* server_;
+
+  // Socket handle
+  int socket_;
+
+  // Libevent object
+  struct event event_;
+
+  // Libevent flags currently recorded for event_ (0 means none)
+  short eventFlags_;
+
+  // Socket mode
+  TSocketState socketState_;
+
+  // Application state
+  TAppState appState_;
+
+  // How much data needed to read
+  uint32_t readWant_;
+
+  // Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  // Read buffer, allocated with malloc in the constructor
+  uint8_t* readBuffer_;
+
+  // Read buffer size
+  uint32_t readBufferSize_;
+
+  // Write buffer
+  uint8_t* writeBuffer_;
+
+  // Write buffer size
+  uint32_t writeBufferSize_;
+
+  // How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  // Transport to read from
+  shared_ptr<TMemoryBuffer> inputTransport_;
+
+  // Transport that processor writes to
+  shared_ptr<TMemoryBuffer> outputTransport_;
+
+  // Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  // Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  // Set event flags
+  void setFlags(short eventFlags);
+
+  // Libevent handlers
+  void workSocket();
+
+  // Close this client and reset
+  void close();
+
+ public:
+
+  // Constructor: allocates the initial 1024 byte read buffer and both transports, then calls init()
+  TConnection(int socket, short eventFlags, TNonblockingServer *s) {
+    readBuffer_ = (uint8_t*)malloc(1024);
+    if (readBuffer_ == NULL) {
+      throw new facebook::thrift::Exception("Out of memory.");  // NOTE(review): thrown by pointer
+    }
+    readBufferSize_ = 1024;
+
+    // Allocate input and output transports
+    inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+
+    init(socket, eventFlags, s);
+  }
+
+  // Initialize
+  void init(int socket, short eventFlags, TNonblockingServer *s);
+
+  // Transition into a new state
+  void transition();
+
+  // Handler wrapper: checks that the event is for this connection's socket (== not =), then works it
+  static void eventHandler(int fd, short which, void* v) {
+    assert(fd == ((TConnection*)v)->socket_);
+    ((TConnection*)v)->workSocket();
+  }
+};
+
+}}} // facebook::thrift::server
+
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index ddb320d..eb23b45 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -26,6 +26,10 @@
   virtual ~TServer() {}
   virtual void serve() = 0;
   
+  shared_ptr<TProcessor> getProcessor() {  // accessor: hands out the processor_ member
+    return processor_;
+  }
+  
 protected:
   TServer(shared_ptr<TProcessor> processor,
           shared_ptr<TServerTransport> serverTransport,
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index 4285b05..43f7463 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -12,19 +12,15 @@
 using namespace facebook::thrift::transport;
 
 class TThreadPoolServer::Task: public Runnable {
-    
-  shared_ptr<TProcessor> _processor;
-  shared_ptr<TTransport> _input;
-  shared_ptr<TTransport> _output;
-    
+       
 public:
     
   Task(shared_ptr<TProcessor> processor,
        shared_ptr<TTransport> input,
        shared_ptr<TTransport> output) :
-    _processor(processor),
-    _input(input),
-    _output(output) {
+    processor_(processor),
+    input_(input),
+    output_(output) {
   }
 
   ~Task() {}
@@ -32,16 +28,22 @@
   void run() {     
     while(true) {
       try {
-	_processor->process(_input, _output);
+	processor_->process(input_, output_);
       } catch (TTransportException& ttx) {
         break;
       } catch(...) {
         break;
       }
     }
-    _input->close();
-    _output->close();
+    input_->close();
+    output_->close();
   }
+
+ private:
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TTransport> input_;
+  shared_ptr<TTransport> output_;
+
 };
   
 TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,