From df971daf373ceda6de2f76e5f0713c64b6d2e242 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Fri, 4 Nov 2011 00:23:30 +0000 Subject: [PATCH] THRIFT-1420. java: Nonblocking and HsHa server should make sure to close all their socket connections when the selector exits This patch makes the selector threads close out all of their open sockets before completely exiting. In testing, this appears to alleviate issues with hanging clients. Patch: Thomas Kielbus git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1197370 13f79535-47bb-0310-9956-ffa450edef68 --- .../thrift/server/TNonblockingServer.java | 11 +++-- .../server/TThreadedSelectorServer.java | 3 ++ .../thrift/server/TestNonblockingServer.java | 44 +++++++++++++++++++ 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 7afd4b35..dccae527 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -20,14 +20,14 @@ package org.apache.thrift.server; -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.util.Iterator; - import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.util.Iterator; + /** * A nonblocking TServer implementation. This allows for fairness amongst all * connected clients in terms of invocations. @@ -154,6 +154,9 @@ public class TNonblockingServer extends AbstractNonblockingServer { select(); processInterestChanges(); } + for (SelectionKey selectionKey : selector.keys()) { + cleanupSelectionKey(selectionKey); + } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 4cf5f1b5..04179e64 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -537,6 +537,9 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { processAcceptedConnections(); processInterestChanges(); } + for (SelectionKey selectionKey : selector.keys()) { + cleanupSelectionKey(selectionKey); + } } catch (Throwable t) { LOGGER.error("run() exiting due to uncaught error", t); } finally { diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java index 52b62c34..597074ed 100644 --- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java +++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java @@ -20,16 +20,22 @@ package org.apache.thrift.server; import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TNonblockingServer.Args; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import thrift.test.ThriftTest; public class TestNonblockingServer extends ServerTestBase { private Thread serverThread; private TServer server; + private static final int NUM_QUERIES = 10000; protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) { return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory)); @@ -71,4 +77,42 @@ public class TestNonblockingServer extends ServerTestBase { public TTransport getClientTransport(TTransport underlyingTransport) throws Exception { return new TFramedTransport(underlyingTransport); } + + + public void testCleanupAllSelectionKeys() throws Exception { + for (TProtocolFactory protoFactory : getProtocols()) { + TestHandler handler = new TestHandler(); + ThriftTest.Processor processor = new ThriftTest.Processor(handler); + + startServer(processor, protoFactory); + + TSocket socket = new TSocket(HOST, PORT); + socket.setTimeout(SOCKET_TIMEOUT); + TTransport transport = getClientTransport(socket); + + TProtocol protocol = protoFactory.getProtocol(transport); + ThriftTest.Client testClient = new ThriftTest.Client(protocol); + + open(transport); + + for (int i = 0; i < NUM_QUERIES; ++i) { + testClient.testI32(1); + } + server.stop(); + for (int i = 0; i < NUM_QUERIES; ++i) { + try { + testClient.testI32(1); + } catch(TTransportException e) { + System.err.println(e); + e.printStackTrace(); + if (e.getCause() instanceof java.net.SocketTimeoutException) { + fail("timed out when it should have thrown another kind of error!"); + } + } + } + + transport.close(); + stopServer(); + } + } } -- 2.17.1