| # | 
 | # Licensed to the Apache Software Foundation (ASF) under one | 
 | # or more contributor license agreements. See the NOTICE file | 
 | # distributed with this work for additional information | 
 | # regarding copyright ownership. The ASF licenses this file | 
 | # to you under the Apache License, Version 2.0 (the | 
 | # "License"); you may not use this file except in compliance | 
 | # with the License. You may obtain a copy of the License at | 
 | # | 
 | #   http://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, | 
 | # software distributed under the License is distributed on an | 
 | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | # KIND, either express or implied. See the License for the | 
 | # specific language governing permissions and limitations | 
 | # under the License. | 
 | # | 
 |  | 
 | from cStringIO import StringIO | 
 | import logging | 
 | import socket | 
 | import struct | 
 |  | 
 | from thrift.transport import TTransport | 
 | from thrift.transport.TTransport import TTransportException | 
 |  | 
 | from tornado import gen | 
 | from tornado import iostream | 
 | from tornado import tcpserver | 
 |  | 
 |  | 
 | class TTornadoStreamTransport(TTransport.TTransportBase): | 
 |     """a framed, buffered transport over a Tornado stream""" | 
 |     def __init__(self, host, port, stream=None): | 
 |         self.host = host | 
 |         self.port = port | 
 |         self.is_queuing_reads = False | 
 |         self.read_queue = [] | 
 |         self.__wbuf = StringIO() | 
 |  | 
 |         # servers provide a ready-to-go stream | 
 |         self.stream = stream | 
 |         if self.stream is not None: | 
 |             self._set_close_callback() | 
 |  | 
 |     # not the same number of parameters as TTransportBase.open | 
 |     def open(self, callback): | 
 |         logging.debug('socket connecting') | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) | 
 |         self.stream = iostream.IOStream(sock) | 
 |  | 
 |         def on_close_in_connect(*_): | 
 |             message = 'could not connect to {}:{}'.format(self.host, self.port) | 
 |             raise TTransportException( | 
 |                 type=TTransportException.NOT_OPEN, | 
 |                 message=message) | 
 |         self.stream.set_close_callback(on_close_in_connect) | 
 |  | 
 |         def finish(*_): | 
 |             self._set_close_callback() | 
 |             callback() | 
 |  | 
 |         self.stream.connect((self.host, self.port), callback=finish) | 
 |  | 
 |     def _set_close_callback(self): | 
 |         def on_close(): | 
 |             raise TTransportException( | 
 |                 type=TTransportException.END_OF_FILE, | 
 |                 message='socket closed') | 
 |         self.stream.set_close_callback(self.close) | 
 |  | 
 |     def close(self): | 
 |         # don't raise if we intend to close | 
 |         self.stream.set_close_callback(None) | 
 |         self.stream.close() | 
 |  | 
 |     def read(self, _): | 
 |         # The generated code for Tornado shouldn't do individual reads -- only | 
 |         # frames at a time | 
 |         assert "you're doing it wrong" is True | 
 |  | 
 |     @gen.engine | 
 |     def readFrame(self, callback): | 
 |         self.read_queue.append(callback) | 
 |         logging.debug('read queue: %s', self.read_queue) | 
 |  | 
 |         if self.is_queuing_reads: | 
 |             # If a read is already in flight, then the while loop below should | 
 |             # pull it from self.read_queue | 
 |             return | 
 |  | 
 |         self.is_queuing_reads = True | 
 |         while self.read_queue: | 
 |             next_callback = self.read_queue.pop() | 
 |             result = yield gen.Task(self._readFrameFromStream) | 
 |             next_callback(result) | 
 |         self.is_queuing_reads = False | 
 |  | 
 |     @gen.engine | 
 |     def _readFrameFromStream(self, callback): | 
 |         logging.debug('_readFrameFromStream') | 
 |         frame_header = yield gen.Task(self.stream.read_bytes, 4) | 
 |         frame_length, = struct.unpack('!i', frame_header) | 
 |         logging.debug('received frame header, frame length = %i', frame_length) | 
 |         frame = yield gen.Task(self.stream.read_bytes, frame_length) | 
 |         logging.debug('received frame payload') | 
 |         callback(frame) | 
 |  | 
 |     def write(self, buf): | 
 |         self.__wbuf.write(buf) | 
 |  | 
 |     def flush(self, callback=None): | 
 |         wout = self.__wbuf.getvalue() | 
 |         wsz = len(wout) | 
 |         # reset wbuf before write/flush to preserve state on underlying failure | 
 |         self.__wbuf = StringIO() | 
 |         # N.B.: Doing this string concatenation is WAY cheaper than making | 
 |         # two separate calls to the underlying socket object. Socket writes in | 
 |         # Python turn out to be REALLY expensive, but it seems to do a pretty | 
 |         # good job of managing string buffer operations without excessive copies | 
 |         buf = struct.pack("!i", wsz) + wout | 
 |  | 
 |         logging.debug('writing frame length = %i', wsz) | 
 |         self.stream.write(buf, callback) | 
 |  | 
 |  | 
 | class TTornadoServer(tcpserver.TCPServer): | 
 |     def __init__(self, processor, iprot_factory, oprot_factory=None, | 
 |                  *args, **kwargs): | 
 |         super(TTornadoServer, self).__init__(*args, **kwargs) | 
 |  | 
 |         self._processor = processor | 
 |         self._iprot_factory = iprot_factory | 
 |         self._oprot_factory = (oprot_factory if oprot_factory is not None | 
 |                                else iprot_factory) | 
 |  | 
 |     def handle_stream(self, stream, address): | 
 |         try: | 
 |             host, port = address | 
 |             trans = TTornadoStreamTransport(host=host, port=port, stream=stream) | 
 |             oprot = self._oprot_factory.getProtocol(trans) | 
 |  | 
 |             def next_pass(): | 
 |                 if not trans.stream.closed(): | 
 |                     self._processor.process(trans, self._iprot_factory, oprot, | 
 |                                             callback=next_pass) | 
 |  | 
 |             next_pass() | 
 |  | 
 |         except Exception: | 
 |             logging.exception('thrift exception in handle_stream') | 
 |             trans.close() |