THRIFT-2624: Add TServerEventHandler support to C#
authorRandy Abernethy <randy@rx-m.com>
Sun, 13 Jul 2014 16:50:19 +0000 (09:50 -0700)
committerRandy Abernethy <randy@rx-m.com>
Sun, 13 Jul 2014 16:50:19 +0000 (09:50 -0700)
Client: C#
Patch: ra

Adds the TServerEventHandler interface to the C# lib and adds
support in all C# servers.

lib/csharp/Makefile.am
lib/csharp/src/Server/TServer.cs
lib/csharp/src/Server/TServerEventHandler.cs [new file with mode: 0644]
lib/csharp/src/Server/TSimpleServer.cs
lib/csharp/src/Server/TThreadPoolServer.cs
lib/csharp/src/Server/TThreadedServer.cs
lib/csharp/src/Thrift.csproj
lib/csharp/test/ThriftTest/TestServer.cs

index 069e48c..1c75aa1 100644 (file)
@@ -46,6 +46,7 @@ THRIFTCODE= \
             src/Server/TThreadPoolServer.cs \
             src/Server/TSimpleServer.cs \
             src/Server/TServer.cs \
+            src/Server/TServerEventHandler.cs \
             src/Transport/TBufferedTransport.cs \
             src/Transport/TTransport.cs \
             src/Transport/TSocket.cs \
index 271b0c4..a2631a9 100644 (file)
@@ -28,118 +28,100 @@ using System.IO;
 
 namespace Thrift.Server
 {
-       public abstract class TServer
-       {
-               /**
-                * Core processor
-                */
-               protected TProcessor processor;
-
-               /**
-                * Server transport
-                */
-               protected TServerTransport serverTransport;
-
-               /**
-                * Input Transport Factory
-                */
-               protected TTransportFactory inputTransportFactory;
-
-               /**
-                * Output Transport Factory
-                */
-               protected TTransportFactory outputTransportFactory;
-
-               /**
-                * Input Protocol Factory
-                */
-               protected TProtocolFactory inputProtocolFactory;
-
-               /**
-                * Output Protocol Factory
-                */
-               protected TProtocolFactory outputProtocolFactory;
-
-        public delegate void LogDelegate(string str);
-        private LogDelegate _logDelegate;
-        protected LogDelegate logDelegate
-        {
-            get { return _logDelegate; }
-            set { _logDelegate = (value != null) ? value : DefaultLogDelegate; } 
-        }
-
-               /**
-                * Default constructors.
-                */
-
-               public TServer(TProcessor processor,
-                                                 TServerTransport serverTransport)
-                       :this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
-               {
-               }
-
-               public TServer(TProcessor processor,
-                                               TServerTransport serverTransport,
-                                               LogDelegate logDelegate)
-                       : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
-               {
-               }
-
-               public TServer(TProcessor processor,
-                                                 TServerTransport serverTransport,
-                                                 TTransportFactory transportFactory)
-                       :this(processor,
-                                serverTransport,
-                                transportFactory,
-                                transportFactory,
-                                new TBinaryProtocol.Factory(),
-                                new TBinaryProtocol.Factory(),
-                                DefaultLogDelegate)
-               {
-               }
-
-               public TServer(TProcessor processor,
-                                                 TServerTransport serverTransport,
-                                                 TTransportFactory transportFactory,
-                                                 TProtocolFactory protocolFactory)
-                       :this(processor,
-                                serverTransport,
-                                transportFactory,
-                                transportFactory,
-                                protocolFactory,
-                                protocolFactory,
-                            DefaultLogDelegate)
-               {
-               }
-
-               public TServer(TProcessor processor,
-                                                 TServerTransport serverTransport,
-                                                 TTransportFactory inputTransportFactory,
-                                                 TTransportFactory outputTransportFactory,
-                                                 TProtocolFactory inputProtocolFactory,
-                                                 TProtocolFactory outputProtocolFactory,
-                                                 LogDelegate logDelegate)
-               {
-                       this.processor = processor;
-                       this.serverTransport = serverTransport;
-                       this.inputTransportFactory = inputTransportFactory;
-                       this.outputTransportFactory = outputTransportFactory;
-                       this.inputProtocolFactory = inputProtocolFactory;
-                       this.outputProtocolFactory = outputProtocolFactory;
-                       this.logDelegate = (logDelegate != null) ? logDelegate : DefaultLogDelegate;
-               }
-
-               /**
-                * The run method fires up the server and gets things going.
-                */
-               public abstract void Serve();
-
-               public abstract void Stop();
-
-               protected static void DefaultLogDelegate(string s)
-               {
-                       Console.Error.WriteLine(s);
-               }
-       }
+  public abstract class TServer
+  {
+    //Attributes
+    protected TProcessor processor;
+    protected TServerTransport serverTransport;
+    protected TTransportFactory inputTransportFactory;
+    protected TTransportFactory outputTransportFactory;
+    protected TProtocolFactory inputProtocolFactory;
+    protected TProtocolFactory outputProtocolFactory;
+    protected TServerEventHandler serverEventHandler = null;
+
+    //Methods
+    public void setEventHandler(TServerEventHandler seh)
+    {
+      serverEventHandler = seh;
+    }
+    public TServerEventHandler getEventHandler()
+    {
+      return serverEventHandler;
+    }
+
+    //Log delegation
+    public delegate void LogDelegate(string str);
+    private LogDelegate _logDelegate;
+    protected LogDelegate logDelegate
+    {
+      get { return _logDelegate; }
+      set { _logDelegate = (value != null) ? value : DefaultLogDelegate; }
+    }
+    protected static void DefaultLogDelegate(string s)
+    {
+      Console.Error.WriteLine(s);
+    }
+
+    //Construction
+    public TServer(TProcessor processor,
+              TServerTransport serverTransport)
+      : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+    {
+    }
+
+    public TServer(TProcessor processor,
+            TServerTransport serverTransport,
+            LogDelegate logDelegate)
+      : this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+    {
+    }
+
+    public TServer(TProcessor processor,
+              TServerTransport serverTransport,
+              TTransportFactory transportFactory)
+      : this(processor,
+         serverTransport,
+         transportFactory,
+         transportFactory,
+         new TBinaryProtocol.Factory(),
+         new TBinaryProtocol.Factory(),
+         DefaultLogDelegate)
+    {
+    }
+
+    public TServer(TProcessor processor,
+              TServerTransport serverTransport,
+              TTransportFactory transportFactory,
+              TProtocolFactory protocolFactory)
+      : this(processor,
+         serverTransport,
+         transportFactory,
+         transportFactory,
+         protocolFactory,
+         protocolFactory,
+           DefaultLogDelegate)
+    {
+    }
+
+    public TServer(TProcessor processor,
+              TServerTransport serverTransport,
+              TTransportFactory inputTransportFactory,
+              TTransportFactory outputTransportFactory,
+              TProtocolFactory inputProtocolFactory,
+              TProtocolFactory outputProtocolFactory,
+              LogDelegate logDelegate)
+    {
+      this.processor = processor;
+      this.serverTransport = serverTransport;
+      this.inputTransportFactory = inputTransportFactory;
+      this.outputTransportFactory = outputTransportFactory;
+      this.inputProtocolFactory = inputProtocolFactory;
+      this.outputProtocolFactory = outputProtocolFactory;
+      this.logDelegate = (logDelegate != null) ? logDelegate : DefaultLogDelegate;
+    }
+
+    //Abstract Interface
+    public abstract void Serve();
+    public abstract void Stop();
+  }
 }
