From 0e86f1f7748c8a4386af8a817d50e4fe39e6b872 Mon Sep 17 00:00:00 2001 From: Randy Abernethy Date: Sun, 13 Jul 2014 09:50:19 -0700 Subject: [PATCH] THRIFT-2624: Add TServerEventHandler support to C# Client: C# Patch: ra Adds the TServerEventHandler interface to the C# lib and adds support in all C# servers. --- lib/csharp/Makefile.am | 1 + lib/csharp/src/Server/TServer.cs | 210 +++++----- lib/csharp/src/Server/TServerEventHandler.cs | 50 +++ lib/csharp/src/Server/TSimpleServer.cs | 192 ++++----- lib/csharp/src/Server/TThreadPoolServer.cs | 345 ++++++++-------- lib/csharp/src/Server/TThreadedServer.cs | 405 ++++++++++--------- lib/csharp/src/Thrift.csproj | 1 + lib/csharp/test/ThriftTest/TestServer.cs | 26 ++ 8 files changed, 672 insertions(+), 558 deletions(-) create mode 100644 lib/csharp/src/Server/TServerEventHandler.cs diff --git a/lib/csharp/Makefile.am b/lib/csharp/Makefile.am index 069e48ce..1c75aa10 100644 --- a/lib/csharp/Makefile.am +++ b/lib/csharp/Makefile.am @@ -46,6 +46,7 @@ THRIFTCODE= \ src/Server/TThreadPoolServer.cs \ src/Server/TSimpleServer.cs \ src/Server/TServer.cs \ + src/Server/TServerEventHandler.cs \ src/Transport/TBufferedTransport.cs \ src/Transport/TTransport.cs \ src/Transport/TSocket.cs \ diff --git a/lib/csharp/src/Server/TServer.cs b/lib/csharp/src/Server/TServer.cs index 271b0c4c..a2631a96 100644 --- a/lib/csharp/src/Server/TServer.cs +++ b/lib/csharp/src/Server/TServer.cs @@ -28,118 +28,100 @@ using System.IO; namespace Thrift.Server { - public abstract class TServer - { - /** - * Core processor - */ - protected TProcessor processor; - - /** - * Server transport - */ - protected TServerTransport serverTransport; - - /** - * Input Transport Factory - */ - protected TTransportFactory inputTransportFactory; - - /** - * Output Transport Factory - */ - protected TTransportFactory outputTransportFactory; - - /** - * Input Protocol Factory - */ - protected TProtocolFactory inputProtocolFactory; - - /** - * Output Protocol Factory - */ - protected TProtocolFactory outputProtocolFactory; - - public delegate void LogDelegate(string str); - private LogDelegate _logDelegate; - protected LogDelegate logDelegate - { - get { return _logDelegate; } - set { _logDelegate = (value != null) ? value : DefaultLogDelegate; } - } - - /** - * Default constructors. - */ - - public TServer(TProcessor processor, - TServerTransport serverTransport) - :this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) - { - } - - public TServer(TProcessor processor, - TServerTransport serverTransport, - LogDelegate logDelegate) - : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) - { - } - - public TServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory) - :this(processor, - serverTransport, - transportFactory, - transportFactory, - new TBinaryProtocol.Factory(), - new TBinaryProtocol.Factory(), - DefaultLogDelegate) - { - } - - public TServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) - :this(processor, - serverTransport, - transportFactory, - transportFactory, - protocolFactory, - protocolFactory, - DefaultLogDelegate) - { - } - - public TServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - LogDelegate logDelegate) - { - this.processor = processor; - this.serverTransport = serverTransport; - this.inputTransportFactory = inputTransportFactory; - this.outputTransportFactory = outputTransportFactory; - this.inputProtocolFactory = inputProtocolFactory; - this.outputProtocolFactory = outputProtocolFactory; - this.logDelegate = (logDelegate != null) ? logDelegate : DefaultLogDelegate; - } - - /** - * The run method fires up the server and gets things going. - */ - public abstract void Serve(); - - public abstract void Stop(); - - protected static void DefaultLogDelegate(string s) - { - Console.Error.WriteLine(s); - } - } + public abstract class TServer + { + //Attributes + protected TProcessor processor; + protected TServerTransport serverTransport; + protected TTransportFactory inputTransportFactory; + protected TTransportFactory outputTransportFactory; + protected TProtocolFactory inputProtocolFactory; + protected TProtocolFactory outputProtocolFactory; + protected TServerEventHandler serverEventHandler = null; + + //Methods + public void setEventHandler(TServerEventHandler seh) + { + serverEventHandler = seh; + } + public TServerEventHandler getEventHandler() + { + return serverEventHandler; + } + + //Log delegation + public delegate void LogDelegate(string str); + private LogDelegate _logDelegate; + protected LogDelegate logDelegate + { + get { return _logDelegate; } + set { _logDelegate = (value != null) ? value : DefaultLogDelegate; } + } + protected static void DefaultLogDelegate(string s) + { + Console.Error.WriteLine(s); + } + + //Construction + public TServer(TProcessor processor, + TServerTransport serverTransport) + : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) + { + } + + public TServer(TProcessor processor, + TServerTransport serverTransport, + LogDelegate logDelegate) + : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) + { + } + + public TServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory) + : this(processor, + serverTransport, + transportFactory, + transportFactory, + new TBinaryProtocol.Factory(), + new TBinaryProtocol.Factory(), + DefaultLogDelegate) + { + } + + public TServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processor, + serverTransport, + transportFactory, + transportFactory, + protocolFactory, + protocolFactory, + DefaultLogDelegate) + { + } + + public TServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + LogDelegate logDelegate) + { + this.processor = processor; + this.serverTransport = serverTransport; + this.inputTransportFactory = inputTransportFactory; + this.outputTransportFactory = outputTransportFactory; + this.inputProtocolFactory = inputProtocolFactory; + this.outputProtocolFactory = outputProtocolFactory; + this.logDelegate = (logDelegate != null) ? logDelegate : DefaultLogDelegate; + } + + //Abstract Interface + public abstract void Serve(); + public abstract void Stop(); + } } - diff --git a/lib/csharp/src/Server/TServerEventHandler.cs b/lib/csharp/src/Server/TServerEventHandler.cs new file mode 100644 index 00000000..843b166a --- /dev/null +++ b/lib/csharp/src/Server/TServerEventHandler.cs @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * Contains some contributions under the Thrift Software License. + * Please see doc/old-thrift-license.txt in the Thrift distribution for + * details. + */ + +using System; + +namespace Thrift.Server +{ + /// + /// Interface implemented by server users to handle events from the server + /// + public interface TServerEventHandler + { + /// + /// Called before the server begins */ + /// + void preServe(); + /// + /// Called when a new client has connected and is about to being processing */ + /// + Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output); + /// + /// Called when a client has finished request-handling to delete server context */ + /// + void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output); + /// + /// Called when a client is about to call the processor */ + /// + void processContext(Object serverContext, Thrift.Transport.TTransport transport); + }; +} diff --git a/lib/csharp/src/Server/TSimpleServer.cs b/lib/csharp/src/Server/TSimpleServer.cs index 1099fc1d..75f8241c 100644 --- a/lib/csharp/src/Server/TSimpleServer.cs +++ b/lib/csharp/src/Server/TSimpleServer.cs @@ -27,122 +27,132 @@ using Thrift.Protocol; namespace Thrift.Server { - /// - /// Simple single-threaded server for testing - /// - public class TSimpleServer : TServer - { - private bool stop = false; + /// + /// Simple single-threaded server for testing + /// + public class TSimpleServer : TServer + { + private bool stop = false; - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport) - :base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) - { - } + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport) + : base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate) + { + } - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport, - LogDelegate logDel) - : base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel) - { - } + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport, + LogDelegate logDel) + : base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel) + { + } - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory) - :base(processor, - serverTransport, - transportFactory, - transportFactory, - new TBinaryProtocol.Factory(), - new TBinaryProtocol.Factory(), - DefaultLogDelegate) - { - } + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory) + : base(processor, + serverTransport, + transportFactory, + transportFactory, + new TBinaryProtocol.Factory(), + new TBinaryProtocol.Factory(), + DefaultLogDelegate) + { + } - public TSimpleServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) - :base(processor, - serverTransport, - transportFactory, - transportFactory, - protocolFactory, - protocolFactory, - DefaultLogDelegate) - { - } + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : base(processor, + serverTransport, + transportFactory, + transportFactory, + protocolFactory, + protocolFactory, + DefaultLogDelegate) + { + } - public override void Serve() - { - try - { - serverTransport.Listen(); - } - catch (TTransportException ttx) - { - logDelegate(ttx.ToString()); - return; - } + public override void Serve() + { + try + { + serverTransport.Listen(); + } + catch (TTransportException ttx) + { + logDelegate(ttx.ToString()); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (serverEventHandler != null) + serverEventHandler.preServe(); - while (!stop) - { - TTransport client = null; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - try - { - using(client = serverTransport.Accept()) + while (!stop) + { + TTransport client = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + Object connectionContext = null; + try + { + using (client = serverTransport.Accept()) { if (client != null) { - using(inputTransport = inputTransportFactory.GetTransport(client)) + using (inputTransport = inputTransportFactory.GetTransport(client)) { using (outputTransport = outputTransportFactory.GetTransport(client)) { inputProtocol = inputProtocolFactory.GetProtocol(inputTransport); outputProtocol = outputProtocolFactory.GetProtocol(outputTransport); - while (processor.Process(inputProtocol, outputProtocol)) { } + + //Recover event handler (if any) and fire createContext server event when a client connects + if (serverEventHandler != null) + connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol); + + //Process client requests until client disconnects + while (true) + { + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (serverEventHandler != null) + serverEventHandler.processContext(connectionContext, inputTransport); + //Process client request (blocks until transport is readable) + if (!processor.Process(inputProtocol, outputProtocol)) + break; + } } } } } } - catch (TTransportException ttx) + catch (TTransportException) { - // Client died, just move on - if (stop) - { - logDelegate("TSimpleServer was shutting down, caught " + ttx.GetType().Name); - } + //Usually a client disconnect, expected } catch (Exception x) { + //Unexpected logDelegate(x.ToString()); } - } - if (stop) - { - try - { - serverTransport.Close(); - } - catch (TTransportException ttx) - { - logDelegate("TServerTranport failed on close: " + ttx.Message); - } - stop = false; - } - } + //Fire deleteContext server event after client disconnects + if (serverEventHandler != null) + serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + } + } - public override void Stop() - { - stop = true; - serverTransport.Close(); - } - } + public override void Stop() + { + stop = true; + serverTransport.Close(); + } + } } diff --git a/lib/csharp/src/Server/TThreadPoolServer.cs b/lib/csharp/src/Server/TThreadPoolServer.cs index 7ddabf7d..f26a683d 100644 --- a/lib/csharp/src/Server/TThreadPoolServer.cs +++ b/lib/csharp/src/Server/TThreadPoolServer.cs @@ -28,167 +28,186 @@ using Thrift.Transport; namespace Thrift.Server { - /// - /// Server that uses C# built-in ThreadPool to spawn threads when handling requests - /// - public class TThreadPoolServer : TServer - { - private const int DEFAULT_MIN_THREADS = 10; - private const int DEFAULT_MAX_THREADS = 100; - private volatile bool stop = false; - - public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport) - :this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), - DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate) - { - } - - public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate) - : this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), - DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate) - { - } - - - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) - :this(processor, serverTransport, - transportFactory, transportFactory, - protocolFactory, protocolFactory, - DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate) - { - } - - public TThreadPoolServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel) - :base(processor, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, logDel) - { - lock (typeof(TThreadPoolServer)) - { - if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads)) - { - throw new Exception("Error: could not SetMaxThreads in ThreadPool"); - } - if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads)) - { - throw new Exception("Error: could not SetMinThreads in ThreadPool"); - } - } - } - - - /// - /// Use new ThreadPool thread for each new client connection - /// - public override void Serve() - { - try - { - serverTransport.Listen(); - } - catch (TTransportException ttx) - { - logDelegate("Error, could not listen on ServerTransport: " + ttx); - return; - } - - while (!stop) - { - int failureCount = 0; - try - { - TTransport client = serverTransport.Accept(); - ThreadPool.QueueUserWorkItem(this.Execute, client); - } - catch (TTransportException ttx) - { - if (stop) - { - logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name); - } - else - { - ++failureCount; - logDelegate(ttx.ToString()); - } - - } - } - - if (stop) - { - try - { - serverTransport.Close(); - } - catch (TTransportException ttx) - { - logDelegate("TServerTransport failed on close: " + ttx.Message); - } - stop = false; - } - } - - /// - /// Loops on processing a client forever - /// threadContext will be a TTransport instance - /// - /// - private void Execute(Object threadContext) - { - TTransport client = (TTransport)threadContext; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - try - { - inputTransport = inputTransportFactory.GetTransport(client); - outputTransport = outputTransportFactory.GetTransport(client); - inputProtocol = inputProtocolFactory.GetProtocol(inputTransport); - outputProtocol = outputProtocolFactory.GetProtocol(outputTransport); - while (processor.Process(inputProtocol, outputProtocol)) - { - //keep processing requests until client disconnects - } - } - catch (TTransportException) - { - // Assume the client died and continue silently - //Console.WriteLine(ttx); - } - - catch (Exception x) - { - logDelegate("Error: " + x); - } - - if (inputTransport != null) - { - inputTransport.Close(); - } - if (outputTransport != null) - { - outputTransport.Close(); - } - } - - public override void Stop() - { - stop = true; - serverTransport.Close(); - } - } + /// + /// Server that uses C# built-in ThreadPool to spawn threads when handling requests + /// + public class TThreadPoolServer : TServer + { + private const int DEFAULT_MIN_THREADS = 10; + private const int DEFAULT_MAX_THREADS = 100; + private volatile bool stop = false; + + public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport) + : this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate) + { + } + + public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate) + : this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate) + { + } + + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processor, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate) + { + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel) + : base(processor, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, logDel) + { + lock (typeof(TThreadPoolServer)) + { + if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads)) + { + throw new Exception("Error: could not SetMaxThreads in ThreadPool"); + } + if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads)) + { + throw new Exception("Error: could not SetMinThreads in ThreadPool"); + } + } + } + + + /// + /// Use new ThreadPool thread for each new client connection + /// + public override void Serve() + { + try + { + serverTransport.Listen(); + } + catch (TTransportException ttx) + { + logDelegate("Error, could not listen on ServerTransport: " + ttx); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (serverEventHandler != null) + serverEventHandler.preServe(); + + while (!stop) + { + int failureCount = 0; + try + { + TTransport client = serverTransport.Accept(); + ThreadPool.QueueUserWorkItem(this.Execute, client); + } + catch (TTransportException ttx) + { + if (stop) + { + logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name); + } + else + { + ++failureCount; + logDelegate(ttx.ToString()); + } + + } + } + + if (stop) + { + try + { + serverTransport.Close(); + } + catch (TTransportException ttx) + { + logDelegate("TServerTransport failed on close: " + ttx.Message); + } + stop = false; + } + } + + /// + /// Loops on processing a client forever + /// threadContext will be a TTransport instance + /// + /// + private void Execute(Object threadContext) + { + TTransport client = (TTransport)threadContext; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + Object connectionContext = null; + try + { + inputTransport = inputTransportFactory.GetTransport(client); + outputTransport = outputTransportFactory.GetTransport(client); + inputProtocol = inputProtocolFactory.GetProtocol(inputTransport); + outputProtocol = outputProtocolFactory.GetProtocol(outputTransport); + + //Recover event handler (if any) and fire createContext server event when a client connects + if (serverEventHandler != null) + connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol); + + //Process client requests until client disconnects + while (true) + { + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (serverEventHandler != null) + serverEventHandler.processContext(connectionContext, inputTransport); + //Process client request (blocks until transport is readable) + if (!processor.Process(inputProtocol, outputProtocol)) + break; + } + } + catch (TTransportException) + { + //Usually a client disconnect, expected + } + catch (Exception x) + { + //Unexpected + logDelegate("Error: " + x); + } + + //Fire deleteContext server event after client disconnects + if (serverEventHandler != null) + serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + + //Close transports + if (inputTransport != null) + inputTransport.Close(); + if (outputTransport != null) + outputTransport.Close(); + } + + public override void Stop() + { + stop = true; + serverTransport.Close(); + } + } } diff --git a/lib/csharp/src/Server/TThreadedServer.cs b/lib/csharp/src/Server/TThreadedServer.cs index 8e73bb76..5e707f53 100644 --- a/lib/csharp/src/Server/TThreadedServer.cs +++ b/lib/csharp/src/Server/TThreadedServer.cs @@ -26,204 +26,229 @@ using Thrift.Transport; namespace Thrift.Server { - /// - /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests - /// - public class TThreadedServer : TServer - { - private const int DEFAULT_MAX_THREADS = 100; - private volatile bool stop = false; - private readonly int maxThreads; - - private Queue clientQueue; - private THashSet clientThreads; - private object clientLock; - private Thread workerThread; - - public TThreadedServer(TProcessor processor, TServerTransport serverTransport) - : this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), - DEFAULT_MAX_THREADS, DefaultLogDelegate) - { - } - - public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate) - : this(processor, serverTransport, - new TTransportFactory(), new TTransportFactory(), - new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), - DEFAULT_MAX_THREADS, logDelegate) - { - } - - - public TThreadedServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory transportFactory, - TProtocolFactory protocolFactory) - : this(processor, serverTransport, - transportFactory, transportFactory, - protocolFactory, protocolFactory, - DEFAULT_MAX_THREADS, DefaultLogDelegate) - { - } - - public TThreadedServer(TProcessor processor, - TServerTransport serverTransport, - TTransportFactory inputTransportFactory, - TTransportFactory outputTransportFactory, - TProtocolFactory inputProtocolFactory, - TProtocolFactory outputProtocolFactory, - int maxThreads, LogDelegate logDel) - : base(processor, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory, logDel) - { - this.maxThreads = maxThreads; - clientQueue = new Queue(); - clientLock = new object(); - clientThreads = new THashSet(); - } - - /// - /// Use new Thread for each new client connection. block until numConnections < maxThreads - /// - public override void Serve() - { - try - { - //start worker thread - workerThread = new Thread(new ThreadStart(Execute)); - workerThread.Start(); - serverTransport.Listen(); - } - catch (TTransportException ttx) - { - logDelegate("Error, could not listen on ServerTransport: " + ttx); - return; - } - - while (!stop) - { - int failureCount = 0; - try - { - TTransport client = serverTransport.Accept(); - lock (clientLock) - { - clientQueue.Enqueue(client); - Monitor.Pulse(clientLock); - } - } - catch (TTransportException ttx) - { - if (stop) - { - logDelegate("TThreadPoolServer was shutting down, caught " + ttx); - } - else - { - ++failureCount; - logDelegate(ttx.ToString()); - } - - } - } - - if (stop) - { - try - { - serverTransport.Close(); - } - catch (TTransportException ttx) - { - logDelegate("TServeTransport failed on close: " + ttx.Message); - } - stop = false; - } - } - - /// - /// Loops on processing a client forever - /// threadContext will be a TTransport instance - /// - /// - private void Execute() - { - while (!stop) - { - TTransport client; - Thread t; - lock (clientLock) - { - //don't dequeue if too many connections - while (clientThreads.Count >= maxThreads) - { - Monitor.Wait(clientLock); - } - - while (clientQueue.Count == 0) - { - Monitor.Wait(clientLock); - } - - client = clientQueue.Dequeue(); - t = new Thread(new ParameterizedThreadStart(ClientWorker)); - clientThreads.Add(t); - } - //start processing requests from client on new thread - t.Start(client); - } - } - - private void ClientWorker(Object context) - { - TTransport client = (TTransport)context; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - try - { + /// + /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests + /// + public class TThreadedServer : TServer + { + private const int DEFAULT_MAX_THREADS = 100; + private volatile bool stop = false; + private readonly int maxThreads; + + private Queue clientQueue; + private THashSet clientThreads; + private object clientLock; + private Thread workerThread; + + public TThreadedServer(TProcessor processor, TServerTransport serverTransport) + : this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + DEFAULT_MAX_THREADS, DefaultLogDelegate) + { + } + + public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate) + : this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), + DEFAULT_MAX_THREADS, logDelegate) + { + } + + + public TThreadedServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) + : this(processor, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + DEFAULT_MAX_THREADS, DefaultLogDelegate) + { + } + + public TThreadedServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + int maxThreads, LogDelegate logDel) + : base(processor, serverTransport, inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, logDel) + { + this.maxThreads = maxThreads; + clientQueue = new Queue(); + clientLock = new object(); + clientThreads = new THashSet(); + } + + /// + /// Use new Thread for each new client connection. block until numConnections < maxThreads + /// + public override void Serve() + { + try + { + //start worker thread + workerThread = new Thread(new ThreadStart(Execute)); + workerThread.Start(); + serverTransport.Listen(); + } + catch (TTransportException ttx) + { + logDelegate("Error, could not listen on ServerTransport: " + ttx); + return; + } + + //Fire the preServe server event when server is up but before any client connections + if (serverEventHandler != null) + serverEventHandler.preServe(); + + while (!stop) + { + int failureCount = 0; + try + { + TTransport client = serverTransport.Accept(); + lock (clientLock) + { + clientQueue.Enqueue(client); + Monitor.Pulse(clientLock); + } + } + catch (TTransportException ttx) + { + if (stop) + { + logDelegate("TThreadPoolServer was shutting down, caught " + ttx); + } + else + { + ++failureCount; + logDelegate(ttx.ToString()); + } + + } + } + + if (stop) + { + try + { + serverTransport.Close(); + } + catch (TTransportException ttx) + { + logDelegate("TServeTransport failed on close: " + ttx.Message); + } + stop = false; + } + } + + /// + /// Loops on processing a client forever + /// threadContext will be a TTransport instance + /// + /// + private void Execute() + { + while (!stop) + { + TTransport client; + Thread t; + lock (clientLock) + { + //don't dequeue if too many connections + while (clientThreads.Count >= maxThreads) + { + Monitor.Wait(clientLock); + } + + while (clientQueue.Count == 0) + { + Monitor.Wait(clientLock); + } + + client = clientQueue.Dequeue(); + t = new Thread(new ParameterizedThreadStart(ClientWorker)); + clientThreads.Add(t); + } + //start processing requests from client on new thread + t.Start(client); + } + } + + private void ClientWorker(Object context) + { + TTransport client = (TTransport)context; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + Object connectionContext = null; + try + { using (inputTransport = inputTransportFactory.GetTransport(client)) { using (outputTransport = outputTransportFactory.GetTransport(client)) { inputProtocol = inputProtocolFactory.GetProtocol(inputTransport); outputProtocol = outputProtocolFactory.GetProtocol(outputTransport); - while (processor.Process(inputProtocol, outputProtocol)) + + //Recover event handler (if any) and fire createContext server event when a client connects + if (serverEventHandler != null) + connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol); + + //Process client requests until client disconnects + while (true) { - //keep processing requests until client disconnects + //Fire processContext server event + //N.B. This is the pattern implemented in C++ and the event fires provisionally. + //That is to say it may be many minutes between the event firing and the client request + //actually arriving or the client may hang up without ever makeing a request. + if (serverEventHandler != null) + serverEventHandler.processContext(connectionContext, inputTransport); + //Process client request (blocks until transport is readable) + if (!processor.Process(inputProtocol, outputProtocol)) + break; } } } - } - catch (TTransportException) - { - } - catch (Exception x) - { - logDelegate("Error: " + x); - } - - lock (clientLock) - { - clientThreads.Remove(Thread.CurrentThread); - Monitor.Pulse(clientLock); - } - return; - } - - public override void Stop() - { - stop = true; - serverTransport.Close(); - //clean up all the threads myself - workerThread.Abort(); - foreach (Thread t in clientThreads) - { - t.Abort(); - } - } - } + } + catch (TTransportException) + { + //Usually a client disconnect, expected + } + catch (Exception x) + { + //Unexpected + logDelegate("Error: " + x); + } + + //Fire deleteContext server event after client disconnects + if (serverEventHandler != null) + serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); + + lock (clientLock) + { + clientThreads.Remove(Thread.CurrentThread); + Monitor.Pulse(clientLock); + } + return; + } + + public override void Stop() + { + stop = true; + serverTransport.Close(); + //clean up all the threads myself + workerThread.Abort(); + foreach (Thread t in clientThreads) + { + t.Abort(); + } + } + } } diff --git a/lib/csharp/src/Thrift.csproj b/lib/csharp/src/Thrift.csproj index 1f108680..195005a1 100644 --- a/lib/csharp/src/Thrift.csproj +++ b/lib/csharp/src/Thrift.csproj @@ -103,6 +103,7 @@ + diff --git a/lib/csharp/test/ThriftTest/TestServer.cs b/lib/csharp/test/ThriftTest/TestServer.cs index f0f539fa..92ef3740 100644 --- a/lib/csharp/test/ThriftTest/TestServer.cs +++ b/lib/csharp/test/ThriftTest/TestServer.cs @@ -34,6 +34,28 @@ namespace Test { public class TestServer { + public class TradeServerEventHandler : TServerEventHandler + { + public int callCount = 0; + public void preServe() + { + callCount++; + } + public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) + { + callCount++; + return null; + } + public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output) + { + callCount++; + } + public void processContext(Object serverContext, Thrift.Transport.TTransport transport) + { + callCount++; + } + }; + public class TestHandler : ThriftTest.Iface { public TServer server; @@ -400,6 +422,10 @@ namespace Test // Threaded Server // serverEngine = new TThreadedServer(testProcessor, tServerSocket); + //Server event handler + TradeServerEventHandler serverEvents = new TradeServerEventHandler(); + serverEngine.setEventHandler(serverEvents); + testHandler.server = serverEngine; // Run it -- 2.17.1