import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
}
// Handle any ready channels calls
- Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- keys.remove();
- if (!key.isValid()) {
- // this should only have happened if the method call experienced an
- // error and the key was cancelled. just skip it.
- continue;
+ try {
+ Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+ while (keys.hasNext()) {
+ SelectionKey key = keys.next();
+ keys.remove();
+ if (!key.isValid()) {
+ // this should only have happened if the method call experienced an
+ // error and the key was cancelled. just skip it.
+ continue;
+ }
+ TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
+ method.transition(key);
}
- TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
- method.transition(key);
+ } catch (ClosedSelectorException e) {
+ LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
}
// Start any new calls
TAsyncMethodCall methodCall;
while ((methodCall = pendingCalls.poll()) != null) {
+ // Catch registration errors. Method will catch transition errors and cleanup.
try {
SelectionKey key = methodCall.registerWithSelector(selector);
methodCall.transition(key);
- } catch (IOException e) {
- LOGGER.warn("Caught IOException in TAsyncClientManager!", e);
- }
+ } catch (ClosedChannelException e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e);
+ } catch (CancelledKeyException e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e);
+ } catch (Exception e) {
+ methodCall.onError(e);
+ LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e);
+ }
}
}
}
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
+import java.util.List;
+import java.util.ArrayList;
+
import thrift.test.CompactProtoTestStruct;
import thrift.test.Srv;
import thrift.test.Srv.Iface;
public void onewayMethod() throws TException {
}
}
+
+ public class JankyRunnable implements Runnable {
+ private TAsyncClientManager acm_;
+ private int numCalls_;
+ private int numSuccesses_ = 0;
+ private Srv.AsyncClient client_;
+ private TNonblockingSocket clientSocket_;
+
+ public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
+ this.acm_ = acm;
+ this.numCalls_ = numCalls;
+ this.clientSocket_ = new TNonblockingSocket("localhost", 12345);
+ this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
+ }
+
+ public int getNumSuccesses() {
+ return numSuccesses_;
+ }
+
+ public void run() {
+ for (int i = 0; i < numCalls_; i++) {
+ try {
+ // connect an async client
+ final Object o = new Object();
+
+ final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+ client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
+ @Override
+ public void onComplete(Janky_call response) {
+ try {
+ assertEquals(3, response.getResult());
+ jankyReturned.set(true);
+ synchronized(o) {
+ o.notifyAll();
+ }
+ } catch (TException e) {
+ e.printStackTrace();
+ synchronized(o) {
+ o.notifyAll();
+ }
+ fail("unexpected exception: " + e);
+ }
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ synchronized(o) {
+ o.notifyAll();
+ }
+ fail("unexpected exception: " + throwable);
+ }
+ });
+
+ synchronized(o) {
+ o.wait(1000);
+ }
+
+ assertTrue(jankyReturned.get());
+ this.numSuccesses_++;
+ } catch (Exception e) {
+ fail("Unexpected " + e);
+ }
+ }
+ }
+ }
public void testIt() throws Exception {
// put up a server
synchronized(o) {
o.wait(1000);
}
-
assertTrue(voidAfterOnewayReturned.get());
+
+ // make multiple calls with deserialization in the selector thread (repro Eric's issue)
+ int numThreads = 500;
+ int numCallsPerThread = 100;
+ List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread);
+ Thread thread = new Thread(runnable);
+ thread.start();
+ threads.add(thread);
+ runnables.add(runnable);
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ int numSuccesses = 0;
+ for (JankyRunnable runnable : runnables) {
+ numSuccesses += runnable.getNumSuccesses();
+ }
+ assertEquals(numSuccesses, numThreads * numCallsPerThread);
}
}