From 9cda78844de5097554414e3ef30e62d482679b81 Mon Sep 17 00:00:00 2001 From: Roger Meier Date: Sun, 24 Mar 2013 21:42:35 +0100 Subject: [PATCH] THRIFT-1864 java: implement event handler for non-blocking server Patch: Vitali Lovich --- .../server/AbstractNonblockingServer.java | 72 +++++++++++-------- .../thrift/server/TNonblockingServer.java | 4 ++ .../server/TThreadedSelectorServer.java | 4 ++ 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index e5e26b2f..97afc0b9 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -281,7 +281,24 @@ public abstract class AbstractNonblockingServer extends TServer { // the ByteBuffer we'll be using to write and read, depending on the state private ByteBuffer buffer_; - private TByteArrayOutputStream response_; + private final TByteArrayOutputStream response_; + + // the frame that the TTransport should wrap. + private final TMemoryInputTransport frameTrans_; + + // the transport that should be used to connect to clients + private final TTransport inTrans_; + + private final TTransport outTrans_; + + // the input protocol to use on frames + private final TProtocol inProt_; + + // the output protocol to use on frames + private final TProtocol outProt_; + + // context associated with this connection + private final ServerContext context_; public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, @@ -290,6 +307,19 @@ public abstract class AbstractNonblockingServer extends TServer { selectionKey_ = selectionKey; selectThread_ = selectThread; buffer_ = ByteBuffer.allocate(4); + + frameTrans_ = new TMemoryInputTransport(); + response_ = new TByteArrayOutputStream(); + inTrans_ = inputTransportFactory_.getTransport(frameTrans_); + outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); + inProt_ = inputProtocolFactory_.getProtocol(inTrans_); + outProt_ = outputProtocolFactory_.getProtocol(outTrans_); + + if (eventHandler_ != null) { + context_ = eventHandler_.createContext(inProt_, outProt_); + } else { + context_ = null; + } } /** @@ -426,6 +456,9 @@ public abstract class AbstractNonblockingServer extends TServer { readBufferBytesAllocated.addAndGet(-buffer_.array().length); } trans_.close(); + if (eventHandler_ != null) { + eventHandler_.deleteContext(context_, inProt_, outProt_); + } } /** @@ -470,12 +503,14 @@ public abstract class AbstractNonblockingServer extends TServer { * Actually invoke the method signified by this FrameBuffer. */ public void invoke() { - TTransport inTrans = getInputTransport(); - TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); - TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); - + frameTrans_.reset(buffer_.array()); + response_.reset(); + try { - processorFactory_.getProcessor(inTrans).process(inProt, outProt); + if (eventHandler_ != null) { + eventHandler_.processContext(context_, inTrans_, outTrans_); + } + processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_); responseReady(); return; } catch (TException te) { @@ -488,22 +523,6 @@ public abstract class AbstractNonblockingServer extends TServer { requestSelectInterestChange(); } - /** - * Wrap the read buffer in a memory-based transport so a processor can read - * the data it needs to handle an invocation. - */ - private TTransport getInputTransport() { - return inputTransportFactory_.getTransport(new TMemoryInputTransport(buffer_.array())); - } - - /** - * Get the transport that should be used by the invoker for responding. - */ - private TTransport getOutputTransport() { - response_ = new TByteArrayOutputStream(); - return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); - } - /** * Perform a read into buffer. * @@ -550,13 +569,4 @@ public abstract class AbstractNonblockingServer extends TServer { } } } // FrameBuffer - - public void setServerEventHandler(TServerEventHandler eventHandler) { - throw new UnsupportedOperationException("Not supported yet."); - } - - public TServerEventHandler getEventHandler() { - throw new UnsupportedOperationException("Not supported yet."); - } - } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 169ae5cb..240b1235 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -150,6 +150,10 @@ public class TNonblockingServer extends AbstractNonblockingServer { */ public void run() { try { + if (eventHandler_ != null) { + eventHandler_.preServe(); + } + while (!stopped_) { select(); processInterestChanges(); diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 23ec842a..29eabb12 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -371,6 +371,10 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { */ public void run() { try { + if (eventHandler_ != null) { + eventHandler_.preServe(); + } + while (!stopped_) { select(); } -- 2.17.1