THRIFT-812 Demo of Thrift over ZeroMQ (C#)
Patch: Nils Huegelmann
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1095179 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/contrib/zeromq/csharp/TZmqClient.cs b/contrib/zeromq/csharp/TZmqClient.cs
new file mode 100644
index 0000000..c792882
--- /dev/null
+++ b/contrib/zeromq/csharp/TZmqClient.cs
@@ -0,0 +1,78 @@
+using System;
+using ZMQ;
+using System.IO;
+using Thrift.Transport;
+
+namespace ZmqClient
+{
+ public class TZmqClient : TTransport
+ {
+ Socket _sock;
+ String _endpoint;
+ MemoryStream _wbuf = new MemoryStream ();
+ MemoryStream _rbuf = new MemoryStream ();
+
+ void debug (string msg)
+ {
+ //Uncomment to enable debug
+// Console.WriteLine (msg);
+ }
+
+ public TZmqClient (Context ctx, String endpoint, SocketType sockType)
+ {
+ _sock = ctx.Socket (sockType);
+ _endpoint = endpoint;
+ }
+
+ public override void Open ()
+ {
+ _sock.Connect (_endpoint);
+ }
+
+ public override void Close ()
+ {
+ throw new NotImplementedException ();
+ }
+
+ public override bool IsOpen {
+ get {
+ throw new NotImplementedException ();
+ }
+ }
+
+ public override int Read (byte[] buf, int off, int len)
+ {
+ debug ("Client_Read");
+ if (off != 0 || len != buf.Length)
+ throw new NotImplementedException ();
+
+ if (_rbuf.Length == 0) {
+ //Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift reponse
+ debug ("Client_Read Filling buffer..");
+ byte[] tmpBuf = _sock.Recv ();
+ debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
+ _rbuf.Write (tmpBuf, 0, tmpBuf.Length);
+ _rbuf.Position = 0; //For reading
+ }
+ int ret = _rbuf.Read (buf, 0, len);
+ if (_rbuf.Length == _rbuf.Position) //Finished reading
+ _rbuf.SetLength (0);
+ debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
+ return ret;
+ }
+
+ public override void Write (byte[] buf, int off, int len)
+ {
+ debug ("Client_Write");
+ _wbuf.Write (buf, off, len);
+ }
+
+ public override void Flush ()
+ {
+ debug ("Client_Flush");
+ _sock.Send (_wbuf.GetBuffer ());
+ _wbuf = new MemoryStream ();
+ }
+ }
+}
+