-
diff --git a/lib/csharp/src/Server/TServerEventHandler.cs b/lib/csharp/src/Server/TServerEventHandler.cs
new file mode 100644 (file)
index 0000000..843b166
--- /dev/null
@@ -0,0 +1,50 @@
+\feff/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+using System;
+
+namespace Thrift.Server
+{
+  /// <summary>
+  /// Interface implemented by server users to handle events from the server
+  /// </summary>
+  public interface TServerEventHandler
+  {
+    /// <summary>
+    /// Called before the server begins */
+    /// </summary>
+    void preServe();
+    /// <summary>
+    /// Called when a new client has connected and is about to being processing */
+    /// </summary>
+    Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output);
+    /// <summary>
+    /// Called when a client has finished request-handling to delete server context */
+    /// </summary>
+    void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output);
+    /// <summary>
+    /// Called when a client is about to call the processor */
+    /// </summary>
+    void processContext(Object serverContext, Thrift.Transport.TTransport transport);
+  };
+}
index 1099fc1..75f8241 100644 (file)
@@ -27,122 +27,132 @@ using Thrift.Protocol;
 
 namespace Thrift.Server
 {
-       /// <summary>
-       /// Simple single-threaded server for testing
-       /// </summary>
-       public class TSimpleServer : TServer
-       {
-               private bool stop = false;
+  /// <summary>
+  /// Simple single-threaded server for testing
+  /// </summary>
+  public class TSimpleServer : TServer
+  {
+    private bool stop = false;
 
-               public TSimpleServer(TProcessor processor,
-                                                 TServerTransport serverTransport)
-                       :base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
-               {
-               }
+    public TSimpleServer(TProcessor processor,
+              TServerTransport serverTransport)
+      base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+    {
+    }
 
-               public TSimpleServer(TProcessor processor,
-                                                       TServerTransport serverTransport,
-                                                       LogDelegate logDel)
-                       : base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel)
-               {
-               }
+    public TSimpleServer(TProcessor processor,
+              TServerTransport serverTransport,
+              LogDelegate logDel)
+      : base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel)
+    {
+    }
 
-               public TSimpleServer(TProcessor processor,
-                                                 TServerTransport serverTransport,
-                                                 TTransportFactory transportFactory)
-                       :base(processor,
-                                serverTransport,
-                                transportFactory,
-                                transportFactory,
-                                new TBinaryProtocol.Factory(),
-                                new TBinaryProtocol.Factory(),
-                            DefaultLogDelegate)
-               {
-               }
+    public TSimpleServer(TProcessor processor,
+              TServerTransport serverTransport,
+              TTransportFactory transportFactory)
+      base(processor,
+         serverTransport,
+         transportFactory,
+         transportFactory,
+         new TBinaryProtocol.Factory(),
+         new TBinaryProtocol.Factory(),
+           DefaultLogDelegate)
+    {
+    }
 
-               public TSimpleServer(TProcessor processor,
-                                                 TServerTransport serverTransport,
-                                                 TTransportFactory transportFactory,
-                                                 TProtocolFactory protocolFactory)
-                       :base(processor,
-                                serverTransport,
-                                transportFactory,
-                                transportFactory,
-                                protocolFactory,
-                                protocolFactory,
-                                DefaultLogDelegate)
-               {
-               }
+    public TSimpleServer(TProcessor processor,
+              TServerTransport serverTransport,
+              TTransportFactory transportFactory,
+              TProtocolFactory protocolFactory)
+      base(processor,
+         serverTransport,
+         transportFactory,
+         transportFactory,
+         protocolFactory,
+         protocolFactory,
+         DefaultLogDelegate)
+    {
+    }
 
-               public override void Serve()
-               {
-                       try
-                       {
-                               serverTransport.Listen();
-                       }
-                       catch (TTransportException ttx)
-                       {
-                               logDelegate(ttx.ToString());
-                               return;
-                       }
+    public override void Serve()
+    {
+      try
+      {
+        serverTransport.Listen();
+      }
+      catch (TTransportException ttx)
+      {
+        logDelegate(ttx.ToString());
+        return;
+      }
+
+      //Fire the preServe server event when server is up but before any client connections
+      if (serverEventHandler != null)
+        serverEventHandler.preServe();
 
-                       while (!stop)
-                       {
-                               TTransport client = null;
-                               TTransport inputTransport = null;
-                               TTransport outputTransport = null;
-                               TProtocol inputProtocol = null;
-                               TProtocol outputProtocol = null;
-                               try
-                               {
-          using(client = serverTransport.Accept())
+      while (!stop)
+      {
+        TTransport client = null;
+        TTransport inputTransport = null;
+        TTransport outputTransport = null;
+        TProtocol inputProtocol = null;
+        TProtocol outputProtocol = null;
+        Object connectionContext = null;
+        try
+        {
+          using (client = serverTransport.Accept())
           {
             if (client != null)
             {
-              using(inputTransport = inputTransportFactory.GetTransport(client))
+              using (inputTransport = inputTransportFactory.GetTransport(client))
               {
                 using (outputTransport = outputTransportFactory.GetTransport(client))
                 {
                   inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
                   outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
-                  while (processor.Process(inputProtocol, outputProtocol)) { }
+
+                  //Recover event handler (if any) and fire createContext server event when a client connects
+                  if (serverEventHandler != null)
+                    connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
+
+                  //Process client requests until client disconnects
+                  while (true)
+                  {
+                    //Fire processContext server event
+                    //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+                    //That is to say it may be many minutes between the event firing and the client request
+                    //actually arriving or the client may hang up without ever makeing a request.
+                    if (serverEventHandler != null)
+                      serverEventHandler.processContext(connectionContext, inputTransport);
+                    //Process client request (blocks until transport is readable)
+                    if (!processor.Process(inputProtocol, outputProtocol))
+                      break;
+                  }
                 }
               }
             }
           }
         }
-        catch (TTransportException ttx)
+        catch (TTransportException)
         {
-          // Client died, just move on
-          if (stop)
-          {
-            logDelegate("TSimpleServer was shutting down, caught " + ttx.GetType().Name);
-          }
+          //Usually a client disconnect, expected
         }
         catch (Exception x)
         {
+          //Unexpected
           logDelegate(x.ToString());
         }
-      }
 
-                       if (stop)
-                       {
-                               try
-                               {
-                                       serverTransport.Close();
-                               }
-                               catch (TTransportException ttx)
-                               {
-                                       logDelegate("TServerTranport failed on close: " + ttx.Message);
-                               }
-                               stop = false;
-                       }
-               }
+        //Fire deleteContext server event after client disconnects
+        if (serverEventHandler != null)
+          serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+      }
+    }
 
-               public override void Stop()
-               {
-                       stop = true;
-                       serverTransport.Close();
-               }
-       }
+    public override void Stop()
+    {
+      stop = true;
+      serverTransport.Close();
+    }
+  }
 }
