// the ByteBuffer we'll be using to write and read, depending on the state
private ByteBuffer buffer_;
- private TByteArrayOutputStream response_;
+ private final TByteArrayOutputStream response_;
+
+ // the frame that the TTransport should wrap.
+ private final TMemoryInputTransport frameTrans_;
+
+ // the transport that should be used to connect to clients
+ private final TTransport inTrans_;
+
+ private final TTransport outTrans_;
+
+ // the input protocol to use on frames
+ private final TProtocol inProt_;
+
+ // the output protocol to use on frames
+ private final TProtocol outProt_;
+
+ // context associated with this connection
+ private final ServerContext context_;
public FrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
selectionKey_ = selectionKey;
selectThread_ = selectThread;
buffer_ = ByteBuffer.allocate(4);
+
+ frameTrans_ = new TMemoryInputTransport();
+ response_ = new TByteArrayOutputStream();
+ inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
+ outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+ inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
+ outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
+
+ if (eventHandler_ != null) {
+ context_ = eventHandler_.createContext(inProt_, outProt_);
+ } else {
+ context_ = null;
+ }
}
/**
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
}
trans_.close();
+ if (eventHandler_ != null) {
+ eventHandler_.deleteContext(context_, inProt_, outProt_);
+ }
}
/**
* Actually invoke the method signified by this FrameBuffer.
*/
public void invoke() {
- TTransport inTrans = getInputTransport();
- TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
- TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
-
+ frameTrans_.reset(buffer_.array());
+ response_.reset();
+
try {
- processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+ if (eventHandler_ != null) {
+ eventHandler_.processContext(context_, inTrans_, outTrans_);
+ }
+ processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
requestSelectInterestChange();
}
- /**
- * Wrap the read buffer in a memory-based transport so a processor can read
- * the data it needs to handle an invocation.
- */
- private TTransport getInputTransport() {
- return inputTransportFactory_.getTransport(new TMemoryInputTransport(buffer_.array()));
- }
-
- /**
- * Get the transport that should be used by the invoker for responding.
- */
- private TTransport getOutputTransport() {
- response_ = new TByteArrayOutputStream();
- return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
- }
-
/**
* Perform a read into buffer.
*
}
}
} // FrameBuffer
-
- public void setServerEventHandler(TServerEventHandler eventHandler) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
- public TServerEventHandler getEventHandler() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
}