/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module async_test;

import core.atomic;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : dur, Thread, ThreadGroup;
import std.conv : text;
import std.datetime;
import std.getopt;
import std.exception : collectException, enforce;
import std.parallelism : TaskPool;
import std.stdio;
import std.string;
import std.variant : Variant;
import thrift.base;
import thrift.async.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.async.ssl;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.base;
import thrift.codegen.processor;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.server.transport.ssl;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.ssl;
import thrift.util.cancellation;

version (Posix) {
  import core.stdc.signal;
  import core.sys.posix.signal;

  // Disable SIGPIPE because the SSL server will write to a broken socket
  // after the client has disconnected (see the TSSLSocket docs).
  shared static this() {
    signal(SIGPIPE, SIG_IGN);
  }
}

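/**
 * The Thrift service exercised by this test.
 *
 * echo() and fail() answer immediately; the delayed variants sleep for the
 * given number of milliseconds first, which the timing tests in ClientsThread
 * rely on.
 *
 * Example (a sketch only, assuming an opened TAsyncSocket `transport` that is
 * registered with an async manager; it mirrors the calls made in ClientsThread
 * below):
 * ---
 * auto client = new TAsyncClient!AsyncTest(transport,
 *   new TBufferedTransportFactory, new TBinaryProtocolFactory!());
 * auto value = client.echo("foo").waitGet(); // Blocks until the result arrives.
 * ---
 */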
interface AsyncTest {
  string echo(string value);
  string delayedEcho(string value, long milliseconds);

  void fail(string reason);
  void delayedFail(string reason, long milliseconds);

  enum methodMeta = [
    TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
    TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
  ];
  alias .AsyncTestException AsyncTestException;
}

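/**
 * The exception thrown by fail() and delayedFail(), carrying the reason string
 * passed by the client; mixin TStructHelpers!() supplies the usual Thrift
 * boilerplate for the reason member.
 */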
class AsyncTestException : TException {
  string reason;
  mixin TStructHelpers!();
}

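/**
 * Entry point: starts --managers libevent async managers, each driving
 * --servers-per-manager TSimpleServer instances on consecutive ports starting
 * at --port, and runs one ClientsThread with --threads-per-server client
 * threads against every server for --iterations rounds.
 *
 * Hypothetical invocation (the binary name depends on how the test is built):
 * ---
 * ./async_test --port 9090 --managers 2 --servers-per-manager 5 \
 *   --threads-per-server 10 --iterations 10 --ssl --trace
 * ---
 */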
void main(string[] args) {
  ushort port = 9090;
  ushort managerCount = 2;
  ushort serversPerManager = 5;
  ushort threadsPerServer = 10;
  uint iterations = 10;
  bool ssl;
  bool trace;

  getopt(args,
    "iterations", &iterations,
    "managers", &managerCount,
    "port", &port,
    "servers-per-manager", &serversPerManager,
    "ssl", &ssl,
    "threads-per-server", &threadsPerServer,
    "trace", &trace,
  );

  TTransportFactory clientTransportFactory;
  TSSLContext serverSSLContext;
  if (ssl) {
    auto clientSSLContext = new TSSLContext();
    with (clientSSLContext) {
      ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
      authenticate = true;
      loadTrustedCertificates("./trusted-ca-certificate.pem");
    }
    clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);

    serverSSLContext = new TSSLContext();
    with (serverSSLContext) {
      serverSide = true;
      loadCertificate("./server-certificate.pem");
      loadPrivateKey("./server-private-key.pem");
      ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
    }
  } else {
    clientTransportFactory = new TBufferedTransportFactory;
  }

  auto serverCancel = new TCancellationOrigin;
  scope (exit) {
    writeln("Triggering server shutdown...");
    serverCancel.trigger();
    writeln("done.");
  }

  auto managers = new TLibeventAsyncManager[managerCount];
  scope (exit) foreach (ref m; managers) destroy(m);

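  // Spin up the servers and clients: serversPerManager servers per async
  // manager, each on its own consecutive port, with one ClientsThread driving
  // threadsPerServer concurrent clients against it.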
  auto clientsThreads = new ThreadGroup;
  foreach (managerIndex, ref manager; managers) {
    manager = new TLibeventAsyncManager;
    foreach (serverIndex; 0 .. serversPerManager) {
      auto currentPort = cast(ushort)
        (port + managerIndex * serversPerManager + serverIndex);

      // Start the server and wait until it is up and running.
      auto servingMutex = new Mutex;
      auto servingCondition = new Condition(servingMutex);
      auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
      synchronized (servingMutex) {
        (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
          serverCancel, handler)).start();
        servingCondition.wait();
      }

      // We only run the timing tests for the first server on each async
      // manager, so that we don't get spurious timing errors because of
      // ordering issues.
      auto runTimingTests = (serverIndex == 0);

      auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
        threadsPerServer, iterations, runTimingTests, trace);
      clientsThreads.add(c);
      c.start();
    }
  }
  clientsThreads.joinAll();
}

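/// Server-side implementation of the AsyncTest service; the delayed variants
/// block the serving thread with Thread.sleep before answering or throwing.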
class AsyncTestHandler : AsyncTest {
  this(bool trace) {
    trace_ = trace;
  }

  override string echo(string value) {
    if (trace_) writefln(`echo("%s")`, value);
    return value;
  }

  override string delayedEcho(string value, long milliseconds) {
    if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
    Thread.sleep(dur!"msecs"(milliseconds));
    if (trace_) writeln("returning.");

    return value;
  }

  override void fail(string reason) {
    if (trace_) writefln(`fail("%s")`, reason);
    auto ate = new AsyncTestException;
    ate.reason = reason;
    throw ate;
  }

  override void delayedFail(string reason, long milliseconds) {
    if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
    Thread.sleep(dur!"msecs"(milliseconds));
    if (trace_) writeln("returning.");

    auto ate = new AsyncTestException;
    ate.reason = reason;
    throw ate;
  }

private:
  bool trace_;
  AsyncTestException ate_;
}

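/// Signals a condition variable from preServe() so that main() can block until
/// a freshly started server is actually about to accept connections before
/// pointing clients at it.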
class PreServeNotifyHandler : TServerEventHandler {
  this(Mutex servingMutex, Condition servingCondition) {
    servingMutex_ = servingMutex;
    servingCondition_ = servingCondition;
  }

  void preServe() {
    synchronized (servingMutex_) {
      servingCondition_.notifyAll();
    }
  }
  Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
  void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
  void preProcess(Variant serverContext, TTransport transport) {}

private:
  Mutex servingMutex_;
  Condition servingCondition_;
}

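/// Runs a server of the given type (plain or SSL, depending on whether an SSL
/// context is passed) on its own thread until the shared cancellation origin
/// is triggered.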
class ServerThread(ServerType) : Thread {
  this(ushort port, TSSLContext sslContext, bool trace,
    TCancellation cancellation, TServerEventHandler eventHandler
  ) {
    port_ = port;
    sslContext_ = sslContext;
    trace_ = trace;
    cancellation_ = cancellation;
    eventHandler_ = eventHandler;

    super(&run);
  }

  void run() {
    TServerSocket serverSocket;
    if (sslContext_) {
      serverSocket = new TSSLServerSocket(port_, sslContext_);
    } else {
      serverSocket = new TServerSocket(port_);
    }
    auto transportFactory = new TBufferedTransportFactory;
    auto protocolFactory = new TBinaryProtocolFactory!();
    auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));

    auto server = new ServerType(processor, serverSocket, transportFactory,
      protocolFactory);
    server.eventHandler = eventHandler_;

    writefln("Starting server on port %s...", port_);
    server.serve(cancellation_);
    writefln("Server thread on port %s done.", port_);
  }

private:
  ushort port_;
  bool trace_;
  TCancellation cancellation_;
  TSSLContext sslContext_;
  TServerEventHandler eventHandler_;
}

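/// Exercises a single server: first the functional echo()/fail() tests from
/// several concurrent client threads, then optionally the single-threaded
/// timing tests for the delayed variants.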
class ClientsThread : Thread {
  this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
    ushort threads, uint iterations, bool runTimingTests, bool trace
  ) {
    manager_ = manager;
    port_ = port;
    transportFactory_ = tf;
    threads_ = threads;
    iterations_ = iterations;
    runTimingTests_ = runTimingTests;
    trace_ = trace;
    super(&run);
  }

  void run() {
    auto transport = new TAsyncSocket(manager_, "localhost", port_);

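    // Functional test: several threads share one connection and verify that
    // echo() round-trips its argument and that fail() surfaces an
    // AsyncTestException with the expected reason.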
    {
      auto client = new TAsyncClient!AsyncTest(
        transport,
        transportFactory_,
        new TBinaryProtocolFactory!()
      );
      transport.open();
      auto clientThreads = new ThreadGroup;
      foreach (clientId; 0 .. threads_) {
        clientThreads.create({
          auto c = clientId;
          return {
            foreach (i; 0 .. iterations_) {
              immutable id = text(port_, ":", c, ":", i);

              {
                if (trace_) writefln(`Calling echo("%s")... `, id);
                auto a = client.echo(id).waitGet();
                enforce(a == id);
                if (trace_) writefln(`echo("%s") done.`, id);
              }

              {
                if (trace_) writefln(`Calling fail("%s")... `, id);
                auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
                enforce(a && a.reason == id);
                if (trace_) writefln(`fail("%s") done.`, id);
              }
            }
          };
        }());
      }
      clientThreads.joinAll();
      transport.close();
    }

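    // Timing tests (only run against the first server per manager): check that
    // the delayed calls complete neither noticeably early nor late, and that a
    // receive timeout on the socket is reported as TIMED_OUT.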
    if (runTimingTests_) {
      auto client = new TAsyncClient!AsyncTest(
        transport,
        transportFactory_,
        new TBinaryProtocolFactory!TBufferedTransport
      );

      // Temporarily redirect error logs to stdout, as SSL errors on the server
      // side are expected when the client terminates abruptly (as is the case
      // in the timeout test).
      auto oldErrorLogSink = g_errorLogSink;
      g_errorLogSink = g_infoLogSink;
      scope (exit) g_errorLogSink = oldErrorLogSink;

      foreach (i; 0 .. iterations_) {
        transport.open();

        immutable id = text(port_, ":", i);

        {
          if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
          auto a = client.delayedEcho(id, 100);
          enforce(!a.completion.wait(dur!"usecs"(1)),
            text("wait() succeeded early (", a.get(), ", ", id, ")."));
          enforce(!a.completion.wait(dur!"usecs"(1)),
            text("wait() succeeded early (", a.get(), ", ", id, ")."));
          enforce(a.completion.wait(dur!"msecs"(200)),
            text("wait() didn't succeed as expected (", id, ")."));
          enforce(a.get() == id);
          if (trace_) writefln(`... delayedEcho("%s") done.`, id);
        }

        {
          if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
          auto a = client.delayedFail(id, 100);
          enforce(!a.completion.wait(dur!"usecs"(1)),
            text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
          enforce(!a.completion.wait(dur!"usecs"(1)),
            text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
          enforce(a.completion.wait(dur!"msecs"(200)),
            text("wait() didn't succeed as expected (", id, ")."));
          auto e = cast(AsyncTestException)collectException(a.get());
          enforce(e && e.reason == id);
          if (trace_) writefln(`... delayedFail("%s") done.`, id);
        }

        {
          transport.recvTimeout = dur!"msecs"(50);

          if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
          auto a = client.delayedEcho("socketTimeout", 100);
          auto e = cast(TTransportException)collectException(a.waitGet());
          enforce(e, text("Operation didn't fail as expected (", id, ")."));
          enforce(e.type == TTransportException.Type.TIMED_OUT,
            text("Wrong timeout exception type (", id, "): ", e));
          if (trace_) writeln(`timed out as expected.`);

          // Wait until the server thread has reset before the next iteration.
          Thread.sleep(dur!"msecs"(50));
          transport.recvTimeout = dur!"hnsecs"(0);
        }

        transport.close();
      }
    }

    writefln("Clients thread for port %s done.", port_);
  }

  TAsyncSocketManager manager_;
  ushort port_;
  TTransportFactory transportFactory_;
  ushort threads_;
  uint iterations_;
  bool runTimingTests_;
  bool trace_;
}