Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 1 | |
Roger Meier | 19a9915 | 2012-02-11 19:09:30 +0000 | [diff] [blame^] | 2 | #include "TQTcpServer.h" |
| 3 | #include "TQIODeviceTransport.h" |
Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 4 | |
| 5 | #include <QTcpSocket> |
| 6 | |
Roger Meier | 19a9915 | 2012-02-11 19:09:30 +0000 | [diff] [blame^] | 7 | #include <tr1/functional> |
| 8 | |
| 9 | #include <protocol/TProtocol.h> |
| 10 | #include <async/TAsyncProcessor.h> |
Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 11 | |
| 12 | using boost::shared_ptr; |
| 13 | using apache::thrift::protocol::TProtocol; |
| 14 | using apache::thrift::protocol::TProtocolFactory; |
| 15 | using apache::thrift::transport::TTransport; |
| 16 | using apache::thrift::transport::TTransportException; |
| 17 | using apache::thrift::transport::TQIODeviceTransport; |
| 18 | using std::tr1::function; |
| 19 | using std::tr1::bind; |
| 20 | |
| 21 | QT_USE_NAMESPACE |
| 22 | |
| 23 | namespace apache { namespace thrift { namespace async { |
| 24 | |
| 25 | struct TQTcpServer::ConnectionContext { |
| 26 | shared_ptr<QTcpSocket> connection_; |
| 27 | shared_ptr<TTransport> transport_; |
| 28 | shared_ptr<TProtocol> iprot_; |
| 29 | shared_ptr<TProtocol> oprot_; |
| 30 | |
| 31 | explicit ConnectionContext(shared_ptr<QTcpSocket> connection, |
| 32 | shared_ptr<TTransport> transport, |
| 33 | shared_ptr<TProtocol> iprot, |
| 34 | shared_ptr<TProtocol> oprot) |
| 35 | : connection_(connection) |
| 36 | , transport_(transport) |
| 37 | , iprot_(iprot) |
| 38 | , oprot_(oprot) |
| 39 | {} |
| 40 | }; |
| 41 | |
| 42 | TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server, |
| 43 | shared_ptr<TAsyncProcessor> processor, |
| 44 | shared_ptr<TProtocolFactory> pfact, |
| 45 | QObject* parent) |
| 46 | : QObject(parent) |
| 47 | , server_(server) |
| 48 | , processor_(processor) |
| 49 | , pfact_(pfact) |
| 50 | { |
| 51 | connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming())); |
| 52 | } |
| 53 | |
| 54 | TQTcpServer::~TQTcpServer() |
| 55 | { |
| 56 | } |
| 57 | |
| 58 | void TQTcpServer::processIncoming() |
| 59 | { |
| 60 | while (server_->hasPendingConnections()) { |
| 61 | // take ownership of the QTcpSocket; technically it could be deleted |
| 62 | // when the QTcpServer is destroyed, but any real app should delete this |
| 63 | // class before deleting the QTcpServer that we are using |
| 64 | shared_ptr<QTcpSocket> connection(server_->nextPendingConnection()); |
| 65 | |
| 66 | shared_ptr<TTransport> transport; |
| 67 | shared_ptr<TProtocol> iprot; |
| 68 | shared_ptr<TProtocol> oprot; |
| 69 | |
| 70 | try { |
| 71 | transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection)); |
| 72 | iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); |
| 73 | oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); |
| 74 | } catch(...) { |
| 75 | qWarning("[TQTcpServer] Failed to initialize transports/protocols"); |
| 76 | continue; |
| 77 | } |
| 78 | |
| 79 | ctxMap_[connection.get()] = |
| 80 | shared_ptr<ConnectionContext>( |
| 81 | new ConnectionContext(connection, transport, iprot, oprot)); |
| 82 | |
| 83 | connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode())); |
| 84 | |
| 85 | // need to use QueuedConnection since we will be deleting the socket in the slot |
| 86 | connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()), |
| 87 | Qt::QueuedConnection); |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | void TQTcpServer::beginDecode() |
| 92 | { |
| 93 | QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); |
| 94 | Q_ASSERT(connection); |
| 95 | |
Roger Meier | 19a9915 | 2012-02-11 19:09:30 +0000 | [diff] [blame^] | 96 | if (ctxMap_.find(connection) == ctxMap_.end()) { |
Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 97 | qWarning("[TQTcpServer] Got data on an unknown QTcpSocket"); |
| 98 | return; |
| 99 | } |
| 100 | |
| 101 | shared_ptr<ConnectionContext> ctx = ctxMap_[connection]; |
| 102 | |
| 103 | try { |
| 104 | processor_->process( |
| 105 | bind(&TQTcpServer::finish, this, |
| 106 | ctx, std::tr1::placeholders::_1), |
| 107 | ctx->iprot_, ctx->oprot_); |
| 108 | } catch(const TTransportException& ex) { |
| 109 | qWarning("[TQTcpServer] TTransportException during processing: '%s'", |
| 110 | ex.what()); |
| 111 | ctxMap_.erase(connection); |
| 112 | } catch(...) { |
| 113 | qWarning("[TQTcpServer] Unknown processor exception"); |
| 114 | ctxMap_.erase(connection); |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | void TQTcpServer::socketClosed() |
| 119 | { |
| 120 | QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender())); |
| 121 | Q_ASSERT(connection); |
| 122 | |
Roger Meier | 19a9915 | 2012-02-11 19:09:30 +0000 | [diff] [blame^] | 123 | if (ctxMap_.find(connection) == ctxMap_.end()) { |
Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 124 | qWarning("[TQTcpServer] Unknown QTcpSocket closed"); |
| 125 | return; |
| 126 | } |
| 127 | |
| 128 | ctxMap_.erase(connection); |
| 129 | } |
| 130 | |
| 131 | void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) |
| 132 | { |
Roger Meier | 19a9915 | 2012-02-11 19:09:30 +0000 | [diff] [blame^] | 133 | if (!healthy) { |
Roger Meier | 86e8986 | 2012-02-10 19:53:20 +0000 | [diff] [blame] | 134 | qWarning("[TQTcpServer] Processor failed to process data successfully"); |
| 135 | ctxMap_.erase(ctx->connection_.get()); |
| 136 | } |
| 137 | } |
| 138 | |
| 139 | }}} // apache::thrift::async |