package org.apache.thrift.server;
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
* connected clients in terms of invocations.
select();
processInterestChanges();
}
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer.Args;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import thrift.test.ThriftTest;
public class TestNonblockingServer extends ServerTestBase {
private Thread serverThread;
private TServer server;
+ private static final int NUM_QUERIES = 10000;
protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
public TTransport getClientTransport(TTransport underlyingTransport) throws Exception {
return new TFramedTransport(underlyingTransport);
}
+
+
+ public void testCleanupAllSelectionKeys() throws Exception {
+ for (TProtocolFactory protoFactory : getProtocols()) {
+ TestHandler handler = new TestHandler();
+ ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+
+ startServer(processor, protoFactory);
+
+ TSocket socket = new TSocket(HOST, PORT);
+ socket.setTimeout(SOCKET_TIMEOUT);
+ TTransport transport = getClientTransport(socket);
+
+ TProtocol protocol = protoFactory.getProtocol(transport);
+ ThriftTest.Client testClient = new ThriftTest.Client(protocol);
+
+ open(transport);
+
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ testClient.testI32(1);
+ }
+ server.stop();
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ try {
+ testClient.testI32(1);
+ } catch(TTransportException e) {
+ System.err.println(e);
+ e.printStackTrace();
+ if (e.getCause() instanceof java.net.SocketTimeoutException) {
+ fail("timed out when it should have thrown another kind of error!");
+ }
+ }
+ }
+
+ transport.close();
+ stopServer();
+ }
+ }
}