THRIFT-1420. java: Nonblocking and HsHa server should make sure to close all their...
authorBryan Duxbury <bryanduxbury@apache.org>
Fri, 4 Nov 2011 00:23:30 +0000 (00:23 +0000)
committerBryan Duxbury <bryanduxbury@apache.org>
Fri, 4 Nov 2011 00:23:30 +0000 (00:23 +0000)
This patch makes the selector threads close out all of their open sockets before completely exiting. In testing, this appears to alleviate issues with hanging clients.

Patch: Thomas Kielbus

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1197370 13f79535-47bb-0310-9956-ffa450edef68

lib/java/src/org/apache/thrift/server/TNonblockingServer.java
lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
lib/java/test/org/apache/thrift/server/TestNonblockingServer.java

index 7afd4b3..dccae52 100644 (file)
 
 package org.apache.thrift.server;
 
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingTransport;
 import org.apache.thrift.transport.TTransportException;
 
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
 /**
  * A nonblocking TServer implementation. This allows for fairness amongst all
  * connected clients in terms of invocations.
@@ -154,6 +154,9 @@ public class TNonblockingServer extends AbstractNonblockingServer {
           select();
           processInterestChanges();
         }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
       } catch (Throwable t) {
         LOGGER.error("run() exiting due to uncaught error", t);
       } finally {
index 4cf5f1b..04179e6 100644 (file)
@@ -537,6 +537,9 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
           processAcceptedConnections();
           processInterestChanges();
         }
+        for (SelectionKey selectionKey : selector.keys()) {
+          cleanupSelectionKey(selectionKey);
+        }
       } catch (Throwable t) {
         LOGGER.error("run() exiting due to uncaught error", t);
       } finally {
index 52b62c3..597074e 100644 (file)
@@ -20,16 +20,22 @@ package org.apache.thrift.server;
 
 
 import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TNonblockingServer.Args;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import thrift.test.ThriftTest;
 
 public class TestNonblockingServer extends ServerTestBase {
 
   private Thread serverThread;
   private TServer server;
+  private static final int NUM_QUERIES = 10000;
 
   protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
     return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
@@ -71,4 +77,42 @@ public class TestNonblockingServer extends ServerTestBase {
   public TTransport getClientTransport(TTransport underlyingTransport) throws Exception {
     return new TFramedTransport(underlyingTransport);
   }
+
+
+  public void testCleanupAllSelectionKeys() throws Exception {
+    for (TProtocolFactory protoFactory : getProtocols()) {
+      TestHandler handler = new TestHandler();
+      ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+
+      startServer(processor, protoFactory);
+
+      TSocket socket = new TSocket(HOST, PORT);
+      socket.setTimeout(SOCKET_TIMEOUT);
+      TTransport transport = getClientTransport(socket);
+
+      TProtocol protocol = protoFactory.getProtocol(transport);
+      ThriftTest.Client testClient = new ThriftTest.Client(protocol);
+
+      open(transport);
+
+      for (int i = 0; i < NUM_QUERIES; ++i) {
+        testClient.testI32(1);
+      }
+      server.stop();
+      for (int i = 0; i < NUM_QUERIES; ++i) {
+        try {
+          testClient.testI32(1);
+        } catch(TTransportException e) {
+          System.err.println(e);
+          e.printStackTrace();
+          if (e.getCause() instanceof java.net.SocketTimeoutException) {
+            fail("timed out when it should have thrown another kind of error!");
+          }
+        }
+      }
+
+      transport.close();
+      stopServer();
+    }
+  }
 }