THRIFT-812 Demo of Thrift over ZeroMQ (C#)
Patch: Nils Huegelmann 


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1095179 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/contrib/zeromq/csharp/TZmqClient.cs b/contrib/zeromq/csharp/TZmqClient.cs
new file mode 100644
index 0000000..c792882
--- /dev/null
+++ b/contrib/zeromq/csharp/TZmqClient.cs
@@ -0,0 +1,117 @@
+using System;
+using ZMQ;
+using System.IO;
+using Thrift.Transport;
+
+namespace ZmqClient
+{
+	/// <summary>
+	/// Thrift client transport that exchanges each request/response as one ZeroMQ message.
+	/// Writes are buffered until <see cref="Flush"/> sends them as a single message;
+	/// reads are served from a buffer holding the most recently received message.
+	/// </summary>
+	public class TZmqClient : TTransport
+	{
+		Socket _sock;
+		String _endpoint;
+		bool _isOpen;
+		MemoryStream _wbuf = new MemoryStream ();
+		MemoryStream _rbuf = new MemoryStream ();
+
+		void debug (string msg)
+		{
+			//Uncomment to enable debug
+//			Console.WriteLine (msg);
+		}
+
+		/// <summary>Returns the socket, or throws if the transport has been closed.</summary>
+		Socket RequireSocket ()
+		{
+			if (_sock == null)
+				throw new TTransportException (TTransportException.ExceptionType.NotOpen, "TZmqClient has been closed");
+			return _sock;
+		}
+
+		/// <summary>Creates a socket of the given type on <paramref name="ctx"/>; the connection is made by <see cref="Open"/>.</summary>
+		/// <param name="ctx">ZeroMQ context used to create the socket.</param>
+		/// <param name="endpoint">Endpoint passed to the socket's Connect call.</param>
+		/// <param name="sockType">ZeroMQ socket type to create.</param>
+		public TZmqClient (Context ctx, String endpoint, SocketType sockType)
+		{
+			_sock = ctx.Socket (sockType);
+			_endpoint = endpoint;
+		}
+
+		/// <summary>Connects the socket to the endpoint given at construction.</summary>
+		public override void Open ()
+		{
+			RequireSocket ().Connect (_endpoint);
+			_isOpen = true;
+		}
+
+		/// <summary>
+		/// Releases the socket and discards buffered data. Safe to call more than once;
+		/// the transport cannot be reopened afterwards.
+		/// </summary>
+		public override void Close ()
+		{
+			if (_sock != null) {
+				_sock.Dispose ();
+				_sock = null;
+			}
+			_isOpen = false;
+			_wbuf.SetLength (0);
+			_rbuf.SetLength (0);
+		}
+
+		/// <summary>True between a successful <see cref="Open"/> and the next <see cref="Close"/>.</summary>
+		public override bool IsOpen {
+			get {
+				return _isOpen;
+			}
+		}
+
+		/// <summary>
+		/// Copies up to <paramref name="len"/> bytes into <paramref name="buf"/> starting at <paramref name="off"/>.
+		/// When the read buffer is empty, one complete ZeroMQ message is received first.
+		/// </summary>
+		/// <returns>The number of bytes copied, which may be less than <paramref name="len"/>.</returns>
+		public override int Read (byte[] buf, int off, int len)
+		{
+			debug ("Client_Read");
+			if (_rbuf.Length == 0) {
+				//Fill the buffer with the complete ZMQ message, which is expected to be the complete Thrift response
+				debug ("Client_Read Filling buffer..");
+				byte[] tmpBuf = RequireSocket ().Recv ();
+				if (tmpBuf == null)
+					throw new TTransportException ("ZeroMQ receive returned no message");
+				debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
+				_rbuf.Write (tmpBuf, 0, tmpBuf.Length);
+				_rbuf.Position = 0;	//For reading
+			}
+			//Honor the caller's offset so partial reads into a larger buffer work
+			int ret = _rbuf.Read (buf, off, len);
+			if (_rbuf.Length == _rbuf.Position)	//Finished reading
+				_rbuf.SetLength (0);
+			debug (string.Format ("Client_Read return {0}b, remaining  {1}b", ret, _rbuf.Length - _rbuf.Position));
+			return ret;
+		}
+
+		/// <summary>Appends bytes to the pending outgoing message; nothing is sent until <see cref="Flush"/>.</summary>
+		public override void Write (byte[] buf, int off, int len)
+		{
+			debug ("Client_Write");
+			_wbuf.Write (buf, off, len);
+		}
+
+		/// <summary>Sends everything written since the last flush as a single ZeroMQ message.</summary>
+		public override void Flush ()
+		{
+			debug ("Client_Flush");
+			//ToArray yields exactly the written bytes; GetBuffer would include unused capacity as trailing zeros
+			RequireSocket ().Send (_wbuf.ToArray ());
+			_wbuf.SetLength (0);
+		}
+	}
+}
+