return;
}
-
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND;
- appState_ = APP_SEND_RESULT;
+
+ if (server_->getFrameResponses()) {
+ // Put the frame size into the write buffer
+ appState_ = APP_SEND_FRAME_SIZE;
+ frameSize_ = (int32_t)htonl(writeBufferSize_);
+ writeBuffer_ = (uint8_t*)&frameSize_;
+ writeBufferSize_ = 4;
+ } else {
+ // Go straight into sending the result, do not frame it
+ appState_ = APP_SEND_RESULT;
+ }
// Socket into write mode
setWrite();
// In this case, the request was asynchronous and we should fall through
// right back into the read frame header state
+ goto LABEL_APP_INIT;
+
+ case APP_SEND_FRAME_SIZE:
+
+ // Refetch the result of the operation since we put the frame size into
+ // writeBuffer_
+ outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+ writeBufferPos_ = 0;
+
+ // Now in send result state
+ appState_ = APP_SEND_RESULT;
+
+ // Go to work on the socket right away, probably still writeable
+ workSocket();
+
+ return;
case APP_SEND_RESULT:
// N.B.: We also intentionally fall through here into the INIT state!
+ LABEL_APP_INIT:
case APP_INIT:
// Clear write buffer variables
#include <stack>
#include <event.h>
-#
-
namespace facebook { namespace thrift { namespace server {
using boost::shared_ptr;
// Port server runs on
int port_;
+ // Whether to frame responses
+ bool frameResponses_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
TNonblockingServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerOptions> options,
int port) :
- TServer(processor, options), serverSocket_(0), port_(port) {}
+ TServer(processor, options),
+ serverSocket_(0),
+ port_(port),
+ frameResponses_(true) {}
~TNonblockingServer() {}
+ void setFrameResponses(bool frameResponses) {
+ frameResponses_ = frameResponses;
+ }
+
+ bool getFrameResponses() {
+ return frameResponses_;
+ }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
+ APP_SEND_FRAME_SIZE,
APP_SEND_RESULT
};
// How far through writing are we?
uint32_t writeBufferPos_;
+ // Frame size
+ int32_t frameSize_;
+
// Transport to read from
shared_ptr<TMemoryBuffer> inputTransport_;