THRIFT-1500: D programming language support
Client: D
Patch: David Nadlinger

D programming language library and additions



git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1304085 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/d/test/client_pool_test.d b/lib/d/test/client_pool_test.d
new file mode 100644
index 0000000..85bcb29
--- /dev/null
+++ b/lib/d/test/client_pool_test.d
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module client_pool_test;
+
+import core.time : Duration, dur;
+import core.thread : Thread;
+import std.algorithm;
+import std.array;
+import std.conv;
+import std.exception;
+import std.getopt;
+import std.range;
+import std.stdio;
+import std.typecons;
+import thrift.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.codegen.base;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.client;
+import thrift.codegen.client_pool;
+import thrift.codegen.processor;
+import thrift.protocol.binary;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.transport.buffered;
+import thrift.transport.socket;
+import thrift.util.cancellation;
+import thrift.util.future;
+
+// We use this as our RPC-layer exception here to make sure socket/… problems
+// (that would usually be considered RPC layer faults) cause the tests to
+// fail, even though we are testing the RPC exception handling.
+class TestServiceException : TException {
+  int port; // Set by ExTestHandler to its own port before throwing.
+}
+
+interface TestService { // Minimal service for the pool tests: a single call.
+  int getPort(); // Implemented by ExTestHandler; may throw (see methodMeta).
+  alias .TestServiceException TestServiceException; // Module-level class.
+  enum methodMeta = [TMethodMeta("getPort", [], // No parameters, and ...
+    [TExceptionMeta("a", 1, "TestServiceException")])]; // ... one exception.
+}
+
+// Use some derived service, just to check that the pools handle inheritance
+// correctly.
+interface ExTestService : TestService {
+  int[] getPortInArray(); // ExTestHandler returns [getPort()] for this.
+  enum methodMeta = [TMethodMeta("getPortInArray", [], // Like getPort().
+    [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+// Service handler that answers with the port it serves, optionally after a
+// delay and/or by throwing a TestServiceException instead.
+class ExTestHandler : ExTestService {
+  // Port returned by getPort() and stored in every thrown exception.
+  ushort port;
+  // Time to block before each call answers; non-positive means no waiting.
+  Duration delay;
+  // While true, every call throws a TestServiceException.
+  bool failing;
+  // While true, every call is logged to stderr.
+  bool trace;
+
+  this(ushort port, Duration delay, bool failing, bool trace) {
+    this.port = port;
+    this.delay = delay;
+    this.failing = failing;
+    this.trace = trace;
+  }
+
+  // Logs (if tracing), waits for the delay, then throws or returns the port.
+  override int getPort() {
+    if (trace) {
+      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
+        delay, failing);
+    }
+
+    // Only block when a positive delay was configured.
+    if (dur!"hnsecs"(0) < delay) Thread.sleep(delay);
+
+    if (failing) {
+      auto fault = new TestServiceException;
+      fault.port = port;
+      throw fault;
+    }
+    return port;
+  }
+
+  // Same as getPort(), with the result wrapped in a one-element array.
+  override int[] getPortInArray() { return [getPort()]; }
+}
+
+// Thread that runs a TSimpleServer for one handler on that handler's port.
+class ServerThread : Thread {
+  this(ExTestHandler handler, TCancellation cancellation) {
+    super(&run);
+    cancellation_ = cancellation;
+    handler_ = handler;
+  }
+
+private:
+  TCancellation cancellation_;
+  ExTestHandler handler_;
+
+  // Thread entry point; any exception is only reported on stdout.
+  void run() {
+    try {
+      auto listenSocket = new TServerSocket(handler_.port);
+      listenSocket.recvTimeout = dur!"seconds"(3);
+
+      (new TSimpleServer(new TServiceProcessor!ExTestService(handler_),
+        listenSocket, new TBufferedTransportFactory,
+        new TBinaryProtocolFactory!())).serve(cancellation_);
+    } catch (Exception err) {
+      writefln("Server thread on port %s failed: %s", handler_.port, err);
+    }
+  }
+}
+
+void main(string[] args) { // Starts six test servers and runs all pool tests.
+  bool trace;
+  ushort port = 9090;
+  getopt(args, "port", &port, "trace", &trace);
+
+  auto serverCancellation = new TCancellationOrigin;
+  scope (exit) serverCancellation.trigger(); // Also runs if a test throws.
+  // Six consecutive ports, one per handler, starting at the --port option.
+  immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+
+version (none) {
+  // Cannot use this due to multiple DMD @@BUG@@s:
+  // 1. »function D main is a nested function and cannot be accessed from array«
+  //    when calling array() on the result of the outer map() – would have to
+  //    manually do the eager evaluation/array conversion.
+  // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
+  //    can be worked around by calling array() on the map result first.
+  // 3. Even when using the workarounds for the last two points, the DMD-built
+  //    executable crashes when building without (sic!) inlining enabled,
+  //    the backtrace points into the first delegate literal.
+  auto handlers = array(map!((args){
+    return new ExTestHandler(args._0, args._1, args._2, trace);
+  })(zip(
+    ports,
+    map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
+    [false, false, false, true, true, true]
+  )));
+} else { // Same six handlers spelled out: three working, then three failing.
+  auto handlers = [
+    new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
+    new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
+    new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
+    new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
+    new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
+    new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
+  ];
+}
+
+  // Fire up the server threads.
+  foreach (h; handlers) (new ServerThread(h, serverCancellation)).start();
+
+  // Give the servers some time to get up. This should really be accomplished
+  // via a barrier here and in the preServe() hook.
+  Thread.sleep(dur!"msecs"(10));
+
+  syncClientPoolTest(ports, handlers);
+  asyncClientPoolTest(ports, handlers);
+  asyncFastestClientPoolTest(ports, handlers);
+  asyncAggregatorTest(ports, handlers);
+}
+
+// Exercises the synchronous client pool against the servers on the given ports.
+void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto clients = array(map!((a){
+    return cast(TClientBase!ExTestService)tClient!ExTestService(
+      tBinaryProtocol(new TSocket("127.0.0.1", a))
+    );
+  })(ports));
+
+  scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
+
+  // Try the case where the first client succeeds.
+  {
+    enforce(makePool(clients).getPort() == ports[0]);
+  }
+
+  // Try the case where all clients fail (main() makes handlers 3-5 throw).
+  {
+    auto pool = makePool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the first clients fail, but a later one succeeds.
+  {
+    auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
+    enforce(pool.getPortInArray() == [ports[0]]);
+  }
+
+  // Make sure a client is properly deactivated when it has failed too often.
+  {
+    auto pool = makePool(clients);
+    pool.faultDisableCount = 1;
+    pool.faultDisableDuration = dur!"msecs"(50);
+    // A single fault of handler 0 makes the pool fall through to client 1.
+    handlers[0].failing = true;
+    enforce(pool.getPort() == ports[1]);
+    // Handler 0 works again, but its client is expected to still be disabled.
+    handlers[0].failing = false;
+    enforce(pool.getPort() == ports[1]);
+    // Once the disable duration has passed, client 0 should be used again.
+    Thread.sleep(dur!"msecs"(50));
+    enforce(pool.getPort() == ports[0]);
+  }
+}
+
+// Pool without client permutation; only TestServiceException is an RPC fault.
+auto makePool(TClientBase!ExTestService[] clients) {
+  auto pool = tClientPool(clients);
+  pool.rpcFaultFilter =
+    (Exception e) { return !(cast(TestServiceException)e is null); };
+  pool.permuteClients = false;
+  return pool;
+}
+
+// Same scenarios as syncClientPoolTest, run through the async client pool.
+void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+  // NOTE(review): comparing without waitGet() presumably blocks implicitly.
+  // Try the case where the first client succeeds.
+  {
+    enforce(makeAsyncPool(clients).getPort() == ports[0]);
+  }
+
+  // Try the case where all clients fail (main() makes handlers 3-5 throw).
+  {
+    auto pool = makeAsyncPool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the first clients fail, but a later one succeeds.
+  {
+    auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
+    enforce(pool.getPortInArray() == [ports[0]]);
+  }
+
+  // Make sure a client is properly deactivated when it has failed too often.
+  {
+    auto pool = makeAsyncPool(clients);
+    pool.faultDisableCount = 1;
+    pool.faultDisableDuration = dur!"msecs"(50);
+    // A single fault of handler 0 makes the pool fall through to client 1.
+    handlers[0].failing = true;
+    enforce(pool.getPort() == ports[1]);
+    // Handler 0 works again, but its client is expected to still be disabled.
+    handlers[0].failing = false;
+    enforce(pool.getPort() == ports[1]);
+    // Once the disable duration has passed, client 0 should be used again.
+    Thread.sleep(dur!"msecs"(50));
+    enforce(pool.getPort() == ports[0]);
+  }
+}
+
+// Async pool, no client permutation; only TestServiceException is a fault.
+auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
+  auto pool = tAsyncClientPool(clients);
+  pool.rpcFaultFilter =
+    (Exception e) { return !(cast(TestServiceException)e is null); };
+  pool.permuteClients = false;
+  return pool;
+}
+
+auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
+  // DMD @@BUG@@ workaround: Using array on the lazyClients map result leads
+  // to »function D main is a nested function and cannot be accessed from array«.
+  // Thus, we manually do the array conversion.
+  auto lazyClients = map!((a){
+    return new TAsyncClient!ExTestService(
+      new TAsyncSocket(manager, "127.0.0.1", a),
+      new TBufferedTransportFactory,
+      new TBinaryProtocolFactory!(TBufferedTransport)
+    );
+  })(ports);
+  TAsyncClientBase!ExTestService[] clients; // Element type the pools take.
+  foreach (c; lazyClients) clients ~= c; // Evaluates the lazy map eagerly.
+  return clients;
+}
+
+// Checks that the fastest-client pool returns the first successful response.
+void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+
+  // Make sure the fastest client wins, even if they are called in some other
+  // order.
+  {
+    auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
+    enforce(result == ports[0]);
+  }
+
+  // Try the case where all clients fail.
+  {
+    auto pool = makeAsyncFastestPool(clients[3 .. $]);
+    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+    enforce(e);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+      ports[3 .. $]));
+  }
+
+  // Try the case where the fastest client fails, but a slower one succeeds.
+  {
+    auto pool = makeAsyncFastestPool(clients[1 .. $]);
+    enforce(pool.getPortInArray() == [ports[1]]);
+  }
+}
+
+// Fastest-client pool; only TestServiceException passes the RPC fault filter.
+auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
+  auto fastestPool = tAsyncFastestClientPool(clients);
+  fastestPool.rpcFaultFilter =
+    (Exception e) { return !(cast(TestServiceException)e is null); };
+  return fastestPool;
+}
+
+// Tests tAsyncAggregator: result range, default and custom accumulators.
+void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+  auto manager = new TLibeventAsyncManager;
+  scope (exit) manager.stop(dur!"hnsecs"(0));
+
+  auto clients = makeAsyncClients(manager, ports);
+  scope(exit) foreach (c; clients) c.transport.close();
+
+  auto aggregator = tAsyncAggregator(
+    cast(TAsyncClientBase!ExTestService[])clients);
+
+  // Test aggregator range interface.
+  {
+    auto range = aggregator.getPort().range(dur!"msecs"(50));
+    enforce(equal(range, ports[0 .. 2][])); // The 1 ms and 10 ms handlers.
+    enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
+      ports[3 .. $ - 1]));
+    enforce(range.completedCount == 4); // Two results plus two exceptions.
+  }
+
+  // Test default accumulator for scalars.
+  {
+    auto fullResult = aggregator.getPort().accumulate();
+    enforce(fullResult.waitGet() == ports[0 .. 3]); // All working handlers.
+
+    auto partialResult = aggregator.getPort().accumulate();
+    Thread.sleep(dur!"msecs"(20));
+    enforce(partialResult.finishGet() == ports[0 .. 2]); // 100 ms one missing.
+
+  }
+
+  // Test default accumulator for arrays.
+  {
+    auto fullResult = aggregator.getPortInArray().accumulate();
+    enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+    auto partialResult = aggregator.getPortInArray().accumulate();
+    Thread.sleep(dur!"msecs"(20));
+    enforce(partialResult.finishGet() == ports[0 .. 2]);
+  }
+
+  // Test custom accumulator.
+  {
+    auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
+      return reduce!"a + b"(results);
+    })();
+    enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
+
+    auto partialResult = aggregator.getPort().accumulate!(
+      function(int[] results, Exception[] exceptions) {
+        // Return a tuple of the parameters so we can check them outside of
+        // this function (to verify the values, we need access to »ports«, but
+        // due to DMD @@BUG5710@@, we can't use a delegate literal).
+        return tuple(results, exceptions);
+      }
+    )();
+    Thread.sleep(dur!"msecs"(20));
+    auto resultTuple = partialResult.finishGet();
+    enforce(resultTuple._0 == ports[0 .. 2]);
+    enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple._1),
+      ports[3 .. $ - 1]));
+  }
+}