index 7ddabf7..f26a683 100644 (file)
@@ -28,167 +28,186 @@ using Thrift.Transport;
 
 namespace Thrift.Server
 {
-       /// <summary>
-       /// Server that uses C# built-in ThreadPool to spawn threads when handling requests
-       /// </summary>
-       public class TThreadPoolServer : TServer
-       {
-               private const int DEFAULT_MIN_THREADS = 10;
-               private const int DEFAULT_MAX_THREADS = 100;
-               private volatile bool stop = false;
-
-               public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
-                       :this(processor, serverTransport,
-                                new TTransportFactory(), new TTransportFactory(),
-                                new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
-                                DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
-               {
-               }
-
-               public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
-                       : this(processor, serverTransport,
-                                new TTransportFactory(), new TTransportFactory(),
-                                new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
-                                DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate)
-               {
-               }
-
-
-               public TThreadPoolServer(TProcessor processor,
-                                                                TServerTransport serverTransport,
-                                                                TTransportFactory transportFactory,
-                                                                TProtocolFactory protocolFactory)
-                       :this(processor, serverTransport,
-                                transportFactory, transportFactory,
-                                protocolFactory, protocolFactory,
-                                DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
-               {
-               }
-
-               public TThreadPoolServer(TProcessor processor,
-                                                                TServerTransport serverTransport,
-                                                                TTransportFactory inputTransportFactory,
-                                                                TTransportFactory outputTransportFactory,
-                                                                TProtocolFactory inputProtocolFactory,
-                                                                TProtocolFactory outputProtocolFactory,
-                                                                int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
-                       :base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
-                                 inputProtocolFactory, outputProtocolFactory, logDel)
-               {
-                       lock (typeof(TThreadPoolServer))
-                       {
-                               if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads))
-                               {
-                                       throw new Exception("Error: could not SetMaxThreads in ThreadPool");
-                               }
-                               if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads))
-                               {
-                                       throw new Exception("Error: could not SetMinThreads in ThreadPool");
-                               }
-                       }
-               }
-
-
-               /// <summary>
-               /// Use new ThreadPool thread for each new client connection
-               /// </summary>
-               public override void Serve()
-               {
-                       try
-                       {
-                               serverTransport.Listen();
-                       }
-                       catch (TTransportException ttx)
-                       {
-                               logDelegate("Error, could not listen on ServerTransport: " + ttx);
-                               return;
-                       }
-
-                       while (!stop)
-                       {
-                               int failureCount = 0;
-                               try
-                               {
-                                       TTransport client = serverTransport.Accept();
-                                       ThreadPool.QueueUserWorkItem(this.Execute, client);
-                               }
-                               catch (TTransportException ttx)
-                               {
-                                       if (stop)
-                                       {
-                                               logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name);
-                                       }
-                                       else
-                                       {
-                                               ++failureCount;
-                                               logDelegate(ttx.ToString());
-                                       }
-
-                               }
-                       }
-
-                       if (stop)
-                       {
-                               try
-                               {
-                                       serverTransport.Close();
-                               }
-                               catch (TTransportException ttx)
-                               {
-                                       logDelegate("TServerTransport failed on close: " + ttx.Message);
-                               }
-                               stop = false;
-                       }
-               }
-
-               /// <summary>
-               /// Loops on processing a client forever
-               /// threadContext will be a TTransport instance
-               /// </summary>
-               /// <param name="threadContext"></param>
-               private void Execute(Object threadContext)
-               {
-                       TTransport client = (TTransport)threadContext;
-                       TTransport inputTransport = null;
-                       TTransport outputTransport = null;
-                       TProtocol inputProtocol = null;
-                       TProtocol outputProtocol = null;
-                       try
-                       {
-                               inputTransport = inputTransportFactory.GetTransport(client);
-                               outputTransport = outputTransportFactory.GetTransport(client);
-                               inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
-                               outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
-                               while (processor.Process(inputProtocol, outputProtocol))
-                               {
-                                       //keep processing requests until client disconnects
-                               }
-                       }
-                       catch (TTransportException)
-                       {
-                               // Assume the client died and continue silently
-                               //Console.WriteLine(ttx);
-                       }
-                       
-                       catch (Exception x)
-                       {
-                               logDelegate("Error: " + x);
-                       }
-
-                       if (inputTransport != null)
-                       {
-                               inputTransport.Close();
-                       }
-                       if (outputTransport != null)
-                       {
-                               outputTransport.Close();
-                       }
-               }
-
-               public override void Stop()
-               {
-                       stop = true;
-                       serverTransport.Close();
-               }
-       }
+  /// <summary>
+  /// Server that uses C# built-in ThreadPool to spawn threads when handling requests
+  /// </summary>
+  public class TThreadPoolServer : TServer
+  {
+    private const int DEFAULT_MIN_THREADS = 10;
+    private const int DEFAULT_MAX_THREADS = 100;
+    private volatile bool stop = false;
+
+    public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
+      : this(processor, serverTransport,
+         new TTransportFactory(), new TTransportFactory(),
+         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+         DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
+    {
+    }
+
+    public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+      : this(processor, serverTransport,
+         new TTransportFactory(), new TTransportFactory(),
+         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+         DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate)
+    {
+    }
+
+
+    public TThreadPoolServer(TProcessor processor,
+                 TServerTransport serverTransport,
+                 TTransportFactory transportFactory,
+                 TProtocolFactory protocolFactory)
+      : this(processor, serverTransport,
+         transportFactory, transportFactory,
+         protocolFactory, protocolFactory,
+         DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
+    {
+    }
+
+    public TThreadPoolServer(TProcessor processor,
+                 TServerTransport serverTransport,
+                 TTransportFactory inputTransportFactory,
+                 TTransportFactory outputTransportFactory,
+                 TProtocolFactory inputProtocolFactory,
+                 TProtocolFactory outputProtocolFactory,
+                 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
+      : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory, logDel)
+    {
+      lock (typeof(TThreadPoolServer))
+      {
+        if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads))
+        {
+          throw new Exception("Error: could not SetMaxThreads in ThreadPool");
+        }
+        if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads))
+        {
+          throw new Exception("Error: could not SetMinThreads in ThreadPool");
+        }
+      }
+    }
+
+
+    /// <summary>
+    /// Use new ThreadPool thread for each new client connection
+    /// </summary>
+    public override void Serve()
+    {
+      try
+      {
+        serverTransport.Listen();
+      }
+      catch (TTransportException ttx)
+      {
+        logDelegate("Error, could not listen on ServerTransport: " + ttx);
+        return;
+      }
+
+      //Fire the preServe server event when server is up but before any client connections
+      if (serverEventHandler != null)
+        serverEventHandler.preServe();
+
+      while (!stop)
+      {
+        int failureCount = 0;
+        try
+        {
+          TTransport client = serverTransport.Accept();
+          ThreadPool.QueueUserWorkItem(this.Execute, client);
+        }
+        catch (TTransportException ttx)
+        {
+          if (stop)
+          {
+            logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name);
+          }
+          else
+          {
+            ++failureCount;
+            logDelegate(ttx.ToString());
+          }
+
+        }
+      }
+
+      if (stop)
+      {
+        try
+        {
+          serverTransport.Close();
+        }
+        catch (TTransportException ttx)
+        {
+          logDelegate("TServerTransport failed on close: " + ttx.Message);
+        }
+        stop = false;
+      }
+    }
+
+    /// <summary>
+    /// Loops on processing a client forever
+    /// threadContext will be a TTransport instance
+    /// </summary>
+    /// <param name="threadContext"></param>
+    private void Execute(Object threadContext)
+    {
+      TTransport client = (TTransport)threadContext;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+      Object connectionContext = null;
+      try
+      {
+        inputTransport = inputTransportFactory.GetTransport(client);
+        outputTransport = outputTransportFactory.GetTransport(client);
+        inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
+        outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
+
+        //Recover event handler (if any) and fire createContext server event when a client connects
+        if (serverEventHandler != null)
+          connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
+
+        //Process client requests until client disconnects
+        while (true)
+        {
+          //Fire processContext server event
+          //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+          //That is to say it may be many minutes between the event firing and the client request
+          //actually arriving or the client may hang up without ever makeing a request.
+          if (serverEventHandler != null)
+            serverEventHandler.processContext(connectionContext, inputTransport);
+          //Process client request (blocks until transport is readable)
+          if (!processor.Process(inputProtocol, outputProtocol))
+            break;
+        }
+      }
+      catch (TTransportException)
+      {
+        //Usually a client disconnect, expected
+      }
+      catch (Exception x)
+      {
+        //Unexpected
+        logDelegate("Error: " + x);
+      }
+
+      //Fire deleteContext server event after client disconnects
+      if (serverEventHandler != null)
+        serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+
+      //Close transports
+      if (inputTransport != null)
+        inputTransport.Close();
+      if (outputTransport != null)
+        outputTransport.Close();
+    }
+
+    public override void Stop()
+    {
+      stop = true;
+      serverTransport.Close();
+    }
+  }
 }
