Task(boost::shared_ptr<TProcessor> processor,
        boost::shared_ptr<TProtocol> input,
        boost::shared_ptr<TProtocol> output,
-       int taskHandle) :
+       TConnection* connection) :
     processor_(processor),
     input_(input),
     output_(output),
-    taskHandle_(taskHandle) {}
+    connection_(connection) {}
 
   void run() {
     try {
       cerr << "TNonblockingServer uncaught exception." << endl;
     }
 
-    // Signal completion back to the libevent thread via a socketpair
-    int8_t b = 0;
-    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
-      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
-    }
-    if (-1 == ::close(taskHandle_)) {
-      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
+    // Signal completion back to the libevent thread via a pipe
+    if (!connection_->notifyServer()) {
+      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
     }
   }
 
+  TConnection* getTConnection() {
+    return connection_;
+  }
+
  private:
   boost::shared_ptr<TProcessor> processor_;
   boost::shared_ptr<TProtocol> input_;
   boost::shared_ptr<TProtocol> output_;
-  int taskHandle_;
+  TConnection* connection_;
 };
 
 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
   socketState_ = SOCKET_RECV;
   appState_ = APP_INIT;
 
-  taskHandle_ = -1;
-
   // Set flags, which also registers the event
   setFlags(eventFlags);
 
     outputTransport_->getWritePtr(4);
     outputTransport_->wroteBytes(4);
 
+    server_->incrementActiveProcessors();
+
     if (server_->isThreadPoolProcessing()) {
       // We are setting up a Task to do this work and we will wait on it
-      int sv[2];
-      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
-        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
-        // Now we will fall through to the APP_WAIT_TASK block with no response
-      } else {
-        // Create task and dispatch to the thread manager
-        boost::shared_ptr<Runnable> task =
-          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
-                                               inputProtocol_,
-                                               outputProtocol_,
-                                               sv[1]));
-        // The application is now waiting on the task to finish
-        appState_ = APP_WAIT_TASK;
-
-        // Create an event to be notified when the task finishes
-        event_set(&taskEvent_,
-                  taskHandle_ = sv[0],
-                  EV_READ,
-                  TConnection::taskHandler,
-                  this);
-
-        // Attach to the base
-        event_base_set(server_->getEventBase(), &taskEvent_);
-
-        // Add the event and start up the server
-        if (-1 == event_add(&taskEvent_, 0)) {
-          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
-          return;
-        }
+
+      // Create task and dispatch to the thread manager
+      boost::shared_ptr<Runnable> task =
+        boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+                                             inputProtocol_,
+                                             outputProtocol_,
+                                             this));
+      // The application is now waiting on the task to finish
+      appState_ = APP_WAIT_TASK;
+
         try {
           server_->addTask(task);
         } catch (IllegalStateException & ise) {
           close();
         }
 
-        // Set this connection idle so that libevent doesn't process more
-        // data on it while we're still waiting for the threadmanager to
-        // finish this task
-        setIdle();
-        return;
-      }
+      // Set this connection idle so that libevent doesn't process more
+      // data on it while we're still waiting for the threadmanager to
+      // finish this task
+      setIdle();
+      return;
     } else {
       try {
         // Invoke the processor
         server_->getProcessor()->process(inputProtocol_, outputProtocol_);
       } catch (TTransportException &ttx) {
         GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+        server_->decrementActiveProcessors();
         close();
         return;
       } catch (TException &x) {
         GlobalOutput.printf("TException: Server::process() %s", x.what());
+        server_->decrementActiveProcessors();
         close();
         return;
       } catch (...) {
         GlobalOutput.printf("Server::process() unknown exception");
+        server_->decrementActiveProcessors();
         close();
         return;
       }
     // into the outputTransport_, so we grab its contents and place them into
     // the writeBuffer_ for actual writing by the libevent thread
 
+    server_->decrementActiveProcessors();
     // Get the result of the operation
     outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
 
 
     return;
 
+  case APP_CLOSE_CONNECTION:
+    server_->decrementActiveProcessors();
+    close();
+    return;
+
   default:
     GlobalOutput.printf("Unexpected Application State %d", appState_);
     assert(0);
     return;
   }
 
