From f969bcb7d89f3f50dba4528a673464f668fb2905 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Wed, 6 Oct 2010 20:04:40 +0000 Subject: [PATCH] THRIFT-862. java: Async client issues / improvements This patch improves quite a large number of things about the async client code. Patch: Ning Liang git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005221 13f79535-47bb-0310-9956-ffa450edef68 --- .../thrift/async/AsyncMethodCallback.java | 5 +- .../org/apache/thrift/async/TAsyncClient.java | 10 +- .../thrift/async/TAsyncClientManager.java | 81 ++-- .../apache/thrift/async/TAsyncMethodCall.java | 34 +- .../transport/TNonblockingServerSocket.java | 8 +- .../thrift/async/TestTAsyncClientManager.java | 397 +++++++++--------- 6 files changed, 290 insertions(+), 245 deletions(-) diff --git a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java index b8cd9ed6..00004b75 100644 --- a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java +++ b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java @@ -18,6 +18,7 @@ */ package org.apache.thrift.async; + public interface AsyncMethodCallback { /** * This method will be called when the remote side has completed invoking @@ -32,7 +33,7 @@ public interface AsyncMethodCallback { * This method will be called when there is an unexpected clientside * exception. This does not include application-defined exceptions that * appear in the IDL, but rather things like IOExceptions. - * @param throwable + * @param exception */ - public void onError(Throwable throwable); + public void onError(Exception exception); } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/org/apache/thrift/async/TAsyncClient.java index 5c05aaa7..468bc6ee 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClient.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClient.java @@ -26,7 +26,7 @@ public abstract class TAsyncClient { protected final TNonblockingTransport transport; protected final TAsyncClientManager manager; protected TAsyncMethodCall currentMethod; - private Throwable error; + private Exception error; private long timeout; public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) { @@ -44,7 +44,7 @@ public abstract class TAsyncClient { return protocolFactory; } - public long getTimeout() { + public long getTimeout() { return timeout; } @@ -68,7 +68,7 @@ public abstract class TAsyncClient { * Get the client's error - returns null if no error * @return */ - public Throwable getError() { + public Exception getError() { return error; } @@ -94,9 +94,9 @@ public abstract class TAsyncClient { /** * Called by delegate method on error */ - protected void onError(Throwable throwable) { + protected void onError(Exception exception) { transport.close(); currentMethod = null; - error = throwable; + error = exception; } } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java index d88b6cae..35fd3530 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java @@ -23,12 +23,13 @@ import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; -import java.util.HashSet; +import java.util.Comparator; import java.util.Iterator; -import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang.ObjectUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory; */ public class TAsyncClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName()); - + private final SelectThread selectThread; private final ConcurrentLinkedQueue pendingCalls = new ConcurrentLinkedQueue(); @@ -48,6 +49,9 @@ public class TAsyncClientManager { } public void call(TAsyncMethodCall method) throws TException { + if (!isRunning()) { + throw new TException("SelectThread is not running"); + } method.prepareMethodCall(); pendingCalls.add(method); selectThread.getSelector().wakeup(); @@ -56,18 +60,21 @@ public class TAsyncClientManager { public void stop() { selectThread.finish(); } - + + public boolean isRunning() { + return selectThread.isAlive(); + } + private class SelectThread extends Thread { - // Selector waits at most SELECT_TIME milliseconds before waking - private static final long SELECT_TIME = 5; - private final Selector selector; private volatile boolean running; - private final Set timeoutWatchSet = new HashSet(); + private final TreeSet timeoutWatchSet = new TreeSet(new TAsyncMethodCallTimeoutComparator()); public SelectThread() throws IOException { this.selector = SelectorProvider.provider().openSelector(); this.running = true; + this.setName("TAsyncClientManager#SelectorThread " + this.getId()); + // We don't want to hold up the JVM when shutting down setDaemon(true); } @@ -85,16 +92,29 @@ public class TAsyncClientManager { while (running) { try { try { - selector.select(SELECT_TIME); + if (timeoutWatchSet.size() == 0) { + // No timeouts, so select indefinitely + selector.select(); + } else { + // We have a timeout pending, so calculate the time until then and select appropriately + long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp(); + long selectTime = nextTimeout - System.currentTimeMillis(); + if (selectTime > 0) { + // Next timeout is in the future, select and wake up then + selector.select(selectTime); + } else { + // Next timeout is now or in past, select immediately so we can time out + selector.selectNow(); + } + } } catch (IOException e) { LOGGER.error("Caught IOException in TAsyncClientManager!", e); } - transitionMethods(); - timeoutIdleMethods(); + timeoutMethods(); startPendingMethods(); - } catch (Throwable throwable) { - LOGGER.error("Ignoring uncaught exception in SelectThread", throwable); + } catch (Exception exception) { + LOGGER.error("Ignoring uncaught exception in SelectThread", exception); } } } @@ -126,18 +146,16 @@ public class TAsyncClientManager { } // Timeout any existing method calls - private void timeoutIdleMethods() { + private void timeoutMethods() { Iterator iterator = timeoutWatchSet.iterator(); + long currentTime = System.currentTimeMillis(); while (iterator.hasNext()) { TAsyncMethodCall methodCall = iterator.next(); - long clientTimeout = methodCall.getClient().getTimeout(); - long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime(); - - if (timeElapsed > clientTimeout) { + if (currentTime >= methodCall.getTimeoutTimestamp()) { iterator.remove(); - methodCall.onError(new TimeoutException("Operation " + - methodCall.getClass() + " timed out after " + timeElapsed + - " milliseconds.")); + methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms.")); + } else { + break; } } } @@ -149,17 +167,30 @@ public class TAsyncClientManager { // Catch registration errors. method will catch transition errors and cleanup. try { methodCall.start(selector); - + // If timeout specified and first transition went smoothly, add to timeout watch set TAsyncClient client = methodCall.getClient(); if (client.hasTimeout() && !client.hasError()) { timeoutWatchSet.add(methodCall); } - } catch (Throwable e) { - LOGGER.warn("Caught throwable in TAsyncClientManager!", e); - methodCall.onError(e); + } catch (Exception exception) { + LOGGER.warn("Caught exception in TAsyncClientManager!", exception); + methodCall.onError(exception); } } } } + + // Comparator used in TreeSet + private static class TAsyncMethodCallTimeoutComparator implements Comparator { + @Override + public int compare(TAsyncMethodCall left, TAsyncMethodCall right) { + if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) { + return (int)(left.getSequenceId() - right.getSequenceId()); + } else { + return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp()); + } + } + } + } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index e75f4ab3..fcd50ea0 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.util.concurrent.atomic.AtomicLong; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; @@ -38,9 +39,10 @@ import org.apache.thrift.transport.TTransportException; * - public T getResult() throws , , ... * @param */ -public abstract class TAsyncMethodCall { +public abstract class TAsyncMethodCall { private static final int INITIAL_MEMORY_BUFFER_SIZE = 128; + private static AtomicLong sequenceIdCounter = new AtomicLong(0); public static enum State { CONNECTING, @@ -62,20 +64,21 @@ public abstract class TAsyncMethodCall { protected final TAsyncClient client; private final AsyncMethodCallback callback; private final boolean isOneway; - - private long lastTransitionTime; - + private long sequenceId; + private ByteBuffer sizeBuffer; private final byte[] sizeBufferArray = new byte[4]; private ByteBuffer frameBuffer; + private long startTime = System.currentTimeMillis(); + protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback callback, boolean isOneway) { this.transport = transport; this.callback = callback; this.protocolFactory = protocolFactory; this.client = client; this.isOneway = isOneway; - this.lastTransitionTime = System.currentTimeMillis(); + this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement(); } protected State getState() { @@ -86,13 +89,25 @@ public abstract class TAsyncMethodCall { return state == State.RESPONSE_READ; } - protected long getLastTransitionTime() { - return lastTransitionTime; + protected long getStartTime() { + return startTime; + } + + protected long getSequenceId() { + return sequenceId; } public TAsyncClient getClient() { return client; } + + public boolean hasTimeout() { + return client.hasTimeout(); + } + + public long getTimeoutTimestamp() { + return client.getTimeout() + startTime; + } protected abstract void write_args(TProtocol protocol) throws TException; @@ -181,15 +196,14 @@ public abstract class TAsyncMethodCall { throw new IllegalStateException("Method call in state " + state + " but selector called transition method. Seems like a bug..."); } - lastTransitionTime = System.currentTimeMillis(); - } catch (Throwable e) { + } catch (Exception e) { key.cancel(); key.attach(null); onError(e); } } - protected void onError(Throwable e) { + protected void onError(Exception e) { client.onError(e); callback.onError(e); state = State.ERROR; diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java index e26b9cfc..aa1e1e50 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java @@ -30,10 +30,15 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import org.apache.thrift.async.TAsyncClientManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Wrapper around ServerSocketChannel */ public class TNonblockingServerSocket extends TNonblockingServerTransport { + private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName()); /** * This channel is where all the nonblocking magic happens. @@ -152,8 +157,7 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { try { serverSocket_.close(); } catch (IOException iox) { - System.err.println("WARNING: Could not close server socket: " + - iox.getMessage()); + LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage()); } serverSocket_ = null; } diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 187c8f83..72a57bce 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -18,6 +18,7 @@ */ package org.apache.thrift.async; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; @@ -45,31 +46,182 @@ import thrift.test.Srv.AsyncClient.primitiveMethod_call; import thrift.test.Srv.AsyncClient.voidMethod_call; public class TestTAsyncClientManager extends TestCase { - private static void fail(Throwable throwable) { - StringWriter sink = new StringWriter(); - throwable.printStackTrace(new PrintWriter(sink, true)); - fail("unexpected error " + sink.toString()); + + private THsHaServer server_; + private Thread serverThread_; + private TAsyncClientManager clientManager_; + + public void setUp() throws Exception { + server_ = new THsHaServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(ServerTestBase.PORT)); + serverThread_ = new Thread(new Runnable() { + public void run() { + server_.serve(); + } + }); + serverThread_.start(); + clientManager_ = new TAsyncClientManager(); + Thread.sleep(500); } - - private static abstract class FailureLessCallback implements AsyncMethodCallback { - @Override - public void onError(Throwable throwable) { - fail(throwable); + + public void tearDown() throws Exception { + server_.stop(); + clientManager_.stop(); + serverThread_.join(); + } + + public void testBasicCall() throws Exception { + Srv.AsyncClient client = getClient(); + basicCall(client); + } + + public void testBasicCallWithTimeout() throws Exception { + Srv.AsyncClient client = getClient(); + client.setTimeout(5000); + basicCall(client); + } + + public void testTimeoutCall() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + Srv.AsyncClient client = getClient(); + client.setTimeout(100); + client.primitiveMethod(new AsyncMethodCallback() { + @Override + public void onError(Exception exception) { + try { + if (!(exception instanceof TimeoutException)) { + StringWriter sink = new StringWriter(); + exception.printStackTrace(new PrintWriter(sink, true)); + fail("expected TimeoutException but got " + sink.toString()); + } + } finally { + latch.countDown(); + } + } + + @Override + public void onComplete(primitiveMethod_call response) { + try { + fail("Should not have finished timed out call."); + } finally { + latch.countDown(); + } + } + }); + latch.await(2, TimeUnit.SECONDS); + assertTrue(client.hasError()); + assertTrue(client.getError() instanceof TimeoutException); + } + + public void testVoidCall() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean returned = new AtomicBoolean(false); + Srv.AsyncClient client = getClient(); + client.voidMethod(new FailureLessCallback() { + @Override + public void onComplete(voidMethod_call response) { + try { + response.getResult(); + returned.set(true); + } catch (TException e) { + fail(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(1, TimeUnit.SECONDS); + assertTrue(returned.get()); + } + + public void testOnewayCall() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean returned = new AtomicBoolean(false); + Srv.AsyncClient client = getClient(); + client.onewayMethod(new FailureLessCallback() { + @Override + public void onComplete(onewayMethod_call response) { + try { + response.getResult(); + returned.set(true); + } catch (TException e) { + fail(e); + } finally { + latch.countDown(); + } + } + }); + latch.await(1, TimeUnit.SECONDS); + assertTrue(returned.get()); + } + + public void testParallelCalls() throws Exception { + // make multiple calls with deserialization in the selector thread (repro Eric's issue) + int numThreads = 50; + int numCallsPerThread = 100; + List runnables = new ArrayList(); + List threads = new ArrayList(); + for (int i = 0; i < numThreads; i++) { + JankyRunnable runnable = new JankyRunnable(numCallsPerThread); + Thread thread = new Thread(runnable); + thread.start(); + threads.add(thread); + runnables.add(runnable); } + for (Thread thread : threads) { + thread.join(); + } + int numSuccesses = 0; + for (JankyRunnable runnable : runnables) { + numSuccesses += runnable.getNumSuccesses(); + } + assertEquals(numThreads * numCallsPerThread, numSuccesses); + } + + private Srv.AsyncClient getClient() throws IOException { + TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT); + return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket); } - + + private void basicCall(Srv.AsyncClient client) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean returned = new AtomicBoolean(false); + client.Janky(1, new FailureLessCallback() { + @Override + public void onComplete(Janky_call response) { + try { + assertEquals(3, response.getResult()); + returned.set(true); + } catch (TException e) { + fail(e); + } finally { + latch.countDown(); + } + } + + @Override + public void onError(Exception exception) { + try { + StringWriter sink = new StringWriter(); + exception.printStackTrace(new PrintWriter(sink, true)); + fail("unexpected onError with exception " + sink.toString()); + } finally { + latch.countDown(); + } + } + }); + latch.await(100, TimeUnit.SECONDS); + assertTrue(returned.get()); + } + public class SrvHandler implements Iface { + // Use this method for a standard call testing @Override public int Janky(int arg) throws TException { assertEquals(1, arg); return 3; } - @Override - public void methodWithDefaultArgs(int something) throws TException { - } - - // Using this method for timeout testing + // Using this method for timeout testing - sleeps for 1 second before returning @Override public int primitiveMethod() throws TException { try { @@ -79,6 +231,9 @@ public class TestTAsyncClientManager extends TestCase { } return 0; } + + @Override + public void methodWithDefaultArgs(int something) throws TException { } @Override public CompactProtoTestStruct structMethod() throws TException { @@ -93,20 +248,29 @@ public class TestTAsyncClientManager extends TestCase { public void onewayMethod() throws TException { } } - - public class JankyRunnable implements Runnable { - private TAsyncClientManager acm_; + + private static abstract class FailureLessCallback implements AsyncMethodCallback { + @Override + public void onError(Exception exception) { + fail(exception); + } + } + + private static void fail(Exception exception) { + StringWriter sink = new StringWriter(); + exception.printStackTrace(new PrintWriter(sink, true)); + fail("unexpected error " + sink.toString()); + } + + private class JankyRunnable implements Runnable { private int numCalls_; private int numSuccesses_ = 0; private Srv.AsyncClient client_; - private TNonblockingSocket clientSocket_; - public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception { - this.acm_ = acm; - this.numCalls_ = numCalls; - this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT); - this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_); - this.client_.setTimeout(20000); + public JankyRunnable(int numCalls) throws Exception { + numCalls_ = numCalls; + client_ = getClient(); + client_.setTimeout(20000); } public int getNumSuccesses() { @@ -119,14 +283,14 @@ public class TestTAsyncClientManager extends TestCase { try { // connect an async client final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean jankyReturned = new AtomicBoolean(false); + final AtomicBoolean returned = new AtomicBoolean(false); client_.Janky(1, new AsyncMethodCallback() { - + @Override public void onComplete(Janky_call response) { try { assertEquals(3, response.getResult()); - jankyReturned.set(true); + returned.set(true); latch.countDown(); } catch (TException e) { latch.countDown(); @@ -135,10 +299,10 @@ public class TestTAsyncClientManager extends TestCase { } @Override - public void onError(Throwable throwable) { + public void onError(Exception exception) { try { StringWriter sink = new StringWriter(); - throwable.printStackTrace(new PrintWriter(sink, true)); + exception.printStackTrace(new PrintWriter(sink, true)); fail("unexpected onError on iteration " + iteration + ": " + sink.toString()); } finally { latch.countDown(); @@ -148,7 +312,7 @@ public class TestTAsyncClientManager extends TestCase { boolean calledBack = latch.await(30, TimeUnit.SECONDS); assertTrue("wasn't called back in time on iteration " + iteration, calledBack); - assertTrue("onComplete not called on iteration " + iteration, jankyReturned.get()); + assertTrue("onComplete not called on iteration " + iteration, returned.get()); this.numSuccesses_++; } catch (Exception e) { fail(e); @@ -156,173 +320,4 @@ public class TestTAsyncClientManager extends TestCase { } } } - - public void standardCallTest(Srv.AsyncClient client) throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean jankyReturned = new AtomicBoolean(false); - client.Janky(1, new FailureLessCallback() { - @Override - public void onComplete(Janky_call response) { - try { - assertEquals(3, response.getResult()); - jankyReturned.set(true); - } catch (TException e) { - fail(e); - } finally { - latch.countDown(); - } - } - }); - - latch.await(100, TimeUnit.SECONDS); - assertTrue(jankyReturned.get()); - } - - public void testIt() throws Exception { - // put up a server - final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()), - new TNonblockingServerSocket(ServerTestBase.PORT)); - new Thread(new Runnable() { - @Override - public void run() { - s.serve(); - } - }).start(); - Thread.sleep(1000); - - // set up async client manager - TAsyncClientManager acm = new TAsyncClientManager(); - - // connect an async client - TNonblockingSocket clientSock = new TNonblockingSocket( - ServerTestBase.HOST, ServerTestBase.PORT); - Srv.AsyncClient client = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm, clientSock); - - // make a standard method call - standardCallTest(client); - - // make a standard method call that succeeds within timeout - assertFalse(s.isStopped()); - client.setTimeout(5000); - standardCallTest(client); - - // make a void method call - assertFalse(s.isStopped()); - final CountDownLatch voidLatch = new CountDownLatch(1); - final AtomicBoolean voidMethodReturned = new AtomicBoolean(false); - client.voidMethod(new FailureLessCallback() { - @Override - public void onComplete(voidMethod_call response) { - try { - response.getResult(); - voidMethodReturned.set(true); - } catch (TException e) { - fail(e); - } finally { - voidLatch.countDown(); - } - } - }); - voidLatch.await(1, TimeUnit.SECONDS); - assertTrue(voidMethodReturned.get()); - - // make a oneway method call - assertFalse(s.isStopped()); - final CountDownLatch onewayLatch = new CountDownLatch(1); - final AtomicBoolean onewayReturned = new AtomicBoolean(false); - client.onewayMethod(new FailureLessCallback() { - @Override - public void onComplete(onewayMethod_call response) { - try { - response.getResult(); - onewayReturned.set(true); - } catch (TException e) { - fail(e); - } finally { - onewayLatch.countDown(); - } - } - }); - onewayLatch.await(1, TimeUnit.SECONDS); - assertTrue(onewayReturned.get()); - - // make another standard method call - assertFalse(s.isStopped()); - final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1); - final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false); - client.voidMethod(new FailureLessCallback() { - @Override - public void onComplete(voidMethod_call response) { - try { - response.getResult(); - voidAfterOnewayReturned.set(true); - } catch (TException e) { - fail(e); - } finally { - voidAfterOnewayLatch.countDown(); - } - } - }); - voidAfterOnewayLatch.await(1, TimeUnit.SECONDS); - assertTrue(voidAfterOnewayReturned.get()); - - // make multiple calls with deserialization in the selector thread (repro Eric's issue) - assertFalse(s.isStopped()); - int numThreads = 50; - int numCallsPerThread = 100; - List runnables = new ArrayList(); - List threads = new ArrayList(); - for (int i = 0; i < numThreads; i++) { - JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread); - Thread thread = new Thread(runnable); - thread.start(); - threads.add(thread); - runnables.add(runnable); - } - for (Thread thread : threads) { - thread.join(); - } - int numSuccesses = 0; - for (JankyRunnable runnable : runnables) { - numSuccesses += runnable.getNumSuccesses(); - } - assertEquals(numThreads * numCallsPerThread, numSuccesses); - - // check that timeouts work - assertFalse(s.isStopped()); - assertTrue(clientSock.isOpen()); - final CountDownLatch timeoutLatch = new CountDownLatch(1); - client.setTimeout(100); - client.primitiveMethod(new AsyncMethodCallback() { - - @Override - public void onError(Throwable throwable) { - try { - if (!(throwable instanceof TimeoutException)) { - StringWriter sink = new StringWriter(); - throwable.printStackTrace(new PrintWriter(sink, true)); - fail("expected TimeoutException but got " + sink.toString()); - } - } finally { - timeoutLatch.countDown(); - } - } - - @Override - public void onComplete(primitiveMethod_call response) { - try { - fail("should not have finished timed out call."); - } finally { - timeoutLatch.countDown(); - } - } - - }); - timeoutLatch.await(2, TimeUnit.SECONDS); - assertTrue(client.hasError()); - assertTrue(client.getError() instanceof TimeoutException); - - // error closes socket and make sure isOpen reflects that - assertFalse(clientSock.isOpen()); - } -} +} \ No newline at end of file -- 2.17.1