Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/csharp/src/Server/TServer.cs b/lib/csharp/src/Server/TServer.cs
new file mode 100644
index 0000000..61a9416
--- /dev/null
+++ b/lib/csharp/src/Server/TServer.cs
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift.Protocol;
+using Thrift.Transport;
+using System.IO;
+
+namespace Thrift.Server
+{
+	/// <summary>
+	/// Common base for the Thrift servers: holds the processor, the listening
+	/// transport, the transport/protocol factories and the log callback.
+	/// </summary>
+	public abstract class TServer
+	{
+		/// <summary>Core processor.</summary>
+		protected TProcessor processor;
+
+		/// <summary>Server transport.</summary>
+		protected TServerTransport serverTransport;
+
+		/// <summary>Input transport factory.</summary>
+		protected TTransportFactory inputTransportFactory;
+
+		/// <summary>Output transport factory.</summary>
+		protected TTransportFactory outputTransportFactory;
+
+		/// <summary>Input protocol factory.</summary>
+		protected TProtocolFactory inputProtocolFactory;
+
+		/// <summary>Output protocol factory.</summary>
+		protected TProtocolFactory outputProtocolFactory;
+
+		/// <summary>Signature of the callback that receives log messages.</summary>
+		public delegate void LogDelegate(string str);
+
+		/// <summary>Log callback handed to the constructor; kept for use by subclasses.</summary>
+		protected LogDelegate logDelegate;
+
+		/// <summary>
+		/// Creates a server with default transport factories, binary protocol
+		/// factories and <see cref="DefaultLogDelegate"/> as the log callback.
+		/// </summary>
+		public TServer(TProcessor processor,
+						  TServerTransport serverTransport)
+			:this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>
+		/// Creates a server with default transport factories and binary protocol factories.
+		/// <paramref name="logDelegate"/> is forwarded as the log callback instead of the default one.
+		/// </summary>
+		public TServer(TProcessor processor,
+						TServerTransport serverTransport,
+						LogDelegate logDelegate)
+			: this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDelegate)
+		{
+		}
+
+		/// <summary>Uses one transport factory for both directions, binary protocol factories and the default log callback.</summary>
+		public TServer(TProcessor processor,
+						  TServerTransport serverTransport,
+						  TTransportFactory transportFactory)
+			:this(processor, serverTransport, transportFactory, transportFactory,
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Uses one transport factory and one protocol factory for both directions, with the default log callback.</summary>
+		public TServer(TProcessor processor,
+						  TServerTransport serverTransport,
+						  TTransportFactory transportFactory,
+						  TProtocolFactory protocolFactory)
+			:this(processor, serverTransport, transportFactory, transportFactory,
+				 protocolFactory, protocolFactory, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>
+		/// Fully explicit constructor that every other overload chains to;
+		/// it stores each argument in the field of the same name.
+		/// </summary>
+		public TServer(TProcessor processor,
+						  TServerTransport serverTransport,
+						  TTransportFactory inputTransportFactory,
+						  TTransportFactory outputTransportFactory,
+						  TProtocolFactory inputProtocolFactory,
+						  TProtocolFactory outputProtocolFactory,
+						  LogDelegate logDelegate)
+		{
+			this.processor = processor;
+			this.serverTransport = serverTransport;
+			this.inputTransportFactory = inputTransportFactory;
+			this.outputTransportFactory = outputTransportFactory;
+			this.inputProtocolFactory = inputProtocolFactory;
+			this.outputProtocolFactory = outputProtocolFactory;
+			this.logDelegate = logDelegate;
+		}
+
+		/// <summary>
+		/// Starts serving; the behaviour is defined by each concrete server.
+		/// </summary>
+		public abstract void Serve();
+
+		/// <summary>Stops serving; the behaviour is defined by each concrete server.</summary>
+		public abstract void Stop();
+
+		/// <summary>
+		/// Default log callback: writes the message to the standard error stream.
+		/// </summary>
+		protected static void DefaultLogDelegate(string s)
+		{
+			Console.Error.WriteLine(s);
+		}
+	}
+}
+
diff --git a/lib/csharp/src/Server/TSimpleServer.cs b/lib/csharp/src/Server/TSimpleServer.cs
new file mode 100644
index 0000000..34a51de
--- /dev/null
+++ b/lib/csharp/src/Server/TSimpleServer.cs
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Thrift.Transport;
+using Thrift.Protocol;
+
+namespace Thrift.Server
+{
+	/// <summary>
+	/// Simple single-threaded server for testing: clients are accepted one at a
+	/// time and served to completion inside the Serve() loop.
+	/// </summary>
+	public class TSimpleServer : TServer
+	{
+		// Raised by Stop() to end the accept loop in Serve(). Volatile, as in TThreadPoolServer and
+		// TThreadedServer, so that a Stop() issued from another thread is observed by the loop.
+		private volatile bool stop = false;
+
+		/// <summary>Default transport factories, binary protocol factories and the default log callback.</summary>
+		public TSimpleServer(TProcessor processor,
+						  TServerTransport serverTransport)
+			:base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Default transport and binary protocol factories; log output goes to <paramref name="logDel"/>.</summary>
+		public TSimpleServer(TProcessor processor,
+							TServerTransport serverTransport,
+							LogDelegate logDel)
+			: base(processor, serverTransport, new TTransportFactory(), new TTransportFactory(), new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), logDel)
+		{
+		}
+
+		/// <summary>One transport factory for both directions, binary protocol factories, default log callback.</summary>
+		public TSimpleServer(TProcessor processor,
+						  TServerTransport serverTransport,
+						  TTransportFactory transportFactory)
+			:base(processor, serverTransport, transportFactory, transportFactory,
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>One transport factory and one protocol factory for both directions, default log callback.</summary>
+		public TSimpleServer(TProcessor processor,
+						  TServerTransport serverTransport,
+						  TTransportFactory transportFactory,
+						  TProtocolFactory protocolFactory)
+			:base(processor, serverTransport, transportFactory, transportFactory,
+				 protocolFactory, protocolFactory, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>
+		/// Listens on the server transport, then accepts and serves clients one after another until the stop flag is set.
+		/// </summary>
+		public override void Serve()
+		{
+			try
+			{
+				serverTransport.Listen();
+			}
+			catch (TTransportException ttx)
+			{
+				logDelegate(ttx.ToString());
+				return;
+			}
+
+			while (!stop)
+			{
+				TTransport inputTransport = null;
+				TTransport outputTransport = null;
+				try
+				{
+					TTransport client = serverTransport.Accept();
+					if (client != null)
+					{
+						inputTransport = inputTransportFactory.GetTransport(client);
+						outputTransport = outputTransportFactory.GetTransport(client);
+						TProtocol inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
+						TProtocol outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
+						// Keep serving this client until Process returns false or throws.
+						while (processor.Process(inputProtocol, outputProtocol)) { }
+					}
+				}
+				catch (TTransportException ttx)
+				{
+					// Client died, just move on
+					if (stop)
+					{
+						logDelegate("TSimpleServer was shutting down, caught " + ttx.GetType().Name);
+					}
+				}
+				catch (Exception x)
+				{
+					logDelegate(x.ToString());
+				}
+
+				if (inputTransport != null)
+				{
+					inputTransport.Close();
+				}
+
+				if (outputTransport != null)
+				{
+					outputTransport.Close();
+				}
+			}
+
+			if (stop)
+			{
+				try
+				{
+					serverTransport.Close();
+				}
+				catch (TTransportException ttx)
+				{
+					logDelegate("TServerTranport failed on close: " + ttx.Message);
+				}
+				stop = false;
+			}
+		}
+
+		/// <summary>Requests shutdown: raises the stop flag and closes the server transport.
+		/// Serve() closes the transport again and clears the flag once its loop has ended.</summary>
+		public override void Stop()
+		{
+			stop = true;
+			serverTransport.Close();
+		}
+	}
+}
diff --git a/lib/csharp/src/Server/TThreadPoolServer.cs b/lib/csharp/src/Server/TThreadPoolServer.cs
new file mode 100644
index 0000000..efc71f0
--- /dev/null
+++ b/lib/csharp/src/Server/TThreadPoolServer.cs
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Threading;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+	/// <summary>
+	/// Server that uses C# built-in ThreadPool to spawn threads when handling requests
+	/// </summary>
+	public class TThreadPoolServer : TServer
+	{
+		// Thread-pool bounds used by the constructors that do not take explicit limits.
+		private const int DEFAULT_MIN_THREADS = 10;
+		private const int DEFAULT_MAX_THREADS = 100;
+
+		// Raised by Stop(); tested by the accept loop in Serve().
+		private volatile bool stop = false;
+
+		/// <summary>Default transport factories, binary protocol factories, default pool bounds and default log callback.</summary>
+		public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
+			:this(processor, serverTransport,
+				 new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Same defaults as the two-argument overload, but logs through <paramref name="logDelegate"/>.</summary>
+		public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+			: this(processor, serverTransport,
+				 new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate)
+		{
+		}
+
+		/// <summary>One transport factory and one protocol factory for both directions; default pool bounds and log callback.</summary>
+		public TThreadPoolServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory transportFactory,
+								 TProtocolFactory protocolFactory)
+			:this(processor, serverTransport,
+				 transportFactory, transportFactory,
+				 protocolFactory, protocolFactory,
+				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>
+		/// Fully explicit constructor. Besides storing its arguments via the base class it sets the ThreadPool
+		/// minimum and maximum (same value for both counts) and throws if the pool rejects either bound.
+		/// </summary>
+		public TThreadPoolServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory inputTransportFactory,
+								 TTransportFactory outputTransportFactory,
+								 TProtocolFactory inputProtocolFactory,
+								 TProtocolFactory outputProtocolFactory,
+								 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
+			:base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+				  inputProtocolFactory, outputProtocolFactory, logDel)
+		{
+			if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads))
+			{
+				throw new Exception("Error: could not SetMinThreads in ThreadPool");
+			}
+			if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads))
+			{
+				throw new Exception("Error: could not SetMaxThreads in ThreadPool");
+			}
+		}
+
+		/// <summary>
+		/// Listens, then queues a ThreadPool work item (Execute) for every accepted client until the stop flag is set.
+		/// </summary>
+		public override void Serve()
+		{
+			try
+			{
+				serverTransport.Listen();
+			}
+			catch (TTransportException ttx)
+			{
+				logDelegate("Error, could not listen on ServerTransport: " + ttx);
+				return;
+			}
+
+			while (!stop)
+			{
+				try
+				{
+					TTransport client = serverTransport.Accept();
+					ThreadPool.QueueUserWorkItem(this.Execute, client);
+				}
+				catch (TTransportException ttx)
+				{
+					if (stop)
+					{
+						logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name);
+					}
+					else
+					{
+						logDelegate(ttx.ToString());
+					}
+				}
+			}
+
+			if (stop)
+			{
+				try
+				{
+					serverTransport.Close();
+				}
+				catch (TTransportException ttx)
+				{
+					logDelegate("TServerTransport failed on close: " + ttx.Message);
+				}
+				stop = false;
+			}
+		}
+
+		/// <summary>Work item body: serves one client until Process returns false or throws, then closes its transports.</summary>
+		/// <param name="threadContext">The accepted client; cast to TTransport.</param>
+		private void Execute(Object threadContext)
+		{
+			TTransport client = (TTransport)threadContext;
+			TTransport inputTransport = null;
+			TTransport outputTransport = null;
+			try
+			{
+				inputTransport = inputTransportFactory.GetTransport(client);
+				outputTransport = outputTransportFactory.GetTransport(client);
+				TProtocol inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
+				TProtocol outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
+				while (processor.Process(inputProtocol, outputProtocol))
+				{
+					//keep processing requests until client disconnects
+				}
+			}
+			catch (TTransportException)
+			{
+				// Assume the client died and continue silently
+			}
+			catch (Exception x)
+			{
+				logDelegate("Error: " + x);
+			}
+
+			if (inputTransport != null)
+			{
+				inputTransport.Close();
+			}
+			if (outputTransport != null)
+			{
+				outputTransport.Close();
+			}
+		}
+
+		/// <summary>Requests shutdown: raises the stop flag and closes the server transport.</summary>
+		public override void Stop()
+		{
+			stop = true;
+			serverTransport.Close();
+		}
+	}
+}
diff --git a/lib/csharp/src/Server/TThreadedServer.cs b/lib/csharp/src/Server/TThreadedServer.cs
new file mode 100644
index 0000000..75206f1
--- /dev/null
+++ b/lib/csharp/src/Server/TThreadedServer.cs
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Thrift.Collections;
+using Thrift.Protocol;
+using Thrift.Transport;
+
+namespace Thrift.Server
+{
+	/// <summary>
+	/// Server that uses C# threads (as opposed to the ThreadPool) when handling requests
+	/// </summary>
+	public class TThreadedServer : TServer
+	{
+		private const int DEFAULT_MAX_THREADS = 100;
+		private volatile bool stop = false;
+		private readonly int maxThreads;
+
+		// After construction, clientQueue and clientThreads are read and modified while holding clientLock.
+		private Queue<TTransport> clientQueue;
+		private THashSet<Thread> clientThreads;
+		private object clientLock;
+		private Thread workerThread;
+
+		/// <summary>Default transport factories, binary protocol factories, default thread limit and default log callback.</summary>
+		public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
+			: this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Same defaults as the two-argument overload, but logs through <paramref name="logDelegate"/>.</summary>
+		public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
+			: this(processor, serverTransport, new TTransportFactory(), new TTransportFactory(),
+				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(), DEFAULT_MAX_THREADS, logDelegate)
+		{
+		}
+
+		/// <summary>One transport factory and one protocol factory for both directions; default thread limit and log callback.</summary>
+		public TThreadedServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory transportFactory,
+								 TProtocolFactory protocolFactory)
+			: this(processor, serverTransport, transportFactory, transportFactory,
+				 protocolFactory, protocolFactory, DEFAULT_MAX_THREADS, DefaultLogDelegate)
+		{
+		}
+
+		/// <summary>Fully explicit constructor: stores the thread limit and creates the empty client queue, thread set and lock.</summary>
+		public TThreadedServer(TProcessor processor,
+								 TServerTransport serverTransport,
+								 TTransportFactory inputTransportFactory,
+								 TTransportFactory outputTransportFactory,
+								 TProtocolFactory inputProtocolFactory,
+								 TProtocolFactory outputProtocolFactory,
+								 int maxThreads, LogDelegate logDel)
+			: base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
+				  inputProtocolFactory, outputProtocolFactory, logDel)
+		{
+			this.maxThreads = maxThreads;
+			clientQueue = new Queue<TTransport>();
+			clientLock = new object();
+			clientThreads = new THashSet<Thread>();
+		}
+
+		/// <summary>Listens, starts the dispatcher thread (Execute), then queues every accepted
+		/// client for it until the stop flag is set.</summary>
+		public override void Serve()
+		{
+			try
+			{
+				// Listen before starting the dispatcher so a failure here leaves no thread behind.
+				serverTransport.Listen();
+			}
+			catch (TTransportException ttx)
+			{
+				logDelegate("Error, could not listen on ServerTransport: " + ttx);
+				return;
+			}
+
+			//start worker thread
+			workerThread = new Thread(new ThreadStart(Execute));
+			workerThread.Start();
+
+			while (!stop)
+			{
+				try
+				{
+					TTransport client = serverTransport.Accept();
+					lock (clientLock)
+					{
+						clientQueue.Enqueue(client);
+						Monitor.Pulse(clientLock);
+					}
+				}
+				catch (TTransportException ttx)
+				{
+					if (stop)
+					{
+						logDelegate("TThreadedServer was shutting down, caught " + ttx);
+					}
+					else
+					{
+						logDelegate(ttx.ToString());
+					}
+				}
+			}
+
+			if (stop)
+			{
+				try
+				{
+					serverTransport.Close();
+				}
+				catch (TTransportException ttx)
+				{
+					logDelegate("TServeTransport failed on close: " + ttx.Message);
+				}
+				stop = false;
+			}
+		}
+
+		/// <summary>Dispatcher loop: waits for a free thread slot and a queued client, then starts a ClientWorker thread for it.</summary>
+		private void Execute()
+		{
+			while (!stop)
+			{
+				TTransport client;
+				Thread t;
+				lock (clientLock)
+				{
+					//don't dequeue while every thread slot is taken or no client is waiting
+					while (clientThreads.Count >= maxThreads || clientQueue.Count == 0)
+					{
+						Monitor.Wait(clientLock);
+					}
+
+					client = clientQueue.Dequeue();
+					t = new Thread(new ParameterizedThreadStart(ClientWorker));
+					clientThreads.Add(t);
+				}
+				//start processing requests from client on new thread
+				t.Start(client);
+			}
+		}
+
+		/// <summary>Serves one client; the finally block closes its transports and frees its thread slot on every exit path, including a thread abort.</summary>
+		private void ClientWorker(Object context)
+		{
+			TTransport client = (TTransport)context;
+			TTransport inputTransport = null;
+			TTransport outputTransport = null;
+			try
+			{
+				inputTransport = inputTransportFactory.GetTransport(client);
+				outputTransport = outputTransportFactory.GetTransport(client);
+				TProtocol inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
+				TProtocol outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
+				while (processor.Process(inputProtocol, outputProtocol))
+				{
+					//keep processing requests until client disconnects
+				}
+			}
+			catch (TTransportException)
+			{
+				// Treated as a client disconnect; nothing to report.
+			}
+			catch (Exception x)
+			{
+				logDelegate("Error: " + x);
+			}
+			finally
+			{
+				if (inputTransport != null)
+				{
+					inputTransport.Close();
+				}
+				if (outputTransport != null)
+				{
+					outputTransport.Close();
+				}
+				lock (clientLock)
+				{
+					clientThreads.Remove(Thread.CurrentThread);
+					Monitor.Pulse(clientLock);
+				}
+			}
+		}
+
+		/// <summary>Raises the stop flag, closes the server transport and aborts the dispatcher and all client threads.</summary>
+		public override void Stop()
+		{
+			stop = true;
+			serverTransport.Close();
+			//clean up all the threads myself
+			if (workerThread != null)
+			{
+				workerThread.Abort();
+			}
+			// Copy under the lock, abort outside it: exiting workers take clientLock to remove themselves.
+			List<Thread> running = new List<Thread>();
+			lock (clientLock)
+			{
+				foreach (Thread t in clientThreads)
+				{
+					running.Add(t);
+				}
+			}
+			foreach (Thread worker in running)
+			{
+				worker.Abort();
+			}
+		}
+	}
+}