-  /**
+  /*
    * event_set:
    *
    * Prepares the event structure &event to be used in future calls to
   }
 
   // Close the socket
-  if (socket_ > 0) {
+  if (socket_ >= 0) {
     ::close(socket_);
   }
-  socket_ = 0;
+  socket_ = -1;
 
   // close any factory produced transports
   factoryInputTransport_->close();
   server_->returnConnection(this);
 }
 
-void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+void TConnection::checkIdleBufferMemLimit(size_t limit) {
   if (readBufferSize_ > limit) {
     readBufferSize_ = limit;
     readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
   // one, this helps us to avoid having to go back into the libevent engine so
   // many times
   while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
-
+    // If we're overloaded, take action here
+    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      nConnectionsDropped_++;
+      nTotalConnectionsDropped_++;
+      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+        close(clientSocket);
+        continue;
+      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+        if (!drainPendingTask()) {
+          // Nothing left to discard, so we drop connection instead.
+          close(clientSocket);
+          continue;
+        }
+      }
+    }
     // Explicitly set this socket to NONBLOCK mode
     int flags;
     if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
   serverSocket_ = s;
 }
 
+void TNonblockingServer::createNotificationPipe() {
+  if (pipe(notificationPipeFDs_) != 0) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+      throw TException("can't create notification pipe");
+  }
+}
+
 /**
  * Register the core libevent events onto the proper base.
  */
   if (-1 == event_add(&serverEvent_, 0)) {
     throw TException("TNonblockingServer::serve(): coult not event_add");
   }
+  if (threadPoolProcessing_) {
+    // Create an event to be notified when a task finishes
+    event_set(¬ificationEvent_,
+              getNotificationRecvFD(),
+              EV_READ | EV_PERSIST,
+              TConnection::taskHandler,
+              this);
+    
+    // Attach to the base
+    event_base_set(eventBase_, ¬ificationEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(¬ificationEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): notification event_add fail");
+    }
+  }
+}
+
+bool  TNonblockingServer::serverOverloaded() {
+  size_t activeConnections = numTConnections_ - connectionStack_.size();
+  if (numActiveProcessors_ > maxActiveProcessors_ ||
+      activeConnections > maxConnections_) {
+    if (!overloaded_) {
+      GlobalOutput.printf("thrift non-blocking server overload condition");
+      overloaded_ = true;
+    }
+  } else {
+    if (overloaded_ &&
+        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+      GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+                          nConnectionsDropped_, nTotalConnectionsDropped_);
+      nConnectionsDropped_ = 0;
+      overloaded_ = false;
+    }
+  }
+
+  return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+  if (threadManager_) {
+    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+    if (task) {
+      TConnection* connection =
+        static_cast<TConnection::Task*>(task.get())->getTConnection();
+      assert(connection && connection->getServer()
+             && connection->getState() == APP_WAIT_TASK);
+      connection->forceClose();
+      return true;
+    }
+  }
+  return false;
 }
 
 /**
   // Init socket
   listenSocket();
 
+  if (threadPoolProcessing_) {
+    // Init task completion notification pipe
+    createNotificationPipe();
+  }
+
   // Initialize libevent core
   registerEvents(static_cast<event_base*>(event_init()));
 
 
 #include <server/TServer.h>
 #include <transport/TBufferTransports.h>
 #include <concurrency/ThreadManager.h>
+#include <climits>
 #include <stack>
 #include <string>
 #include <errno.h>
  * operations hardcoded for use with select.
  *
  */