index 8e73bb7..5e707f5 100644 (file)
@@ -26,204 +26,229 @@ using Thrift.Transport;
 
 namespace Thrift.Server
 {
-       /// <summary>
-       /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests
-       /// </summary>
-       public class TThreadedServer : TServer
-       {
-               private const int DEFAULT_MAX_THREADS = 100;
-               private volatile bool stop = false;
-               private readonly int maxThreads;
-
-               private Queue<TTransport> clientQueue;
-               private THashSet<Thread> clientThreads;
-               private object clientLock;
-               private Thread workerThread;
-
-               public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
-                       : this(processor, serverTransport,
-                                new TTransportFactory(), new TTransportFactory(),
-                                new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
-                                DEFAULT_MAX_THREADS, DefaultLogDelegate)
-               {
-               }
-
-               public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
-                       : this(processor, serverTransport,
-                                new TTransportFactory(), new TTransportFactory(),
-                                new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
-                                DEFAULT_MAX_THREADS, logDelegate)
-               {
-               }
-
-
-               public TThreadedServer(TProcessor processor,
-                                                                TServerTransport serverTransport,
-                                                                TTransportFactory transportFactory,
-                                                                TProtocolFactory protocolFactory)
-                       : this(processor, serverTransport,
-                                transportFactory, transportFactory,
-                                protocolFactory, protocolFactory,
-                                DEFAULT_MAX_THREADS, DefaultLogDelegate)
-               {
-               }
-
-               public TThreadedServer(TProcessor processor,
-                                                                TServerTransport serverTransport,
-                                                                TTransportFactory inputTransportFactory,
-                                                                TTransportFactory outputTransportFactory,
-                                                                TProtocolFactory inputProtocolFactory,
-                                                                TProtocolFactory outputProtocolFactory,
-                                                                int maxThreads, LogDelegate logDel)
-                       : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
-                                 inputProtocolFactory, outputProtocolFactory, logDel)
-               {
-                       this.maxThreads = maxThreads;
-                       clientQueue = new Queue<TTransport>();
-                       clientLock = new object();
-                       clientThreads = new THashSet<Thread>();
-               }
-
-               /// <summary>
-               /// Use new Thread for each new client connection. block until numConnections < maxThreads
-               /// </summary>
-               public override void Serve()
-               {
-                       try
-                       {
-                               //start worker thread
-                               workerThread = new Thread(new ThreadStart(Execute));
-                               workerThread.Start();
-                               serverTransport.Listen();
-                       }
-                       catch (TTransportException ttx)
-                       {
-                               logDelegate("Error, could not listen on ServerTransport: " + ttx);
-                               return;
-                       }
-
-                       while (!stop)
-                       {
-                               int failureCount = 0;
-                               try
-                               {
-                                       TTransport client = serverTransport.Accept();
-                                       lock (clientLock)
-                                       {
-                                               clientQueue.Enqueue(client);
-                                               Monitor.Pulse(clientLock);
-                                       }
-                               }
-                               catch (TTransportException ttx)
-                               {
-                                       if (stop)
-                                       {
-                                               logDelegate("TThreadPoolServer was shutting down, caught " + ttx);
-                                       }
-                                       else
-                                       {
-                                               ++failureCount;
-                                               logDelegate(ttx.ToString());
-                                       }
-
-                               }
-                       }
-
-                       if (stop)
-                       {
-                               try
-                               {
-                                       serverTransport.Close();
-                               }
-                               catch (TTransportException ttx)
-                               {
-                                       logDelegate("TServeTransport failed on close: " + ttx.Message);
-                               }
-                               stop = false;
-                       }
-               }
-
-               /// <summary>
-               /// Loops on processing a client forever
-               /// threadContext will be a TTransport instance
-               /// </summary>
-               /// <param name="threadContext"></param>
-               private void Execute()
-               {
-                       while (!stop)
-                       {
-                               TTransport client;
-                               Thread t;
-                               lock (clientLock)
-                               {
-                                       //don't dequeue if too many connections
-                                       while (clientThreads.Count >= maxThreads)
-                                       {
-                                               Monitor.Wait(clientLock);
-                                       }
-
-                                       while (clientQueue.Count == 0)
-                                       {
-                                               Monitor.Wait(clientLock);
-                                       }
-
-                                       client = clientQueue.Dequeue();
-                                       t = new Thread(new ParameterizedThreadStart(ClientWorker));
-                                       clientThreads.Add(t);
-                               }
-                               //start processing requests from client on new thread
-                               t.Start(client);
-                       }
-               }
-
-               private void ClientWorker(Object context)
-               {
-                       TTransport client = (TTransport)context;
-                       TTransport inputTransport = null;
-                       TTransport outputTransport = null;
-                       TProtocol inputProtocol = null;
-                       TProtocol outputProtocol = null;
-                       try
-                       {
+  /// <summary>
+  /// Server that uses C# threads (as opposed to the ThreadPool) when handling requests
+  /// </summary>
+  public class TThreadedServer : TServer
+  {
+    private const int DEFAULT_MAX_THREADS = 100;
+    private volatile bool stop = false;
+    private readonly int maxThreads;
+
+    private Queue<TTransport> clientQueue;
+    private THashSet<Thread> clientThreads;
+    private object clientLock;
+    private Thread workerThread;
+
+    public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
+      : this(processor, serverTransport,
+         new TTransportFactory(), new TTransportFactory(),
+         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+         DEFAULT_MAX_THREADS, DefaultLogDelegate)
+    {
+    }
+
+    public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+      : this(processor, serverTransport,
+         new TTransportFactory(), new TTransportFactory(),
+         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+         DEFAULT_MAX_THREADS, logDelegate)
+    {
+    }
+
+
+    public TThreadedServer(TProcessor processor,
+                 TServerTransport serverTransport,
+                 TTransportFactory transportFactory,
+                 TProtocolFactory protocolFactory)
+      : this(processor, serverTransport,
+         transportFactory, transportFactory,
+         protocolFactory, protocolFactory,
+         DEFAULT_MAX_THREADS, DefaultLogDelegate)
+    {
+    }
+
+    public TThreadedServer(TProcessor processor,
+                 TServerTransport serverTransport,
+                 TTransportFactory inputTransportFactory,
+                 TTransportFactory outputTransportFactory,
+                 TProtocolFactory inputProtocolFactory,
+                 TProtocolFactory outputProtocolFactory,
+                 int maxThreads, LogDelegate logDel)
+      : base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory, logDel)
+    {
+      this.maxThreads = maxThreads;
+      clientQueue = new Queue<TTransport>();
+      clientLock = new object();
+      clientThreads = new THashSet<Thread>();
+    }
+
+    /// <summary>
+    /// Use new Thread for each new client connection. block until numConnections < maxThreads
+    /// </summary>
+    public override void Serve()
+    {
+      try
+      {
+        //start worker thread
+        workerThread = new Thread(new ThreadStart(Execute));
+        workerThread.Start();
+        serverTransport.Listen();
+      }
+      catch (TTransportException ttx)
+      {
+        logDelegate("Error, could not listen on ServerTransport: " + ttx);
+        return;
+      }
+
+      //Fire the preServe server event when server is up but before any client connections
+      if (serverEventHandler != null)
+        serverEventHandler.preServe();
+
+      while (!stop)
+      {
+        int failureCount = 0;
+        try
+        {
+          TTransport client = serverTransport.Accept();
+          lock (clientLock)
+          {
+            clientQueue.Enqueue(client);
+            Monitor.Pulse(clientLock);
+          }
+        }
+        catch (TTransportException ttx)
+        {
+          if (stop)
+          {
+            logDelegate("TThreadPoolServer was shutting down, caught " + ttx);
+          }
+          else
+          {
+            ++failureCount;
+            logDelegate(ttx.ToString());
+          }
+
+        }
+      }
+
+      if (stop)
+      {
+        try
+        {
+          serverTransport.Close();
+        }
+        catch (TTransportException ttx)
+        {
+          logDelegate("TServeTransport failed on close: " + ttx.Message);
+        }
+        stop = false;
+      }
+    }
+
+    /// <summary>
+    /// Loops on processing a client forever
+    /// threadContext will be a TTransport instance
+    /// </summary>
+    /// <param name="threadContext"></param>
+    private void Execute()
+    {
+      while (!stop)
+      {
+        TTransport client;
+        Thread t;
+        lock (clientLock)
+        {
+          //don't dequeue if too many connections
+          while (clientThreads.Count >= maxThreads)
+          {
+            Monitor.Wait(clientLock);
+          }
+
+          while (clientQueue.Count == 0)
+          {
+            Monitor.Wait(clientLock);
+          }
+
+          client = clientQueue.Dequeue();
+          t = new Thread(new ParameterizedThreadStart(ClientWorker));
+          clientThreads.Add(t);
+        }
+        //start processing requests from client on new thread
+        t.Start(client);
+      }
+    }
+
+    private void ClientWorker(Object context)
+    {
+      TTransport client = (TTransport)context;
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      TProtocol inputProtocol = null;
+      TProtocol outputProtocol = null;
+      Object connectionContext = null;
+      try
+      {
         using (inputTransport = inputTransportFactory.GetTransport(client))
         {
           using (outputTransport = outputTransportFactory.GetTransport(client))
           {
             inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
             outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
-            while (processor.Process(inputProtocol, outputProtocol))
+
+            //Recover event handler (if any) and fire createContext server event when a client connects
+            if (serverEventHandler != null)
+              connectionContext = serverEventHandler.createContext(inputProtocol, outputProtocol);
+
+            //Process client requests until client disconnects
+            while (true)
             {
-              //keep processing requests until client disconnects
+              //Fire processContext server event
+              //N.B. This is the pattern implemented in C++ and the event fires provisionally.
+              //That is to say it may be many minutes between the event firing and the client request
+              //actually arriving or the client may hang up without ever makeing a request.
+              if (serverEventHandler != null)
+                serverEventHandler.processContext(connectionContext, inputTransport);
+              //Process client request (blocks until transport is readable)
+              if (!processor.Process(inputProtocol, outputProtocol))
+                break;
             }
           }
         }
-                       }
-                       catch (TTransportException)
-                       {
-                       }
-                       catch (Exception x)
-                       {
-                               logDelegate("Error: " + x);
-                       }
-
-                       lock (clientLock)
-                       {
-                               clientThreads.Remove(Thread.CurrentThread);
-                               Monitor.Pulse(clientLock);
-                       }
-                       return;
-               }
-
-               public override void Stop()
-               {
-                       stop = true;
-                       serverTransport.Close();
-                       //clean up all the threads myself
-                       workerThread.Abort();
-                       foreach (Thread t in clientThreads)
-                       {
-                               t.Abort();
-                       }
-               }
-       }
+      }
+      catch (TTransportException)
+      {
+        //Usually a client disconnect, expected
+      }
+      catch (Exception x)
+      {
+        //Unexpected
+        logDelegate("Error: " + x);
+      }
+
+      //Fire deleteContext server event after client disconnects
+      if (serverEventHandler != null)
+        serverEventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
+
+      lock (clientLock)
+      {
+        clientThreads.Remove(Thread.CurrentThread);
+        Monitor.Pulse(clientLock);
+      }
+      return;
+    }
+
+    public override void Stop()
+    {
+      stop = true;
+      serverTransport.Close();
+      //clean up all the threads myself
+      workerThread.Abort();
+      foreach (Thread t in clientThreads)
+      {
+        t.Abort();
+      }
+    }
+  }
 }
