From 3adf8aad36eb46bf94c7c8ab7fc3a743775b46b2 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Thu, 19 Aug 2010 21:41:58 +0000 Subject: [PATCH] THRIFT-845. java: async client does not respect timeout This patch adds timeout handling to async method calls through TAsyncClientManager. Patch: Ning Liang git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@987323 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/thrift/async/TAsyncClient.java | 18 +++ .../thrift/async/TAsyncClientManager.java | 108 ++++++++++------ .../apache/thrift/async/TAsyncMethodCall.java | 16 ++- .../thrift/async/TestTAsyncClientManager.java | 118 ++++++++++++------ 4 files changed, 187 insertions(+), 73 deletions(-) diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/org/apache/thrift/async/TAsyncClient.java index 2e8dea3a..0355f802 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClient.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClient.java @@ -27,17 +27,35 @@ public abstract class TAsyncClient { protected final TAsyncClientManager manager; private TAsyncMethodCall currentMethod; private Throwable error; + private long timeout; public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) { + this(protocolFactory, manager, transport, 0); + } + + public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) { this.protocolFactory = protocolFactory; this.manager = manager; this.transport = transport; + this.timeout = timeout; } public TProtocolFactory getProtocolFactory() { return protocolFactory; } + public long getTimeout() { + return timeout; + } + + public boolean hasTimeout() { + return timeout > 0; + } + + public void setTimeout(long timeout) { + this.timeout = timeout; + } + /** * Is the client in an error state? * @return diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java index 1d32ace1..5464d7e0 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java @@ -19,14 +19,15 @@ package org.apache.thrift.async; import java.io.IOException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.ClosedSelectorException; +import java.util.HashSet; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; /** * Contains selector thread which transitions method call objects */ +@SuppressWarnings("unchecked") public class TAsyncClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName()); @@ -57,8 +59,12 @@ public class TAsyncClientManager { } private class SelectThread extends Thread { + // Selector waits at most SELECT_TIME milliseconds before waking + private static final long SELECT_TIME = 200; + private final Selector selector; private volatile boolean running; + private final Set timeoutWatchSet = new HashSet(); public SelectThread() throws IOException { this.selector = SelectorProvider.provider().openSelector(); @@ -79,46 +85,76 @@ public class TAsyncClientManager { public void run() { while (running) { try { - selector.select(); + selector.select(SELECT_TIME); } catch (IOException e) { LOGGER.error("Caught IOException in TAsyncClientManager!", e); } - // Handle any ready channels calls - try { - Iterator keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - SelectionKey key = keys.next(); - keys.remove(); - if (!key.isValid()) { - // this should only have happened if the method call experienced an - // error and the key was cancelled. just skip it. - continue; - } - TAsyncMethodCall method = (TAsyncMethodCall)key.attachment(); - method.transition(key); + transitionMethods(); + timeoutIdleMethods(); + startPendingMethods(); + } + } + + // Transition methods for ready keys + private void transitionMethods() { + try { + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + keys.remove(); + if (!key.isValid()) { + // this can happen if the method call experienced an error and the key was cancelled + // this can also happen if we timeout a method, which results in a channel close + // just skip + continue; + } + TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment(); + methodCall.transition(key); + + // If done or error occurred, remove from timeout watch set + if (methodCall.isFinished() || methodCall.getClient().hasError()) { + timeoutWatchSet.remove(methodCall); } - } catch (ClosedSelectorException e) { - LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e); } + } catch (ClosedSelectorException e) { + LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e); + } + } - // Start any new calls - TAsyncMethodCall methodCall; - while ((methodCall = pendingCalls.poll()) != null) { - // Catch registration errors. Method will catch transition errors and cleanup. - try { - SelectionKey key = methodCall.registerWithSelector(selector); - methodCall.transition(key); - } catch (ClosedChannelException e) { - methodCall.onError(e); - LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e); - } catch (CancelledKeyException e) { - methodCall.onError(e); - LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e); - } catch (Exception e) { - methodCall.onError(e); - LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e); - } + // Timeout any existing method calls + private void timeoutIdleMethods() { + Iterator iterator = timeoutWatchSet.iterator(); + while (iterator.hasNext()) { + TAsyncMethodCall methodCall = iterator.next(); + long clientTimeout = methodCall.getClient().getTimeout(); + long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime(); + if (timeElapsed > clientTimeout) { + iterator.remove(); + methodCall.onError(new TimeoutException("Operation " + + methodCall.getClass() + " timed out after " + timeElapsed + + " milliseconds.")); + } + } + } + + // Start any new calls + private void startPendingMethods() { + TAsyncMethodCall methodCall; + while ((methodCall = pendingCalls.poll()) != null) { + // Catch registration errors. method will catch transition errors and cleanup. + try { + SelectionKey key = methodCall.registerWithSelector(selector); + methodCall.transition(key); + + // If timeout specified and first transition went smoothly, add to timeout watch set + TAsyncClient client = methodCall.getClient(); + if (client.hasTimeout() && !client.hasError()) { + timeoutWatchSet.add(methodCall); + } + } catch (Throwable e) { + LOGGER.warn("Caught throwable in TAsyncClientManager!", e); + methodCall.onError(e); } } } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index eca321b3..5568afb3 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -55,6 +55,7 @@ public abstract class TAsyncMethodCall { protected final TAsyncClient client; private final AsyncMethodCallback callback; private final boolean isOneway; + private long lastTransitionTime; private ByteBuffer sizeBuffer; private final byte[] sizeBufferArray = new byte[4]; @@ -76,6 +77,18 @@ public abstract class TAsyncMethodCall { return state; } + protected boolean isFinished() { + return state == State.RESPONSE_READ; + } + + protected long getLastTransitionTime() { + return lastTransitionTime; + } + + public TAsyncClient getClient() { + return client; + } + protected abstract void write_args(TProtocol protocol) throws TException; protected void prepareMethodCall() throws TException { @@ -135,13 +148,14 @@ public abstract class TAsyncMethodCall { throw new IllegalStateException("Method call in state " + state + " but selector called transition method. Seems like a bug..."); } + lastTransitionTime = System.currentTimeMillis(); } catch (Throwable e) { key.cancel(); key.attach(null); onError(e); } } - + protected void onError(Throwable e) { state = State.ERROR; client.onError(e); diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 25350672..55b054a8 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -18,6 +18,9 @@ */ package org.apache.thrift.async; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import junit.framework.TestCase; @@ -28,14 +31,12 @@ import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingSocket; -import java.util.List; -import java.util.ArrayList; - import thrift.test.CompactProtoTestStruct; import thrift.test.Srv; import thrift.test.Srv.Iface; import thrift.test.Srv.AsyncClient.Janky_call; import thrift.test.Srv.AsyncClient.onewayMethod_call; +import thrift.test.Srv.AsyncClient.primitiveMethod_call; import thrift.test.Srv.AsyncClient.voidMethod_call; public class TestTAsyncClientManager extends TestCase { @@ -58,8 +59,15 @@ public class TestTAsyncClientManager extends TestCase { public void methodWithDefaultArgs(int something) throws TException { } + // Using this method for timeout testing @Override public int primitiveMethod() throws TException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } return 0; } @@ -76,31 +84,31 @@ public class TestTAsyncClientManager extends TestCase { public void onewayMethod() throws TException { } } - + public class JankyRunnable implements Runnable { private TAsyncClientManager acm_; private int numCalls_; private int numSuccesses_ = 0; private Srv.AsyncClient client_; private TNonblockingSocket clientSocket_; - + public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception { this.acm_ = acm; this.numCalls_ = numCalls; this.clientSocket_ = new TNonblockingSocket("localhost", 12345); this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_); } - + public int getNumSuccesses() { return numSuccesses_; } - + public void run() { for (int i = 0; i < numCalls_; i++) { - try { + try { // connect an async client final Object o = new Object(); - + final AtomicBoolean jankyReturned = new AtomicBoolean(false); client_.Janky(1, new AsyncMethodCallback() { @Override @@ -112,28 +120,28 @@ public class TestTAsyncClientManager extends TestCase { o.notifyAll(); } } catch (TException e) { - e.printStackTrace(); + e.printStackTrace(); synchronized(o) { o.notifyAll(); } fail("unexpected exception: " + e); - } - + } } - + @Override public void onError(Throwable throwable) { + System.out.println(throwable.toString()); synchronized(o) { o.notifyAll(); } - fail("unexpected exception: " + throwable); + fail("unexpected exception: " + throwable); } }); - + synchronized(o) { o.wait(1000); } - + assertTrue(jankyReturned.get()); this.numSuccesses_++; } catch (Exception e) { @@ -143,6 +151,30 @@ public class TestTAsyncClientManager extends TestCase { } } + public void standardCallTest(Srv.AsyncClient client) throws Exception { + final Object o = new Object(); + final AtomicBoolean jankyReturned = new AtomicBoolean(false); + client.Janky(1, new FailureLessCallback() { + @Override + public void onComplete(Janky_call response) { + try { + assertEquals(3, response.getResult()); + jankyReturned.set(true); + } catch (TException e) { + fail("unexpected exception: " + e); + } + synchronized(o) { + o.notifyAll(); + } + } + }); + + synchronized(o) { + o.wait(100000); + } + assertTrue(jankyReturned.get()); + } + public void testIt() throws Exception { // put up a server final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(12345)); @@ -164,26 +196,11 @@ public class TestTAsyncClientManager extends TestCase { final Object o = new Object(); // make a standard method call - final AtomicBoolean jankyReturned = new AtomicBoolean(false); - client.Janky(1, new FailureLessCallback() { - @Override - public void onComplete(Janky_call response) { - try { - assertEquals(3, response.getResult()); - jankyReturned.set(true); - } catch (TException e) { - fail("unexpected exception: " + e); - } - synchronized(o) { - o.notifyAll(); - } - } - }); + standardCallTest(client); - synchronized(o) { - o.wait(100000); - } - assertTrue(jankyReturned.get()); + // make a standard method call that succeeds within timeout + client.setTimeout(5000); + standardCallTest(client); // make a void method call final AtomicBoolean voidMethodReturned = new AtomicBoolean(false); @@ -249,7 +266,7 @@ public class TestTAsyncClientManager extends TestCase { o.wait(1000); } assertTrue(voidAfterOnewayReturned.get()); - + // make multiple calls with deserialization in the selector thread (repro Eric's issue) int numThreads = 500; int numCallsPerThread = 100; @@ -270,5 +287,34 @@ public class TestTAsyncClientManager extends TestCase { numSuccesses += runnable.getNumSuccesses(); } assertEquals(numSuccesses, numThreads * numCallsPerThread); + + // check that timeouts work + client.setTimeout(100); + client.primitiveMethod(new AsyncMethodCallback() { + + @Override + public void onError(Throwable throwable) { + if (!(throwable instanceof TimeoutException)) { + fail("should have received timeout exception"); + synchronized(o) { + o.notifyAll(); + } + } + } + + @Override + public void onComplete(primitiveMethod_call response) { + fail("should not have finished timed out call."); + synchronized(o) { + o.notifyAll(); + } + } + + }); + synchronized(o) { + o.wait(2000); + } + assertTrue(client.hasError()); + assertTrue(client.getError() instanceof TimeoutException); } } -- 2.17.1