+
+
+/// Overload condition actions.
+enum TOverloadAction {
+  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload */
+  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately */
+  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue */
+};
+
 class TNonblockingServer : public TServer {
  private:
-
-  // Listen backlog
+  /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
-  // Default limit on size of idle connection pool
+  /// Default limit on size of idle connection pool
   static const size_t CONNECTION_STACK_LIMIT = 1024;
 
-  // Maximum size of buffer allocated to idle connection
+  /// Maximum size of buffer allocated to idle connection
   static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
 
-  // Server socket file descriptor
+  /// Default limit on total number of connected sockets
+  static const int MAX_CONNECTIONS = INT_MAX;
+
+  /// Default limit on connections in handler/task processing
+  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+
+  /// Server socket file descriptor
   int serverSocket_;
 
-  // Port server runs on
+  /// Port server runs on
   int port_;
 
-  // For processing via thread pool, may be NULL
+  /// For processing via thread pool, may be NULL
   boost::shared_ptr<ThreadManager> threadManager_;
 
-  // Is thread pool processing?
+  /// Is thread pool processing?
   bool threadPoolProcessing_;
 
-  // The event base for libevent
+  /// The event base for libevent
   event_base* eventBase_;
 
-  // Event struct, for use with eventBase_
+  /// Event struct, used with eventBase_ for connection events
   struct event serverEvent_;
 
-  // Number of TConnection object we've created
+  /// Event struct, used with eventBase_ for task completion notification
+  struct event notificationEvent_;
+
+  /// Number of TConnection object we've created
   size_t numTConnections_;
 
-  // Limit for how many TConnection objects to cache
+  /// Number of Connections processing or waiting to process 
+  size_t numActiveProcessors_;
+
+  /// Limit for how many TConnection objects to cache
   size_t connectionStackLimit_;
 
+  /// Limit for number of connections processing or waiting to process
+  size_t maxActiveProcessors_;
+
+  /// Limit for number of open connections
+  size_t maxConnections_;
+
+  /**
+   * Hysteresis for overload state.  This is the fraction of the overload
+   * value that needs to be reached before the overload state is cleared;
+   * must be <= 1.0.
+   */
+  double overloadHysteresis_;
+
+  /// Action to take when we're overloaded.
+  TOverloadAction overloadAction_;
+
   /**
    * Max read buffer size for an idle connection.  When we place an idle
    * TConnection into connectionStack_, we insure that its read buffer is
    * reduced to this size to insure that idle connections don't hog memory.
    */
-  uint32_t idleBufferMemLimit_;
+  size_t idleBufferMemLimit_;
+
+  /// Set if we are currently in an overloaded state.
+  bool overloaded_;
+
+  /// Count of connections dropped since overload started
+  uint32_t nConnectionsDropped_;
+
+  /// Count of connections dropped on overload since server started
+  uint64_t nTotalConnectionsDropped_;
+
+  /// File descriptors for pipe used for task completion notification.
+  int notificationPipeFDs_[2];
 
   /**
    * This is a stack of all the objects that have been created but that
    */
   std::stack<TConnection*> connectionStack_;
 
+  /**
+   * Called when server socket had something happen.  We accept all waiting
+   * client connections on listen socket fd and assign TConnection objects
+   * to handle those requests.
+   *
+   * @param fd the listen socket.
+   * @param which the event flag that triggered the handler.
+   */
   void handleEvent(int fd, short which);
 
  public:
     threadPoolProcessing_(false),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0) {}
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
     threadManager_(threadManager),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
                      int port,
                      boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
-    serverSocket_(0),
+    serverSocket_(-1),
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0)  {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
     return eventBase_;
   }
 
+  /// Increment our count of the number of connected sockets.
   void incrementNumConnections() {
     ++numTConnections_;
   }
 
+  /// Decrement our count of the number of connected sockets.
   void decrementNumConnections() {
     --numTConnections_;
   }
 