index 1f10868..195005a 100644 (file)
     <Compile Include="Protocol\TType.cs" />
     <Compile Include="Server\TThreadedServer.cs" />
     <Compile Include="Server\TServer.cs" />
+    <Compile Include="Server\TServerEventHandler.cs" />
     <Compile Include="Server\TSimpleServer.cs" />
     <Compile Include="Server\TThreadPoolServer.cs" />
     <Compile Include="TException.cs" />
index f0f539f..92ef374 100644 (file)
@@ -34,6 +34,28 @@ namespace Test
 {
        public class TestServer
        {
+    public class TradeServerEventHandler : TServerEventHandler
+    {
+      public int callCount = 0;
+      public void preServe()
+      {
+        callCount++;
+      }
+      public Object createContext(Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+      {
+        callCount++;
+        return null;
+      }
+      public void deleteContext(Object serverContext, Thrift.Protocol.TProtocol input, Thrift.Protocol.TProtocol output)
+      {
+        callCount++;
+      }
+      public void processContext(Object serverContext, Thrift.Transport.TTransport transport)
+      {
+        callCount++;
+      }
+    };
+
                public class TestHandler : ThriftTest.Iface
                {
                        public TServer server;
@@ -400,6 +422,10 @@ namespace Test
                                // Threaded Server
                                // serverEngine = new TThreadedServer(testProcessor, tServerSocket);
 
+        //Server event handler
+        TradeServerEventHandler serverEvents = new TradeServerEventHandler();
+        serverEngine.setEventHandler(serverEvents);
+
                                testHandler.server = serverEngine;
 
                                // Run it