| using System; | 
 | using ZMQ; | 
 | using System.IO; | 
 | using Thrift.Transport; | 
 |  | 
 | namespace ZmqClient | 
 | { | 
 | 	public class TZmqClient : TTransport | 
 | 	{ | 
 | 		Socket _sock; | 
 | 		String _endpoint; | 
 | 		MemoryStream _wbuf = new MemoryStream (); | 
 | 		MemoryStream _rbuf = new MemoryStream (); | 
 |  | 
 | 		void debug (string msg) | 
 | 		{ | 
 | 			//Uncomment to enable debug | 
 | //			Console.WriteLine (msg); | 
 | 		} | 
 |  | 
 | 		public TZmqClient (Context ctx, String endpoint, SocketType sockType) | 
 | 		{ | 
 | 			_sock = ctx.Socket (sockType); | 
 | 			_endpoint = endpoint; | 
 | 		} | 
 |  | 
 | 		public override void Open () | 
 | 		{ | 
 | 			_sock.Connect (_endpoint); | 
 | 		} | 
 | 		 | 
 | 		public override void Close () | 
 | 		{ | 
 | 			throw new NotImplementedException (); | 
 | 		} | 
 |  | 
 | 		public override bool IsOpen { | 
 | 			get { | 
 | 				throw new NotImplementedException (); | 
 | 			} | 
 | 		} | 
 |  | 
 | 		public override int Read (byte[] buf, int off, int len) | 
 | 		{ | 
 | 			debug ("Client_Read"); | 
 | 			if (off != 0 || len != buf.Length) | 
 | 				throw new NotImplementedException (); | 
 |  | 
 | 			if (_rbuf.Length == 0) { | 
 | 				//Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift reponse | 
 | 				debug ("Client_Read Filling buffer.."); | 
 | 				byte[] tmpBuf = _sock.Recv (); | 
 | 				debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length)); | 
 | 				_rbuf.Write (tmpBuf, 0, tmpBuf.Length); | 
 | 				_rbuf.Position = 0;	//For reading | 
 | 			} | 
 | 			int ret = _rbuf.Read (buf, 0, len); | 
 | 			if (_rbuf.Length == _rbuf.Position)	//Finished reading | 
 | 				_rbuf.SetLength (0); | 
 | 			debug (string.Format ("Client_Read return {0}b, remaining  {1}b", ret, _rbuf.Length - _rbuf.Position)); | 
 | 			return ret; | 
 | 		} | 
 |  | 
 | 		public override void Write (byte[] buf, int off, int len) | 
 | 		{ | 
 | 			debug ("Client_Write"); | 
 | 			_wbuf.Write (buf, off, len); | 
 | 		} | 
 |  | 
 | 		public override void Flush () | 
 | 		{ | 
 | 			debug ("Client_Flush"); | 
 | 			_sock.Send (_wbuf.GetBuffer ()); | 
 | 			_wbuf = new MemoryStream (); | 
 | 		} | 
 | 	} | 
 | } | 
 |  |