From 855294b21340b01d145bcbee438a6817c127a32e Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Wed, 8 Sep 2010 21:47:45 +0000 Subject: [PATCH] THRIFT-888. java: async client should also have nonblocking connect This patch adds optional nonblocking connect behavior. Patch: Eric Jensen git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@995262 13f79535-47bb-0310-9956-ffa450edef68 --- .../thrift/async/TAsyncClientManager.java | 29 ++-- .../apache/thrift/async/TAsyncMethodCall.java | 64 ++++++-- .../thrift/server/TNonblockingServer.java | 23 ++- .../thrift/transport/TNonblockingSocket.java | 128 ++++++++------- .../transport/TNonblockingTransport.java | 17 ++ .../thrift/async/TestTAsyncClientManager.java | 147 +++++++++--------- 6 files changed, 239 insertions(+), 169 deletions(-) diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java index 5464d7e0..d88b6cae 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; /** * Contains selector thread which transitions method call objects */ -@SuppressWarnings("unchecked") public class TAsyncClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName()); @@ -60,7 +59,7 @@ public class TAsyncClientManager { private class SelectThread extends Thread { // Selector waits at most SELECT_TIME milliseconds before waking - private static final long SELECT_TIME = 200; + private static final long SELECT_TIME = 5; private final Selector selector; private volatile boolean running; @@ -85,14 +84,18 @@ public class TAsyncClientManager { public void run() { while (running) { try { - selector.select(SELECT_TIME); - } catch (IOException e) { - LOGGER.error("Caught IOException in TAsyncClientManager!", e); - } + try { + selector.select(SELECT_TIME); + } catch (IOException e) { + LOGGER.error("Caught IOException in TAsyncClientManager!", e); + } - transitionMethods(); - timeoutIdleMethods(); - startPendingMethods(); + transitionMethods(); + timeoutIdleMethods(); + startPendingMethods(); + } catch (Throwable throwable) { + LOGGER.error("Ignoring uncaught exception in SelectThread", throwable); + } } } @@ -129,10 +132,11 @@ public class TAsyncClientManager { TAsyncMethodCall methodCall = iterator.next(); long clientTimeout = methodCall.getClient().getTimeout(); long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime(); + if (timeElapsed > clientTimeout) { iterator.remove(); - methodCall.onError(new TimeoutException("Operation " + - methodCall.getClass() + " timed out after " + timeElapsed + + methodCall.onError(new TimeoutException("Operation " + + methodCall.getClass() + " timed out after " + timeElapsed + " milliseconds.")); } } @@ -144,8 +148,7 @@ public class TAsyncClientManager { while ((methodCall = pendingCalls.poll()) != null) { // Catch registration errors. method will catch transition errors and cleanup. try { - SelectionKey key = methodCall.registerWithSelector(selector); - methodCall.transition(key); + methodCall.start(selector); // If timeout specified and first transition went smoothly, add to timeout watch set TAsyncClient client = methodCall.getClient(); diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index 5568afb3..e75f4ab3 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -39,7 +39,11 @@ import org.apache.thrift.transport.TTransportException; * @param */ public abstract class TAsyncMethodCall { + + private static final int INITIAL_MEMORY_BUFFER_SIZE = 128; + public static enum State { + CONNECTING, WRITING_REQUEST_SIZE, WRITING_REQUEST_BODY, READING_RESPONSE_SIZE, @@ -48,20 +52,22 @@ public abstract class TAsyncMethodCall { ERROR; } - private static final int INITIAL_MEMORY_BUFFER_SIZE = 128; + /** + * Next step in the call, initialized by start() + */ + private State state = null; protected final TNonblockingTransport transport; private final TProtocolFactory protocolFactory; protected final TAsyncClient client; private final AsyncMethodCallback callback; private final boolean isOneway; + private long lastTransitionTime; private ByteBuffer sizeBuffer; private final byte[] sizeBufferArray = new byte[4]; - private ByteBuffer frameBuffer; - private State state; protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback callback, boolean isOneway) { this.transport = transport; @@ -69,8 +75,7 @@ public abstract class TAsyncMethodCall { this.protocolFactory = protocolFactory; this.client = client; this.isOneway = isOneway; - - this.state = State.WRITING_REQUEST_SIZE; + this.lastTransitionTime = System.currentTimeMillis(); } protected State getState() { @@ -91,6 +96,10 @@ public abstract class TAsyncMethodCall { protected abstract void write_args(TProtocol protocol) throws TException; + /** + * Initialize buffers. + * @throws TException if buffer initialization fails + */ protected void prepareMethodCall() throws TException { TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE); TProtocol protocol = protocolFactory.getProtocol(memoryBuffer); @@ -103,10 +112,32 @@ public abstract class TAsyncMethodCall { sizeBuffer = ByteBuffer.wrap(sizeBufferArray); } - SelectionKey registerWithSelector(Selector sel) throws IOException { - SelectionKey key = transport.registerSelector(sel, SelectionKey.OP_WRITE); + /** + * Register with selector and start first state, which could be either connecting or writing. + * @throws IOException if register or starting fails + */ + void start(Selector sel) throws IOException { + SelectionKey key; + if (transport.isOpen()) { + state = State.WRITING_REQUEST_SIZE; + key = transport.registerSelector(sel, SelectionKey.OP_WRITE); + } else { + state = State.CONNECTING; + key = transport.registerSelector(sel, SelectionKey.OP_CONNECT); + + // non-blocking connect can complete immediately, + // in which case we should not expect the OP_CONNECT + if (transport.startConnect()) { + registerForFirstWrite(key); + } + } + key.attach(this); - return key; + } + + void registerForFirstWrite(SelectionKey key) throws IOException { + state = State.WRITING_REQUEST_SIZE; + key.interestOps(SelectionKey.OP_WRITE); } protected ByteBuffer getFrameBuffer() { @@ -131,6 +162,9 @@ public abstract class TAsyncMethodCall { // Transition function try { switch (state) { + case CONNECTING: + doConnecting(key); + break; case WRITING_REQUEST_SIZE: doWritingRequestSize(); break; @@ -143,9 +177,8 @@ public abstract class TAsyncMethodCall { case READING_RESPONSE_BODY: doReadingResponseBody(key); break; - case RESPONSE_READ: - case ERROR: - throw new IllegalStateException("Method call in state " + state + default: // RESPONSE_READ, ERROR, or bug + throw new IllegalStateException("Method call in state " + state + " but selector called transition method. Seems like a bug..."); } lastTransitionTime = System.currentTimeMillis(); @@ -157,9 +190,9 @@ public abstract class TAsyncMethodCall { } protected void onError(Throwable e) { - state = State.ERROR; client.onError(e); callback.onError(e); + state = State.ERROR; } private void doReadingResponseBody(SelectionKey key) throws IOException { @@ -213,4 +246,11 @@ public abstract class TAsyncMethodCall { state = State.WRITING_REQUEST_BODY; } } + + private void doConnecting(SelectionKey key) throws IOException { + if (!key.isConnectable() || !transport.finishConnect()) { + throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT"); + } + registerForFirstWrite(key); + } } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 31a6e243..8b980316 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -62,7 +62,7 @@ public class TNonblockingServer extends TServer { LoggerFactory.getLogger(TNonblockingServer.class.getName()); // Flag for stopping the server - private volatile boolean stopped_; + private volatile boolean stopped_ = true; private SelectThread selectThread_; @@ -218,6 +218,7 @@ public class TNonblockingServer extends TServer { // start the selector try { selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_); + stopped_ = false; selectThread_.start(); return true; } catch (IOException e) { @@ -265,6 +266,10 @@ public class TNonblockingServer extends TServer { selectThread_.requestSelectInterestChange(frameBuffer); } + public boolean isStopped() { + return selectThread_.isStopped(); + } + /** * The thread that will be doing all the selecting, managing new connections * and those that still need to be read. @@ -288,14 +293,24 @@ public class TNonblockingServer extends TServer { serverTransport.registerSelector(selector); } + public boolean isStopped() { + return stopped_; + } + /** * The work loop. Handles both selecting (all IO operations) and managing * the selection preferences of all existing connections. */ public void run() { - while (!stopped_) { - select(); - processInterestChanges(); + try { + while (!stopped_) { + select(); + processInterestChanges(); + } + } catch (Throwable t) { + LOGGER.error("run() exiting due to uncaught error", t); + } finally { + stopped_ = true; } } diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 313ef85a..c3787a2f 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -23,102 +23,92 @@ package org.apache.thrift.transport; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import org.apache.thrift.async.TAsyncMethodCall; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** - * Socket implementation of the TTransport interface. To be commented soon! + * Transport for use with async client. */ public class TNonblockingSocket extends TNonblockingTransport { - private SocketChannel socketChannel = null; + private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName()); /** - * Wrapped Socket object + * Host and port if passed in, used for lazy non-blocking connect. */ - private Socket socket_ = null; + private SocketAddress socketAddress_ = null; - /** - * Socket timeout - */ - private int timeout_ = 0; + private final SocketChannel socketChannel_; + + private final Socket socket_; + + public TNonblockingSocket(String host, int port) throws IOException { + this(host, port, 0); + } /** - * Create a new nonblocking socket transport connected to host:port. + * Create a new nonblocking socket transport that will be connected to host:port. * @param host * @param port * @throws TTransportException * @throws IOException */ - public TNonblockingSocket(String host, int port) throws TTransportException, IOException { - this(SocketChannel.open(new InetSocketAddress(host, port))); + public TNonblockingSocket(String host, int port, int timeout) throws IOException { + this(SocketChannel.open(), timeout); + socketAddress_ = new InetSocketAddress(host, port); } /** * Constructor that takes an already created socket. * * @param socketChannel Already created SocketChannel object - * @throws TTransportException if there is an error setting up the streams + * @throws IOException if there is an error setting up the streams */ - public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException { - try { - // make it a nonblocking channel - socketChannel.configureBlocking(false); - } catch (IOException e) { - throw new TTransportException(e); - } + public TNonblockingSocket(SocketChannel socketChannel) throws IOException { + this(socketChannel, 0); + if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); + } - this.socketChannel = socketChannel; - this.socket_ = socketChannel.socket(); - try { - socket_.setSoLinger(false, 0); - socket_.setTcpNoDelay(true); - } catch (SocketException sx) { - sx.printStackTrace(); - } + private TNonblockingSocket(SocketChannel socketChannel, int timeout) throws IOException { + socketChannel_ = socketChannel; + socket_ = socketChannel.socket(); + + // make it a nonblocking channel + socketChannel.configureBlocking(false); + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + setTimeout(timeout); } /** - * Register this socket with the specified selector for both read and write - * operations. + * Register the new SocketChannel with our Selector, indicating + * we'd like to be notified when it's ready for I/O. * * @param selector * @return the selection key for this socket. */ public SelectionKey registerSelector(Selector selector, int interests) throws IOException { - // Register the new SocketChannel with our Selector, indicating - // we'd like to be notified when there's data waiting to be read - return socketChannel.register(selector, interests); + return socketChannel_.register(selector, interests); } /** - * Initializes the socket object - */ - private void initSocket() { - socket_ = new Socket(); - try { - socket_.setSoLinger(false, 0); - socket_.setTcpNoDelay(true); - socket_.setSoTimeout(timeout_); - } catch (SocketException sx) { - sx.printStackTrace(); - } - } - - /** - * Sets the socket timeout + * Sets the socket timeout, although this implementation never uses blocking operations so it is unused. * * @param timeout Milliseconds timeout */ public void setTimeout(int timeout) { - timeout_ = timeout; try { socket_.setSoTimeout(timeout); } catch (SocketException sx) { - sx.printStackTrace(); + LOGGER.warn("Could not set socket timeout.", sx); } } @@ -126,9 +116,6 @@ public class TNonblockingSocket extends TNonblockingTransport { * Returns a reference to the underlying socket. */ public Socket getSocket() { - if (socket_ == null) { - initSocket(); - } return socket_; } @@ -136,24 +123,21 @@ public class TNonblockingSocket extends TNonblockingTransport { * Checks whether the socket is connected. */ public boolean isOpen() { - if (socket_ == null) { - return false; - } return socket_.isConnected(); } /** - * Connects the socket, creating a new socket object if necessary. + * Do not call, the implementation provides its own lazy non-blocking connect. */ public void open() throws TTransportException { - throw new RuntimeException("Not implemented yet"); + throw new RuntimeException("open() is not implemented for TNonblockingSocket"); } /** * Perform a nonblocking read into buffer. */ public int read(ByteBuffer buffer) throws IOException { - return socketChannel.read(buffer); + return socketChannel_.read(buffer); } @@ -161,12 +145,12 @@ public class TNonblockingSocket extends TNonblockingTransport { * Reads from the underlying input stream if not null. */ public int read(byte[] buf, int off, int len) throws TTransportException { - if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { + if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from write-only socket channel"); } try { - return socketChannel.read(ByteBuffer.wrap(buf, off, len)); + return socketChannel_.read(ByteBuffer.wrap(buf, off, len)); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } @@ -176,26 +160,26 @@ public class TNonblockingSocket extends TNonblockingTransport { * Perform a nonblocking write of the data in buffer; */ public int write(ByteBuffer buffer) throws IOException { - return socketChannel.write(buffer); + return socketChannel_.write(buffer); } /** * Writes to the underlying output stream if not null. */ public void write(byte[] buf, int off, int len) throws TTransportException { - if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { + if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to write-only socket channel"); } try { - socketChannel.write(ByteBuffer.wrap(buf, off, len)); + socketChannel_.write(ByteBuffer.wrap(buf, off, len)); } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } } /** - * Flushes the underlying output stream if not null. + * Noop. */ public void flush() throws TTransportException { // Not supported by SocketChannel. @@ -206,10 +190,20 @@ public class TNonblockingSocket extends TNonblockingTransport { */ public void close() { try { - socketChannel.close(); - } catch (IOException e) { - // silently ignore. + socketChannel_.close(); + } catch (IOException iox) { + LOGGER.warn("Could not close socket.", iox); } } + /** {@inheritDoc} */ + public boolean startConnect() throws IOException { + return socketChannel_.connect(socketAddress_); + } + + /** {@inheritDoc} */ + public boolean finishConnect() throws IOException { + return socketChannel_.finishConnect(); + } + } diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 517eacb7..faf501f8 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -24,8 +24,25 @@ import java.nio.channels.Selector; import java.nio.channels.SelectionKey; import java.nio.ByteBuffer; +import org.apache.thrift.async.TAsyncMethodCall; + public abstract class TNonblockingTransport extends TTransport { + + /** + * Non-blocking connection initialization. + * @see java.nio.channels.SocketChannel#connect(SocketAddress remote) + */ + public abstract boolean startConnect() throws IOException; + + /** + * Non-blocking connection completion. + * @see java.nio.channels.SocketChannel#finishConnect() + */ + public abstract boolean finishConnect() throws IOException; + public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; + public abstract int read(ByteBuffer buffer) throws IOException; + public abstract int write(ByteBuffer buffer) throws IOException; } diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java index 2e26aad3..5e1084c5 100644 --- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java +++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java @@ -18,8 +18,12 @@ */ package org.apache.thrift.async; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,10 +31,10 @@ import junit.framework.TestCase; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.ServerTestBase; +import org.apache.thrift.server.THsHaServer; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.server.ServerTestBase; import thrift.test.CompactProtoTestStruct; import thrift.test.Srv; @@ -41,11 +45,16 @@ import thrift.test.Srv.AsyncClient.primitiveMethod_call; import thrift.test.Srv.AsyncClient.voidMethod_call; public class TestTAsyncClientManager extends TestCase { + private static void fail(Throwable throwable) { + StringWriter sink = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sink, true)); + fail("unexpected error " + sink.toString()); + } + private static abstract class FailureLessCallback implements AsyncMethodCallback { @Override public void onError(Throwable throwable) { - throwable.printStackTrace(); - fail("unexpected error " + throwable); + fail(throwable); } } @@ -66,7 +75,6 @@ public class TestTAsyncClientManager extends TestCase { try { Thread.sleep(1000); } catch (InterruptedException e) { - // TODO Auto-generated catch block e.printStackTrace(); } return 0; @@ -98,6 +106,7 @@ public class TestTAsyncClientManager extends TestCase { this.numCalls_ = numCalls; this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT); this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_); + this.client_.setTimeout(20000); } public int getNumSuccesses() { @@ -105,55 +114,51 @@ public class TestTAsyncClientManager extends TestCase { } public void run() { - for (int i = 0; i < numCalls_; i++) { + for (int i = 0; i < numCalls_ && !client_.hasError(); i++) { + final int iteration = i; try { // connect an async client - final Object o = new Object(); - + final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean jankyReturned = new AtomicBoolean(false); client_.Janky(1, new AsyncMethodCallback() { + @Override public void onComplete(Janky_call response) { try { assertEquals(3, response.getResult()); jankyReturned.set(true); - synchronized(o) { - o.notifyAll(); - } + latch.countDown(); } catch (TException e) { - e.printStackTrace(); - synchronized(o) { - o.notifyAll(); - } - fail("unexpected exception: " + e); + latch.countDown(); + fail(e); } } @Override public void onError(Throwable throwable) { - System.out.println(throwable.toString()); - synchronized(o) { - o.notifyAll(); + try { + StringWriter sink = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sink, true)); + fail("unexpected onError on iteration " + iteration + ": " + sink.toString()); + } finally { + latch.countDown(); } - fail("unexpected exception: " + throwable); } }); - synchronized(o) { - o.wait(1000); - } - - assertTrue(jankyReturned.get()); + boolean calledBack = latch.await(30, TimeUnit.SECONDS); + assertTrue("wasn't called back in time on iteration " + iteration, calledBack); + assertTrue("onComplete not called on iteration " + iteration, jankyReturned.get()); this.numSuccesses_++; } catch (Exception e) { - fail("Unexpected " + e); + fail(e); } } } } public void standardCallTest(Srv.AsyncClient client) throws Exception { - final Object o = new Object(); + final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean jankyReturned = new AtomicBoolean(false); client.Janky(1, new FailureLessCallback() { @Override @@ -162,23 +167,20 @@ public class TestTAsyncClientManager extends TestCase { assertEquals(3, response.getResult()); jankyReturned.set(true); } catch (TException e) { - fail("unexpected exception: " + e); - } - synchronized(o) { - o.notifyAll(); + fail(e); + } finally { + latch.countDown(); } } }); - synchronized(o) { - o.wait(100000); - } + latch.await(100, TimeUnit.SECONDS); assertTrue(jankyReturned.get()); } public void testIt() throws Exception { // put up a server - final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()), + final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(ServerTestBase.PORT)); new Thread(new Runnable() { @Override @@ -196,16 +198,17 @@ public class TestTAsyncClientManager extends TestCase { ServerTestBase.HOST, ServerTestBase.PORT); Srv.AsyncClient client = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm, clientSock); - final Object o = new Object(); - // make a standard method call standardCallTest(client); // make a standard method call that succeeds within timeout + assertFalse(s.isStopped()); client.setTimeout(5000); standardCallTest(client); // make a void method call + assertFalse(s.isStopped()); + final CountDownLatch voidLatch = new CountDownLatch(1); final AtomicBoolean voidMethodReturned = new AtomicBoolean(false); client.voidMethod(new FailureLessCallback() { @Override @@ -214,20 +217,18 @@ public class TestTAsyncClientManager extends TestCase { response.getResult(); voidMethodReturned.set(true); } catch (TException e) { - fail("unexpected exception " + e); - } - synchronized (o) { - o.notifyAll(); + fail(e); + } finally { + voidLatch.countDown(); } } }); - - synchronized(o) { - o.wait(1000); - } + voidLatch.await(1, TimeUnit.SECONDS); assertTrue(voidMethodReturned.get()); - + // make a oneway method call + assertFalse(s.isStopped()); + final CountDownLatch onewayLatch = new CountDownLatch(1); final AtomicBoolean onewayReturned = new AtomicBoolean(false); client.onewayMethod(new FailureLessCallback() { @Override @@ -236,20 +237,18 @@ public class TestTAsyncClientManager extends TestCase { response.getResult(); onewayReturned.set(true); } catch (TException e) { - fail("unexpected exception " + e); - } - synchronized(o) { - o.notifyAll(); + fail(e); + } finally { + onewayLatch.countDown(); } } }); - synchronized(o) { - o.wait(1000); - } - + onewayLatch.await(1, TimeUnit.SECONDS); assertTrue(onewayReturned.get()); // make another standard method call + assertFalse(s.isStopped()); + final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1); final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false); client.voidMethod(new FailureLessCallback() { @Override @@ -258,20 +257,18 @@ public class TestTAsyncClientManager extends TestCase { response.getResult(); voidAfterOnewayReturned.set(true); } catch (TException e) { - fail("unexpected exception " + e); - } - synchronized(o) { - o.notifyAll(); + fail(e); + } finally { + voidAfterOnewayLatch.countDown(); } } }); - synchronized(o) { - o.wait(1000); - } + voidAfterOnewayLatch.await(1, TimeUnit.SECONDS); assertTrue(voidAfterOnewayReturned.get()); // make multiple calls with deserialization in the selector thread (repro Eric's issue) - int numThreads = 200; + assertFalse(s.isStopped()); + int numThreads = 50; int numCallsPerThread = 100; List runnables = new ArrayList(); List threads = new ArrayList(); @@ -289,34 +286,38 @@ public class TestTAsyncClientManager extends TestCase { for (JankyRunnable runnable : runnables) { numSuccesses += runnable.getNumSuccesses(); } - assertEquals(numSuccesses, numThreads * numCallsPerThread); + assertEquals(numThreads * numCallsPerThread, numSuccesses); // check that timeouts work + assertFalse(s.isStopped()); + final CountDownLatch timeoutLatch = new CountDownLatch(1); client.setTimeout(100); client.primitiveMethod(new AsyncMethodCallback() { @Override public void onError(Throwable throwable) { - if (!(throwable instanceof TimeoutException)) { - fail("should have received timeout exception"); - synchronized(o) { - o.notifyAll(); + try { + if (!(throwable instanceof TimeoutException)) { + StringWriter sink = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sink, true)); + fail("expected TimeoutException but got " + sink.toString()); } + } finally { + timeoutLatch.countDown(); } } @Override public void onComplete(primitiveMethod_call response) { - fail("should not have finished timed out call."); - synchronized(o) { - o.notifyAll(); + try { + fail("should not have finished timed out call."); + } finally { + timeoutLatch.countDown(); } } }); - synchronized(o) { - o.wait(2000); - } + timeoutLatch.await(2, TimeUnit.SECONDS); assertTrue(client.hasError()); assertTrue(client.getError() instanceof TimeoutException); } -- 2.17.1