From 36af4d3b406569123b7e488b688bfc1d1fcb92f9 Mon Sep 17 00:00:00 2001 From: Jake Luciani Date: Thu, 16 May 2013 08:22:18 -0400 Subject: [PATCH] THRIFT-1972 Add async server processors to Java Patch: Jake Luciani --- compiler/cpp/src/generate/t_java_generator.cc | 228 +++++++++++++++++- .../apache/thrift/AsyncProcessFunction.java | 56 +++++ .../apache/thrift/TBaseAsyncProcessor.java | 92 +++++++ .../org/apache/thrift/TProcessorFactory.java | 4 + .../server/AbstractNonblockingServer.java | 70 ++++-- .../thrift/server/TNonblockingServer.java | 8 +- .../server/TThreadedSelectorServer.java | 5 +- .../apache/thrift/server/ServerTestBase.java | 119 ++++++++- .../apache/thrift/server/TestAsyncServer.java | 28 +++ 9 files changed, 579 insertions(+), 31 deletions(-) create mode 100644 lib/java/src/org/apache/thrift/AsyncProcessFunction.java create mode 100644 lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java create mode 100644 lib/java/test/org/apache/thrift/server/TestAsyncServer.java diff --git a/compiler/cpp/src/generate/t_java_generator.cc b/compiler/cpp/src/generate/t_java_generator.cc index 2985a78a..c9a3e562 100644 --- a/compiler/cpp/src/generate/t_java_generator.cc +++ b/compiler/cpp/src/generate/t_java_generator.cc @@ -144,7 +144,10 @@ public: void generate_service_client (t_service* tservice); void generate_service_async_client(t_service* tservice); void generate_service_server (t_service* tservice); + void generate_service_async_server (t_service* tservice); void generate_process_function (t_service* tservice, t_function* tfunction); + void generate_process_async_function (t_service* tservice, t_function* tfunction); + void generate_java_union(t_struct* tstruct); void generate_union_constructor(ofstream& out, t_struct* tstruct); @@ -373,6 +376,8 @@ string t_java_generator::java_type_imports() { "import org.apache.thrift.protocol.TProtocolException;\n" + "import org.apache.thrift.EncodingUtils;\n" + "import org.apache.thrift.TException;\n" + + "import org.apache.thrift.async.AsyncMethodCallback;\n"+ + "import org.apache.thrift.server.AbstractNonblockingServer.*;\n"+ "import java.util.List;\n" + "import java.util.ArrayList;\n" + "import java.util.Map;\n" + @@ -2225,6 +2230,7 @@ void t_java_generator::generate_service(t_service* tservice) { generate_service_client(tservice); generate_service_async_client(tservice); generate_service_server(tservice); + generate_service_async_server(tservice); generate_service_helpers(tservice); indent_down(); @@ -2279,6 +2285,7 @@ void t_java_generator::generate_service_async_interface(t_service* tservice) { f_service_ << indent() << "}" << endl << endl; } + /** * Generates structs for all the service args and return types * @@ -2640,6 +2647,61 @@ void t_java_generator::generate_service_server(t_service* tservice) { indent(f_service_) << "}" << endl << endl; } +/** + * Generates a service server definition. + * + * @param tservice The service to generate a server for. + */ +void t_java_generator::generate_service_async_server(t_service* tservice) { + // Generate the dispatch methods + vector functions = tservice->get_functions(); + vector::iterator f_iter; + + // Extends stuff + string extends = ""; + string extends_processor = ""; + if (tservice->get_extends() == NULL) { + extends_processor = "org.apache.thrift.TBaseAsyncProcessor"; + } else { + extends = type_name(tservice->get_extends()); + extends_processor = extends + ".AsyncProcessor"; + } + + // Generate the header portion + indent(f_service_) << + "public static class AsyncProcessor extends " << extends_processor << " {" << endl; + indent_up(); + + indent(f_service_) << "private static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class.getName());" << endl; + + indent(f_service_) << "public AsyncProcessor(I iface) {" << endl; + indent(f_service_) << " super(iface, getProcessMap(new HashMap>()));" << endl; + indent(f_service_) << "}" << endl << endl; + + indent(f_service_) << "protected AsyncProcessor(I iface, Map> processMap) {" << endl; + indent(f_service_) << " super(iface, getProcessMap(processMap));" << endl; + indent(f_service_) << "}" << endl << endl; + + + indent(f_service_) << "private static Map> getProcessMap(Map> processMap) {" << endl; + indent_up(); + for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + indent(f_service_) << "processMap.put(\"" << (*f_iter)->get_name() << "\", new " << (*f_iter)->get_name() << "());" << endl; + } + indent(f_service_) << "return processMap;" << endl; + indent_down(); + indent(f_service_) << "}" << endl << endl; + + // Generate the process subfunctions + for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) { + generate_process_async_function(tservice, *f_iter); + } + + indent_down(); + indent(f_service_) << "}" << endl << endl; +} + + /** * Generates a struct and helpers for a function. * @@ -2666,6 +2728,159 @@ void t_java_generator::generate_function_helpers(t_function* tfunction) { generate_java_struct_definition(f_service_, &result, false, true, true); } + + +/** + * Generates a process function definition. + * + * @param tfunction The function to write a dispatcher for + */ +void t_java_generator::generate_process_async_function(t_service* tservice, + t_function* tfunction) { + string argsname = tfunction->get_name() + "_args"; + + string resultname = tfunction->get_name() + "_result"; + if (tfunction->is_oneway()) { + resultname = "org.apache.thrift.TBase"; + } + + string resulttype = type_name(tfunction->get_returntype(),true); + + + (void) tservice; + // Open class + indent(f_service_) << + "public static class " << tfunction->get_name() << " extends org.apache.thrift.AsyncProcessFunction {" << endl; + indent_up(); + + indent(f_service_) << "public " << tfunction->get_name() << "() {" << endl; + indent(f_service_) << " super(\"" << tfunction->get_name() << "\");" << endl; + indent(f_service_) << "}" << endl << endl; + + indent(f_service_) << "public " << argsname << " getEmptyArgsInstance() {" << endl; + indent(f_service_) << " return new " << argsname << "();" << endl; + indent(f_service_) << "}" << endl << endl; + + indent(f_service_) << "public AsyncMethodCallback<"< getResultHandler(final AsyncFrameBuffer fb, final int seqid) {" << endl; + indent_up(); + indent(f_service_) << "final org.apache.thrift.AsyncProcessFunction fcall = this;"<() { " << endl; + indent_up(); + indent(f_service_) << "public void onComplete(" << resulttype <<" o) {" << endl; + + indent_up(); + if (!tfunction->is_oneway()) { + indent(f_service_) <get_returntype()->is_void()) { + indent(f_service_) << "result.success = o;"<get_returntype())) { + indent(f_service_) << "result.set" << get_cap_name("success") << get_cap_name("isSet") << "(true);" << endl; + } + } + + indent(f_service_) << "try {"<is_oneway()) { + indent(f_service_) <<"byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;"<get_xceptions(); + const std::vector& xceptions = xs->get_members(); + vector::const_iterator x_iter; + bool first = true; + if (xceptions.size() > 0) { + for (x_iter = xceptions.begin(); x_iter != xceptions.end(); ++x_iter) { + first ? first = false : indent(f_service_) << "else "; + indent(f_service_) << "if (e instanceof " << type_name((*x_iter)->get_type(), false, false)<<") {" << endl; + indent(f_service_) << indent() << "result." << (*x_iter)->get_name() << " = (" << type_name((*x_iter)->get_type(), false, false) << ") e;" << endl; + indent(f_service_) << indent() << "result.set" << get_cap_name((*x_iter)->get_name()) << get_cap_name("isSet") << "(true);" << endl; + indent(f_service_) << indent() << "msg = result;"<is_oneway())?"true":"false") << ";" << endl; + indent(f_service_) << "}" << endl << endl; + + indent(f_service_) << "public void start(I iface, " << argsname << " args, org.apache.thrift.async.AsyncMethodCallback<"< resultHandler) throws TException {" << endl; + indent_up(); + + // Generate the function call + t_struct* arg_struct = tfunction->get_arglist(); + const std::vector& fields = arg_struct->get_members(); + vector::const_iterator f_iter; + f_service_ << indent(); + + f_service_ << "iface." << tfunction->get_name() << "("; + bool first = true; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + if (first) { + first = false; + } else { + f_service_ << ", "; + } + f_service_ << "args." << (*f_iter)->get_name(); + } + if (!first) + f_service_ << ","; + f_service_ << "resultHandler"; + f_service_ << ");" << endl; + + + indent_down(); + indent(f_service_) << "}"; + + // Close function + f_service_ << endl; + + // Close class + indent_down(); + f_service_ << indent() << "}" << endl << endl; +} + /** * Generates a process function definition. * @@ -2773,6 +2988,7 @@ void t_java_generator::generate_process_function(t_service* tservice, f_service_ << indent() << "}" << endl << endl; } + /** * Deserializes a field of any type. * @@ -3270,7 +3486,7 @@ string t_java_generator::base_type_name(t_base_type* type, switch (tbase) { case t_base_type::TYPE_VOID: - return "void"; + return (in_container ? "Void" : "void"); case t_base_type::TYPE_STRING: if (type->is_binary()) { return "ByteBuffer"; @@ -3395,14 +3611,8 @@ string t_java_generator::async_function_call_arglist(t_function* tfunc, bool use arglist = argument_list(tfunc->get_arglist(), include_types) + ", "; } - std::string ret_type = ""; - if (use_base_method) { - ret_type += "AsyncClient."; - } - ret_type += tfunc->get_name() + "_call"; - if (include_types) { - arglist += "org.apache.thrift.async.AsyncMethodCallback<" + ret_type + "> "; + arglist += "org.apache.thrift.async.AsyncMethodCallback "; } arglist += "resultHandler"; @@ -3453,7 +3663,7 @@ string t_java_generator::async_argument_list(t_function* tfunct, t_struct* tstru result += ", "; } if (include_types) { - result += "org.apache.thrift.async.AsyncMethodCallback<" + tfunct->get_name() + "_call" + "> "; + result += "org.apache.thrift.async.AsyncMethodCallback "; } result += "resultHandler"; return result; diff --git a/lib/java/src/org/apache/thrift/AsyncProcessFunction.java b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java new file mode 100644 index 00000000..799e02d5 --- /dev/null +++ b/lib/java/src/org/apache/thrift/AsyncProcessFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift; + +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.protocol.TMessage; +import org.apache.thrift.protocol.TMessageType; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.AbstractNonblockingServer; + +public abstract class AsyncProcessFunction { + final String methodName; + + public AsyncProcessFunction(String methodName) { + this.methodName = methodName; + } + + protected abstract boolean isOneway(); + + public abstract void start(I iface, T args, AsyncMethodCallback resultHandler) throws TException; + + public abstract T getEmptyArgsInstance(); + + public abstract AsyncMethodCallback getResultHandler(final AbstractNonblockingServer.AsyncFrameBuffer fb, int seqid); + + public String getMethodName() { + return methodName; + } + + public void sendResponse(final AbstractNonblockingServer.AsyncFrameBuffer fb, final TBase result, final byte type, final int seqid) throws TException { + TProtocol oprot = fb.getOutputProtocol(); + + oprot.writeMessageBegin(new TMessage(getMethodName(), type, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + + fb.responseReady(); + } +} diff --git a/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java new file mode 100644 index 00000000..da41620f --- /dev/null +++ b/lib/java/src/org/apache/thrift/TBaseAsyncProcessor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift; + +import org.apache.thrift.protocol.*; + +import org.apache.thrift.server.AbstractNonblockingServer.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Map; + +public class TBaseAsyncProcessor implements TProcessor { + protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); + + final I iface; + final Map> processMap; + + public TBaseAsyncProcessor(I iface, Map> processMap) { + this.iface = iface; + this.processMap = processMap; + } + + public Map> getProcessMapView() { + return Collections.unmodifiableMap(processMap); + } + + public boolean process(final AsyncFrameBuffer fb) throws TException { + + final TProtocol in = fb.getInputProtocol(); + final TProtocol out = fb.getOutputProtocol(); + + //Find processing function + final TMessage msg = in.readMessageBegin(); + AsyncProcessFunction fn = processMap.get(msg.name); + if (fn == null) { + TProtocolUtil.skip(in, TType.STRUCT); + in.readMessageEnd(); + TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '"+msg.name+"'"); + out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); + x.write(out); + out.writeMessageEnd(); + out.getTransport().flush(); + fb.responseReady(); + return true; + } + + //Get Args + TBase args = (TBase)fn.getEmptyArgsInstance(); + + try { + args.read(in); + } catch (TProtocolException e) { + in.readMessageEnd(); + TApplicationException x = new TApplicationException(TApplicationException.PROTOCOL_ERROR, e.getMessage()); + out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid)); + x.write(out); + out.writeMessageEnd(); + out.getTransport().flush(); + fb.responseReady(); + return true; + } + in.readMessageEnd(); + + + //start off processing function + fn.start(iface, args,fn.getResultHandler(fb,msg.seqid)); + return true; + } + + @Override + public boolean process(TProtocol in, TProtocol out) throws TException { + return false; + } +} diff --git a/lib/java/src/org/apache/thrift/TProcessorFactory.java b/lib/java/src/org/apache/thrift/TProcessorFactory.java index bcd8a38f..f6dfb14e 100644 --- a/lib/java/src/org/apache/thrift/TProcessorFactory.java +++ b/lib/java/src/org/apache/thrift/TProcessorFactory.java @@ -36,4 +36,8 @@ public class TProcessorFactory { public TProcessor getProcessor(TTransport trans) { return processor_; } + + public boolean isAsyncProcessor() { + return processor_ instanceof TBaseAsyncProcessor; + } } diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 97afc0b9..80da6cad 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import org.apache.thrift.TBaseAsyncProcessor; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; @@ -62,12 +63,12 @@ public abstract class AbstractNonblockingServer extends TServer { * time. Without this limit, the server will gladly allocate client buffers * right into an out of memory exception, rather than waiting. */ - private final long MAX_READ_BUFFER_BYTES; + final long MAX_READ_BUFFER_BYTES; /** * How many bytes are currently allocated to read buffers. */ - private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); + final AtomicLong readBufferBytesAllocated = new AtomicLong(0); public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { super(args); @@ -265,40 +266,42 @@ public abstract class AbstractNonblockingServer extends TServer { * response data back to the client. In the process it manages flipping the * read and write bits on the selection key for its client. */ - protected class FrameBuffer { + public class FrameBuffer { + private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); + // the actual transport hooked up to the client. - public final TNonblockingTransport trans_; + protected final TNonblockingTransport trans_; // the SelectionKey that corresponds to our transport - private final SelectionKey selectionKey_; + protected final SelectionKey selectionKey_; // the SelectThread that owns the registration of our transport - private final AbstractSelectThread selectThread_; + protected final AbstractSelectThread selectThread_; // where in the process of reading/writing are we? - private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; + protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; // the ByteBuffer we'll be using to write and read, depending on the state - private ByteBuffer buffer_; + protected ByteBuffer buffer_; - private final TByteArrayOutputStream response_; + protected final TByteArrayOutputStream response_; // the frame that the TTransport should wrap. - private final TMemoryInputTransport frameTrans_; + protected final TMemoryInputTransport frameTrans_; // the transport that should be used to connect to clients - private final TTransport inTrans_; + protected final TTransport inTrans_; - private final TTransport outTrans_; + protected final TTransport outTrans_; // the input protocol to use on frames - private final TProtocol inProt_; + protected final TProtocol inProt_; // the output protocol to use on frames - private final TProtocol outProt_; + protected final TProtocol outProt_; // context associated with this connection - private final ServerContext context_; + protected final ServerContext context_; public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, @@ -561,7 +564,7 @@ public abstract class AbstractNonblockingServer extends TServer { * current thread is this FrameBuffer's select thread, then it just does the * interest change immediately. */ - private void requestSelectInterestChange() { + protected void requestSelectInterestChange() { if (Thread.currentThread() == this.selectThread_) { changeSelectInterests(); } else { @@ -569,4 +572,39 @@ public abstract class AbstractNonblockingServer extends TServer { } } } // FrameBuffer + + public class AsyncFrameBuffer extends FrameBuffer { + public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) { + super(trans, selectionKey, selectThread); + } + + public TProtocol getInputProtocol() { + return inProt_; + } + + public TProtocol getOutputProtocol() { + return outProt_; + } + + + public void invoke() { + frameTrans_.reset(buffer_.array()); + response_.reset(); + + try { + if (eventHandler_ != null) { + eventHandler_.processContext(context_, inTrans_, outTrans_); + } + ((TBaseAsyncProcessor)processorFactory_.getProcessor(inTrans_)).process(this); + return; + } catch (TException te) { + LOGGER.warn("Exception while invoking!", te); + } catch (Throwable t) { + LOGGER.error("Unexpected throwable while invoking!", t); + } + // This will only be reached when there is a throwable. + state_ = FrameBufferState.AWAITING_CLOSE; + requestSelectInterestChange(); + } + } } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 240b1235..a6e74769 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -224,9 +224,11 @@ public class TNonblockingServer extends AbstractNonblockingServer { clientKey = client.registerSelector(selector, SelectionKey.OP_READ); // add this key to the map - FrameBuffer frameBuffer = new FrameBuffer(client, clientKey, - SelectAcceptThread.this); - clientKey.attach(frameBuffer); + FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ? + new AsyncFrameBuffer(client, clientKey,SelectAcceptThread.this) : + new FrameBuffer(client, clientKey,SelectAcceptThread.this); + + clientKey.attach(frameBuffer); } catch (TTransportException tte) { // something went wrong accepting. LOGGER.warn("Exception trying to accept!", tte); diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 29eabb12..8a68632d 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -606,7 +606,10 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { try { clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); - FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); + FrameBuffer frameBuffer = processorFactory_.isAsyncProcessor() ? + new AsyncFrameBuffer(accepted, clientKey, SelectorThread.this) : + new FrameBuffer(accepted, clientKey, SelectorThread.this); + clientKey.attach(frameBuffer); } catch (IOException e) { LOGGER.warn("Failed to register accepted connection to selector!", e); diff --git a/lib/java/test/org/apache/thrift/server/ServerTestBase.java b/lib/java/test/org/apache/thrift/server/ServerTestBase.java index 209038be..4cbb511c 100755 --- a/lib/java/test/org/apache/thrift/server/ServerTestBase.java +++ b/lib/java/test/org/apache/thrift/server/ServerTestBase.java @@ -30,6 +30,7 @@ import junit.framework.TestCase; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; @@ -374,11 +375,14 @@ public abstract class ServerTestBase extends TestCase { System.out.print("}\n"); } + public boolean useAsyncProcessor() { + return false; + } + public void testIt() throws Exception { for (TProtocolFactory protoFactory : getProtocols()) { - TestHandler handler = new TestHandler(); - ThriftTest.Processor processor = new ThriftTest.Processor(handler); + TProcessor processor = useAsyncProcessor() ? new ThriftTest.AsyncProcessor(new AsyncTestHandler()) : new ThriftTest.Processor(new TestHandler()); startServer(processor, protoFactory); @@ -557,4 +561,115 @@ public abstract class ServerTestBase extends TestCase { }*/ } + + public static class AsyncTestHandler implements ThriftTest.AsyncIface { + + TestHandler handler = new TestHandler(); + + @Override + public void testVoid(AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(null); + } + + @Override + public void testString(String thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testString(thing)); + } + + @Override + public void testByte(byte thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testByte(thing)); + } + + @Override + public void testI32(int thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testI32(thing)); + } + + @Override + public void testI64(long thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testI64(thing)); + } + + @Override + public void testDouble(double thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testDouble(thing)); + } + + @Override + public void testStruct(Xtruct thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testStruct(thing)); + } + + @Override + public void testNest(Xtruct2 thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testNest(thing)); + } + + @Override + public void testMap(Map thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testMap(thing)); + } + + @Override + public void testStringMap(Map thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testStringMap(thing)); + } + + @Override + public void testSet(Set thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testSet(thing)); + } + + @Override + public void testList(List thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testList(thing)); + } + + @Override + public void testEnum(Numberz thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testEnum(thing)); + } + + @Override + public void testTypedef(long thing, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testTypedef(thing)); + } + + @Override + public void testMapMap(int hello, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testMapMap(hello)); + } + + @Override + public void testInsanity(Insanity argument, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testInsanity(argument)); + } + + @Override + public void testMulti(byte arg0, int arg1, long arg2, Map arg3, Numberz arg4, long arg5, AsyncMethodCallback resultHandler) throws TException { + resultHandler.onComplete(handler.testMulti(arg0,arg1,arg2,arg3,arg4,arg5)); + } + + @Override + public void testException(String arg, AsyncMethodCallback resultHandler) throws TException { + try { + // handler.testException(); + } catch (Exception e) { + + } + } + + @Override + public void testMultiException(String arg0, String arg1, AsyncMethodCallback resultHandler) throws TException { + //To change body of implemented methods use File | Settings | File Templates. + } + + @Override + public void testOneway(int secondsToSleep, AsyncMethodCallback resultHandler) throws TException { + handler.testOneway(secondsToSleep); + resultHandler.onComplete(null); + } + } + } diff --git a/lib/java/test/org/apache/thrift/server/TestAsyncServer.java b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java new file mode 100644 index 00000000..29c54cbb --- /dev/null +++ b/lib/java/test/org/apache/thrift/server/TestAsyncServer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.server; + +public class TestAsyncServer extends TestNonblockingServer { + + @Override + public boolean useAsyncProcessor(){ + return true; + } + +} -- 2.17.1