-  size_t getNumConnections() {
+  /**
+   * Return the count of sockets currently connected to.
+   *
+   * @return count of connected sockets.
+   */
+  size_t getNumConnections() const {
     return numTConnections_;
   }
 
-  size_t getNumIdleConnections() {
+  /**
+   * Return the count of connection objects allocated but not in use.
+   *
+   * @return count of idle connection objects.
+   */
+  size_t getNumIdleConnections() const {
     return connectionStack_.size();
   }
 
+  /**
+   * Return count of number of connections which are currently processing.
+   * This is defined as a connection where all data has been received and
+   * either assigned a task (when threading) or passed to a handler (when
+   * not threading), and where the handler has not yet returned.
+   *
+   * @return # of connections currently processing.
+   */
+  size_t getNumActiveProcessors() const {
+    return numActiveProcessors_;
+  }
+
+  /// Increment the count of connections currently processing.
+  void incrementActiveProcessors() {
+    ++numActiveProcessors_;
+  }
+
+  /// Decrement the count of connections currently processing.
+  void decrementActiveProcessors() {
+    if (numActiveProcessors_ > 0) {
+      --numActiveProcessors_;
+    }
+  }
+
+  /**
+   * Get the maximum # of connections allowed before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxConnections() const {
+    return maxConnections_;
+  }
+
+  /**
+   * Set the maximum # of connections allowed before overload.
+   *
+   * @param maxConnections new setting for maximum # of connections.
+   */
+  void setMaxConnections(size_t maxConnections) {
+    maxConnections_ = maxConnections;
+  }
+
+  /**
+   * Get the maximum # of connections waiting in handler/task before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxActiveProcessors() const {
+    return maxActiveProcessors_;
+  }
+
+  /**
+   * Set the maximum # of connections waiting in handler/task before overload.
+   *
+   * @param maxActiveProcessors new setting for maximum # of active processes.
+   */
+  void setMaxActiveProcessors(size_t maxActiveProcessors) {
+    maxActiveProcessors_ = maxActiveProcessors;
+  }
+
+  /**
+   * Get fraction of maximum limits before an overload condition is cleared.
+   *
+   * @return hysteresis fraction
+   */
+  double getOverloadHysteresis() const {
+    return overloadHysteresis_;
+  }
+
+  /**
+   * Set fraction of maximum limits before an overload condition is cleared.
+   * A good value would probably be between 0.5 and 0.9.
+   *
+   * @param hysteresisFraction fraction <= 1.0.
+   */
+  void setOverloadHysteresis(double hysteresisFraction) {
+    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
+      overloadHysteresis_ = hysteresisFraction;
+    }
+  }
+
+  /**
+   * Get the action the server will take on overload.
+   *
+   * @return a TOverloadAction enum value for the currently set action.
+   */
+  TOverloadAction getOverloadAction() const {
+    return overloadAction_;
+  }
+
+  /**
+   * Set the action the server is to take on overload.
+   *
+   * @param overloadAction a TOverloadAction enum value for the action.
+   */
+  void setOverloadAction(TOverloadAction overloadAction) {
+    overloadAction_ = overloadAction;
+  }
+
+  /**
+   * Determine if the server is currently overloaded.
+   * This function checks the maximums for open connections and connections
+   * currently in processing, and sets an overload condition if they are
+   * exceeded.  The overload will persist until both values are below the
+   * current hysteresis fraction of their maximums.
+   *
+   * @return true if an overload condition exists, false if not.
+   */
+  bool serverOverloaded();
+
+  /** Pop and discard next task on threadpool wait queue.
+   *
+   * @return true if a task was discarded, false if the wait queue was empty.
+   */
+  bool drainPendingTask();
+
   /**
    * Get the maximum limit of memory allocated to idle TConnection objects.
    *
     idleBufferMemLimit_ = limit;
   }
 
+  /**
+   * Return an initialized connection object.  Creates or recovers from
+   * pool a TConnection and initializes it with the provided socket FD
+   * and flags.
+   *
+   * @param socket FD of socket associated with this connection.
+   * @param flags initial lib_event flags for this connection.
+   * @return pointer to initialized TConnection object.
+   */
   TConnection* createConnection(int socket, short flags);
 
+  /**
+   * Returns a connection to pool or deletion.  If the connection pool
+   * (a stack) isn't full, place the connection object on it, otherwise
+   * just delete it.
+   *
+   * @param connection the TConection being returned.
+   */
   void returnConnection(TConnection* connection);
 
+  /**
+   * C-callable event handler for listener events.  Provides a callback
+   * that libevent can understand which invokes server->handleEvent().
+   *
+   * @param fd the descriptor the event occured on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TNonblockingServer's "this".
+   */
   static void eventHandler(int fd, short which, void* v) {
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
+  /// Creates a socket to listen on and binds it to the local port.
   void listenSocket();
 
+  /**
+   * Takes a socket created by listenSocket() and sets various options on it
+   * to prepare for use in the server.
+   *
+   * @param fd descriptor of socket to be initialized/
+   */
   void listenSocket(int fd);
 
+  /// Create the pipe used to notify I/O process of task completion.
+  void createNotificationPipe();
+
+  /**
+   * Get notification pipe send descriptor.
+   *
+   * @return write fd for pipe.
+   */
+  int getNotificationSendFD() const {
+    return notificationPipeFDs_[1];
+  }
+
+  /**
+   * Get notification pipe receive descriptor.
+   *
+   * @return read fd of pipe.
+   */
+  int getNotificationRecvFD() const {
+    return notificationPipeFDs_[0];
+  }
+
+  /**
+   * Register the core libevent events onto the proper base.
+   *
+   * @param base pointer to the event base to be initialized.
+   */
   void registerEvents(event_base* base);
 
+  /**
+   * Main workhorse function, starts up the server listening on a port and
+   * loops over the libevent handler.
+   */
   void serve();
 };
 
-/**
- * Two states for sockets, recv and send mode
- */
+/// Two states for sockets, recv and send mode
 enum TSocketState {
   SOCKET_RECV,
   SOCKET_SEND
 };
 
 /**
- * Four states for the nonblocking servr:
+ * Five states for the nonblocking servr:
  *  1) initialize
  *  2) read 4 byte frame size
  *  3) read frame of data
  *  4) send back data (if any)
+ *  5) force immediate connection close
  */
 enum TAppState {
   APP_INIT,
   APP_READ_FRAME_SIZE,
   APP_READ_REQUEST,
   APP_WAIT_TASK,
-  APP_SEND_RESULT
+  APP_SEND_RESULT,
+  APP_CLOSE_CONNECTION
 };
 
 /**
 class TConnection {
  private:
 
-  class Task;
+  /// Starting size for new connection buffer
+  static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
 
-  // Server handle
+  /// Server handle
   TNonblockingServer* server_;
 
-  // Socket handle
+  /// Socket handle
   int socket_;
 
-  // Libevent object
+  /// Libevent object
   struct event event_;
 
-  // Libevent flags
+  /// Libevent flags
   short eventFlags_;
 
-  // Socket mode
+  /// Socket mode
   TSocketState socketState_;
 
-  // Application state
+  /// Application state
   TAppState appState_;
 
-  // How much data needed to read
+  /// How much data needed to read
   uint32_t readWant_;
 
-  // Where in the read buffer are we
+  /// Where in the read buffer are we
   uint32_t readBufferPos_;
 
-  // Read buffer
+  /// Read buffer
   uint8_t* readBuffer_;
 
-  // Read buffer size
+  /// Read buffer size
   uint32_t readBufferSize_;
 
-  // Write buffer
+  /// Write buffer
   uint8_t* writeBuffer_;
 
-  // Write buffer size
+  /// Write buffer size
   uint32_t writeBufferSize_;
 
-  // How far through writing are we?
+  /// How far through writing are we?
   uint32_t writeBufferPos_;
 
-  // How many times have we read since our last buffer reset?
+  /// How many times have we read since our last buffer reset?
   uint32_t numReadsSinceReset_;
 
-  // How many times have we written since our last buffer reset?
+  /// How many times have we written since our last buffer reset?
   uint32_t numWritesSinceReset_;
 
-  // Task handle
+  /// Task handle
   int taskHandle_;
 
-  // Task event
+  /// Task event
   struct event taskEvent_;
 
-  // Transport to read from
+  /// Transport to read from
   boost::shared_ptr<TMemoryBuffer> inputTransport_;
 
-  // Transport that processor writes to
+  /// Transport that processor writes to
   boost::shared_ptr<TMemoryBuffer> outputTransport_;
 
-  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
   boost::shared_ptr<TTransport> factoryInputTransport_;
   boost::shared_ptr<TTransport> factoryOutputTransport_;
 
-  // Protocol decoder
+  /// Protocol decoder
   boost::shared_ptr<TProtocol> inputProtocol_;
 
-  // Protocol encoder
+  /// Protocol encoder
   boost::shared_ptr<TProtocol> outputProtocol_;
 
-  // Go into read mode
+  /// Go into read mode
   void setRead() {
     setFlags(EV_READ | EV_PERSIST);
   }
 
-  // Go into write mode
+  /// Go into write mode
   void setWrite() {
     setFlags(EV_WRITE | EV_PERSIST);
   }
 
-  // Set socket idle
+  /// Set socket idle
   void setIdle() {
     setFlags(0);
   }
 
-  // Set event flags
+  /**
+   * Set event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
   void setFlags(short eventFlags);
 
-  // Libevent handlers
+  /**
+   * Libevent handler called (via our static wrapper) when the connection
+   * socket had something happen.  Rather than use the flags libevent passed,
+   * we use the connection state to determine whether we need to read or
+   * write the socket.
+   */
   void workSocket();
 
-  // Close this client and reset
+  /// Close this connection and free or reset its resources.
   void close();
 
  public:
 
-  // Constructor
+  class Task;
+
+  /// Constructor
   TConnection(int socket, short eventFlags, TNonblockingServer *s) {
-    readBuffer_ = (uint8_t*)std::malloc(1024);
+    readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
     if (readBuffer_ == NULL) {
       throw new apache::thrift::TException("Out of memory.");
     }
-    readBufferSize_ = 1024;
+    readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
 
     numReadsSinceReset_ = 0;
     numWritesSinceReset_ = 0;
    *
    * @param limit we limit buffer size to.
    */
-  void checkIdleBufferMemLimit(uint32_t limit);
+  void checkIdleBufferMemLimit(size_t limit);
 
-  // Initialize
+  /// Initialize
   void init(int socket, short eventFlags, TNonblockingServer *s);
 
-  // Transition into a new state
+  /**
+   * This is called when the application transitions from one state into
+   * another. This means that it has finished writing the data that it needed
+   * to, or finished receiving the data that it needed to.
+   */
   void transition();
 
-  // Handler wrapper
+  /**
+   * C-callable event handler for connection events.  Provides a callback
+   * that libevent can understand which invokes connection_->workSocket().
+   *
+   * @param fd the descriptor the event occured on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TConnection's "this".
+   */
   static void eventHandler(int fd, short /* which */, void* v) {
     assert(fd == ((TConnection*)v)->socket_);
     ((TConnection*)v)->workSocket();
   }
 
-  // Handler wrapper for task block
-  static void taskHandler(int fd, short /* which */, void* v) {
-    assert(fd == ((TConnection*)v)->taskHandle_);
-    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
-      GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
+  /**
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
+   *
+   * @param fd the descriptor the event occured on.
+   */
+  static void taskHandler(int fd, short /* which */, void* /* v */) {
+    TConnection* connection;
+    if (read(fd, (void*)&connection, sizeof(TConnection*))
+        != sizeof(TConnection*)) {
+      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
+      return;
+    }
+
+    connection->transition();
+  }
+
+  /**
+   * Notification to server that processing has ended on this request.
+   * Can be called either when processing is completed or when a waiting
+   * task has been preemptively terminated (on overload).
+   *
+   * @return true if successful, false if unable to notify (check errno). 
+   */
+  bool notifyServer() {
+    TConnection* connection = this;
+    if (write(server_->getNotificationSendFD(), (const void*)&connection,
+             sizeof(TConnection*)) != sizeof(TConnection*)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /// Force connection shutdown for this connection.
+  void forceClose() {
+    appState_ = APP_CLOSE_CONNECTION;
+    if (!notifyServer()) {
+      throw TException("TConnection::forceClose: failed write on notify pipe");
     }
-    ((TConnection*)v)->transition();
   }
 
+  /// return the server this connection was initialized for.
+  TNonblockingServer* getServer() {
+    return server_;
+  }
+
+  /// get state of connection.
+  TAppState getState() {
+    return appState_;
+  }
 };
 
 }}} // apache::thrift::server