| # | 
 | # Licensed to the Apache Software Foundation (ASF) under one | 
 | # or more contributor license agreements. See the NOTICE file | 
 | # distributed with this work for additional information | 
 | # regarding copyright ownership. The ASF licenses this file | 
 | # to you under the Apache License, Version 2.0 (the | 
 | # "License"); you may not use this file except in compliance | 
 | # with the License. You may obtain a copy of the License at | 
 | # | 
 | #   http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, | 
 | # software distributed under the License is distributed on an | 
 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | # KIND, either express or implied. See the License for the | 
 | # specific language governing permissions and limitations | 
 | # under the License. | 
 | # | 
 | import zmq | 
 | from cStringIO import StringIO | 
 | from thrift.transport.TTransport import TTransportBase, CReadableTransport | 
 |  | 
 | class TZmqClient(TTransportBase, CReadableTransport): | 
 |   def __init__(self, ctx, endpoint, sock_type): | 
 |     self._sock = ctx.socket(sock_type) | 
 |     self._endpoint = endpoint | 
 |     self._wbuf = StringIO() | 
 |     self._rbuf = StringIO() | 
 |  | 
 |   def open(self): | 
 |     self._sock.connect(self._endpoint) | 
 |  | 
 |   def read(self, size): | 
 |     ret = self._rbuf.read(size) | 
 |     if len(ret) != 0: | 
 |       return ret | 
 |     self._read_message() | 
 |     return self._rbuf.read(size) | 
 |  | 
 |   def _read_message(self): | 
 |     msg = self._sock.recv() | 
 |     self._rbuf = StringIO(msg) | 
 |  | 
 |   def write(self, buf): | 
 |     self._wbuf.write(buf) | 
 |  | 
 |   def flush(self): | 
 |     msg = self._wbuf.getvalue() | 
 |     self._wbuf = StringIO() | 
 |     self._sock.send(msg) | 
 |  | 
 |   # Implement the CReadableTransport interface. | 
 |   @property | 
 |   def cstringio_buf(self): | 
 |     return self._rbuf | 
 |  | 
 |   # NOTE: This will probably not actually work. | 
 |   def cstringio_refill(self, prefix, reqlen): | 
 |     while len(prefix) < reqlen: | 
 |       self.read_message() | 
 |       prefix += self._rbuf.getvalue() | 
 |     self._rbuf = StringIO(prefix) | 
 |     return self._rbuf |