blob: 733a3d8100931835b451901be3befa0b2f803bca [file] [log] [blame]
Roger Meier86e89862012-02-10 19:53:20 +00001
Roger Meier19a99152012-02-11 19:09:30 +00002#include "TQTcpServer.h"
3#include "TQIODeviceTransport.h"
Roger Meier86e89862012-02-10 19:53:20 +00004
5#include <QTcpSocket>
6
Roger Meier19a99152012-02-11 19:09:30 +00007#include <tr1/functional>
8
9#include <protocol/TProtocol.h>
10#include <async/TAsyncProcessor.h>
Roger Meier86e89862012-02-10 19:53:20 +000011
12using boost::shared_ptr;
13using apache::thrift::protocol::TProtocol;
14using apache::thrift::protocol::TProtocolFactory;
15using apache::thrift::transport::TTransport;
16using apache::thrift::transport::TTransportException;
17using apache::thrift::transport::TQIODeviceTransport;
18using std::tr1::function;
19using std::tr1::bind;
20
21QT_USE_NAMESPACE
22
23namespace apache { namespace thrift { namespace async {
24
25struct TQTcpServer::ConnectionContext {
26 shared_ptr<QTcpSocket> connection_;
27 shared_ptr<TTransport> transport_;
28 shared_ptr<TProtocol> iprot_;
29 shared_ptr<TProtocol> oprot_;
30
31 explicit ConnectionContext(shared_ptr<QTcpSocket> connection,
32 shared_ptr<TTransport> transport,
33 shared_ptr<TProtocol> iprot,
34 shared_ptr<TProtocol> oprot)
35 : connection_(connection)
36 , transport_(transport)
37 , iprot_(iprot)
38 , oprot_(oprot)
39 {}
40};
41
42TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server,
43 shared_ptr<TAsyncProcessor> processor,
44 shared_ptr<TProtocolFactory> pfact,
45 QObject* parent)
46 : QObject(parent)
47 , server_(server)
48 , processor_(processor)
49 , pfact_(pfact)
50{
51 connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming()));
52}
53
54TQTcpServer::~TQTcpServer()
55{
56}
57
58void TQTcpServer::processIncoming()
59{
60 while (server_->hasPendingConnections()) {
61 // take ownership of the QTcpSocket; technically it could be deleted
62 // when the QTcpServer is destroyed, but any real app should delete this
63 // class before deleting the QTcpServer that we are using
64 shared_ptr<QTcpSocket> connection(server_->nextPendingConnection());
65
66 shared_ptr<TTransport> transport;
67 shared_ptr<TProtocol> iprot;
68 shared_ptr<TProtocol> oprot;
69
70 try {
71 transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection));
72 iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
73 oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport));
74 } catch(...) {
75 qWarning("[TQTcpServer] Failed to initialize transports/protocols");
76 continue;
77 }
78
79 ctxMap_[connection.get()] =
80 shared_ptr<ConnectionContext>(
81 new ConnectionContext(connection, transport, iprot, oprot));
82
83 connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode()));
84
85 // need to use QueuedConnection since we will be deleting the socket in the slot
86 connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed()),
87 Qt::QueuedConnection);
88 }
89}
90
91void TQTcpServer::beginDecode()
92{
93 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
94 Q_ASSERT(connection);
95
Roger Meier19a99152012-02-11 19:09:30 +000096 if (ctxMap_.find(connection) == ctxMap_.end()) {
Roger Meier86e89862012-02-10 19:53:20 +000097 qWarning("[TQTcpServer] Got data on an unknown QTcpSocket");
98 return;
99 }
100
101 shared_ptr<ConnectionContext> ctx = ctxMap_[connection];
102
103 try {
104 processor_->process(
105 bind(&TQTcpServer::finish, this,
106 ctx, std::tr1::placeholders::_1),
107 ctx->iprot_, ctx->oprot_);
108 } catch(const TTransportException& ex) {
109 qWarning("[TQTcpServer] TTransportException during processing: '%s'",
110 ex.what());
111 ctxMap_.erase(connection);
112 } catch(...) {
113 qWarning("[TQTcpServer] Unknown processor exception");
114 ctxMap_.erase(connection);
115 }
116}
117
118void TQTcpServer::socketClosed()
119{
120 QTcpSocket* connection(qobject_cast<QTcpSocket*>(sender()));
121 Q_ASSERT(connection);
122
Roger Meier19a99152012-02-11 19:09:30 +0000123 if (ctxMap_.find(connection) == ctxMap_.end()) {
Roger Meier86e89862012-02-10 19:53:20 +0000124 qWarning("[TQTcpServer] Unknown QTcpSocket closed");
125 return;
126 }
127
128 ctxMap_.erase(connection);
129}
130
131void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy)
132{
Roger Meier19a99152012-02-11 19:09:30 +0000133 if (!healthy) {
Roger Meier86e89862012-02-10 19:53:20 +0000134 qWarning("[TQTcpServer] Processor failed to process data successfully");
135 ctxMap_.erase(ctx->connection_.get());
136 }
137}
138
139}}} // apache::thrift::async