|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements. See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership. The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License. You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied. See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  | import logging | 
|  | import zmq | 
|  | import thrift.server.TServer | 
|  | import thrift.transport.TTransport | 
|  |  | 
|  | class TZmqServer(thrift.server.TServer.TServer): | 
|  | def __init__(self, processor, ctx, endpoint, sock_type): | 
|  | thrift.server.TServer.TServer.__init__(self, processor, None) | 
|  | self.zmq_type = sock_type | 
|  | self.socket = ctx.socket(sock_type) | 
|  | self.socket.bind(endpoint) | 
|  |  | 
|  | def serveOne(self): | 
|  | msg = self.socket.recv() | 
|  | itrans = thrift.transport.TTransport.TMemoryBuffer(msg) | 
|  | otrans = thrift.transport.TTransport.TMemoryBuffer() | 
|  | iprot = self.inputProtocolFactory.getProtocol(itrans) | 
|  | oprot = self.outputProtocolFactory.getProtocol(otrans) | 
|  |  | 
|  | try: | 
|  | self.processor.process(iprot, oprot) | 
|  | except Exception: | 
|  | logging.exception("Exception while processing request") | 
|  | # Fall through and send back a response, even if empty or incomplete. | 
|  |  | 
|  | if self.zmq_type == zmq.REP: | 
|  | msg = otrans.getvalue() | 
|  | self.socket.send(msg) | 
|  |  | 
|  | def serve(self): | 
|  | while True: | 
|  | self.serveOne() | 
|  |  | 
|  |  | 
|  | class TZmqMultiServer(object): | 
|  | def __init__(self): | 
|  | self.servers = [] | 
|  |  | 
|  | def serveOne(self, timeout = -1): | 
|  | self._serveActive(self._setupPoll(), timeout) | 
|  |  | 
|  | def serveForever(self): | 
|  | poll_info = self._setupPoll() | 
|  | while True: | 
|  | self._serveActive(poll_info, -1) | 
|  |  | 
|  | def _setupPoll(self): | 
|  | server_map = {} | 
|  | poller = zmq.Poller() | 
|  | for server in self.servers: | 
|  | server_map[server.socket] = server | 
|  | poller.register(server.socket, zmq.POLLIN) | 
|  | return (server_map, poller) | 
|  |  | 
|  | def _serveActive(self, poll_info, timeout): | 
|  | (server_map, poller) = poll_info | 
|  | ready = dict(poller.poll()) | 
|  | for sock, state in ready.items(): | 
|  | assert (state & zmq.POLLIN) != 0 | 
|  | server_map[sock].serveOne() |