/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#define __STDC_FORMAT_MACROS

#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

#include "TNonblockingServer.h"
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/PlatformThreadFactory.h>

#include <iostream>

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <errno.h>
#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef _MSC_VER
#define PRIu32 "I32u"
#endif

namespace apache { namespace thrift { namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using boost::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState {
  SOCKET_RECV_FRAMING,
  SOCKET_RECV,
  SOCKET_SEND
};

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a processing task to complete (if any)
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
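
/*
 * For illustration, the wire format these states parse is the framed
 * transport layout: a 4-byte network-order length prefix followed by that
 * many payload bytes. A client using TFramedTransport effectively does:
 *
 *   uint32_t frameLen = htonl(payloadSize);    // 4-byte frame header
 *   send(fd, &frameLen, sizeof(frameLen), 0);
 *   send(fd, payload, payloadSize, 0);         // frame body
 *
 * APP_READ_FRAME_SIZE consumes the header and APP_READ_REQUEST the body.
 */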

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
 private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  boost::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  boost::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Task handle
  int taskHandle_;

  /// Task event
  struct event taskEvent_;

  /// Transport to read from
  boost::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  boost::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  boost::shared_ptr<TTransport> factoryInputTransport_;
  boost::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  boost::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  boost::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void *connectionContext_;

  /// Go into read mode
  void setRead() {
    setFlags(EV_READ | EV_PERSIST);
  }

  /// Go into write mode
  void setWrite() {
    setFlags(EV_WRITE | EV_PERSIST);
  }

  /// Set socket idle
  void setIdle() {
    setFlags(0);
  }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

  /// Close this connection and free or reset its resources.
  void close();

 public:

  class Task;

  /// Constructor
  TConnection(int socket, TNonblockingIOThread* ioThread,
              const sockaddr* addr, socklen_t addrLen) {
    readBuffer_ = NULL;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(new TMemoryBuffer(
        server_->getWriteBufferDefaultSize()));
    tSocket_.reset(new TSocket());
    init(socket, ioThread, addr, addrLen);
  }

  ~TConnection() {
    std::free(readBuffer_);
  }

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(int socket, TNonblockingIOThread* ioThread,
            const sockaddr* addr, socklen_t addrLen);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check errno).
   */
  bool notifyIOThread() {
    return ioThread_->notify(this);
  }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const {
    return ioThread_->getThreadNumber();
  }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const {
    return server_;
  }

  /// get state of connection.
  TAppState getState() const {
    return appState_;
  }

  /// return the TSocket transport wrapping this network connection
  boost::shared_ptr<TSocket> getTSocket() const {
    return tSocket_;
  }

  /// return the server event handler if any
  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
    return serverEventHandler_;
  }

  /// return the Thrift connection context if any
  void* getConnectionContext() {
    return connectionContext_;
  }

};

class TNonblockingServer::TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       TConnection* connection) :
    processor_(processor),
    input_(input),
    output_(output),
    connection_(connection),
    serverEventHandler_(connection_->getServerEventHandler()),
    connectionContext_(connection_->getConnectionContext()) {}

  void run() {
    try {
      for (;;) {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_) ||
            !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(), x.what());
    } catch (...) {
      GlobalOutput.printf(
        "TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() {
    return connection_;
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
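
/*
 * Task lifecycle, in brief: when thread pool processing is enabled,
 * transition() wraps a fully-read request in a Task and hands it to the
 * ThreadManager. run() executes the processor on a worker thread and, rather
 * than touching libevent directly, signals the connection's owning IO thread
 * through its notification pipe; that IO thread then resumes the connection's
 * state machine to send the result.
 */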

void TNonblockingServer::TConnection::init(int socket,
                                           TNonblockingIOThread* ioThread,
                                           const sockaddr* addr,
                                           socklen_t addrLen) {
  tSocket_->setSocketFD(socket);
  tSocket_->setCachedAddress(addr, addrLen);

  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
      inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
      outputTransport_);

  // Create protocol
  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
      factoryInputTransport_);
  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
      factoryOutputTransport_);

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_ != NULL) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
                                                            outputProtocol_);
  } else {
    connectionContext_ = NULL;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::workSocket() {
  int got=0, left=0, sent=0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf("TNonblockingServer: frame size too large "
                          "(%"PRIu32" > %zu) from client %s. remote side not "
                          "using TFramedTransport?",
                          readWant_, server_->getMaxFrameSize(),
                          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();
    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
      close();

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    }
    catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}
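
/*
 * Worked example of the framing math above: if the four header bytes are
 * 0x00 0x00 0x00 0x1A, then ntohl(framing.size) yields readWant_ == 26, and
 * SOCKET_RECV keeps reading until readBufferPos_ == 26 before calling
 * transition(). A header that decodes to more than getMaxFrameSize()
 * (typically an unframed or misbehaving client) is rejected and the
 * connection is closed.
 */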

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    outputTransport_->resetBuffer();
    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      boost::shared_ptr<Runnable> task =
        boost::shared_ptr<Runnable>(new Task(processor_,
                                             inputProtocol_,
                                             outputProtocol_,
                                             this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      try {
        server_->addTask(task);
      } catch (IllegalStateException & ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        close();
      }

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();
      return;
    } else {
      try {
        if (serverEventHandler_ != NULL) {
          serverEventHandler_->processContext(connectionContext_,
                                              getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_,
                            connectionContext_);
      } catch (const TTransportException &ttx) {
        GlobalOutput.printf("TNonblockingServer transport error in "
                            "process(): %s", ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception &x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(), x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == NULL) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_= 0;

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
            TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::close() event_del", errno);
  }

  if (serverEventHandler_ != NULL) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = NULL;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
    size_t readLimit,
    size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = NULL;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
    largestWriteBufferSize_ = 0;
  }
}
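
/*
 * Tuning sketch (assumes the corresponding setters declared in
 * TNonblockingServer.h): the shrink policy above is driven by per-server
 * knobs, e.g.
 *
 *   server.setIdleReadBufferLimit(256 * 1024);   // shrink read bufs over 256 KB
 *   server.setIdleWriteBufferLimit(256 * 1024);  // reset oversized write bufs
 *   server.setResizeBufferEveryN(64);            // check every 64 calls
 *
 * A limit of 0 disables the corresponding check.
 */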

TNonblockingServer::~TNonblockingServer() {
  // TODO: We currently leak any active TConnection objects.
  // Since we're shutting down and destroying the event_base, the TConnection
  // objects will never receive any additional callbacks. (And even if they
  // did, it would be bad, since they keep a pointer around to the server,
  // which is being destroyed.)

  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(
    int socket, const sockaddr* addr, socklen_t addrLen) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = NULL;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread, addr, addrLen);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, ioThread, addr, addrLen);
  }
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  if (connectionStackLimit_ &&
      (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  (void) which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  sockaddr_storage addrStorage;
  sockaddr* addrp = (sockaddr*)&addrStorage;
  addrLen = sizeof(addrStorage);

  // Going to accept a new client socket
  int clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        ::close(clientSocket);
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          ::close(clientSocket);
          return;
        }
      }
    }

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
      ::close(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, addrp, addrLen);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      ::close(clientSocket);
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      clientConnection->notifyIOThread();
    }

    // addrLen is written by the accept() call, so needs to be set before the next call.
    addrLen = sizeof(addrStorage);
  }

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  int s;

  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    throw TException("TNonblockingServer::serve() getaddrinfo " +
                     string(gai_strerror(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    ::close(s);
    freeaddrinfo(res0);
    throw TTransportException(TTransportException::NOT_OPEN,
                              "TNonblockingServer::serve() bind",
                              errno);
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes a socket created by createAndListenOnSocket() and sets various
 * options on it to prepare for use in the server.
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    ::close(s);
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
  #endif

  #ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
  }
  #endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    ::close(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}

void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager != NULL) {
    threadManager->setExpireCallback(apache::thrift::stdcxx::bind(&TNonblockingServer::expireClose, this, apache::thrift::stdcxx::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ ||
      activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ &&
        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf("TNonblockingServer: overload ended; "
                          "%u dropped (%llu total)",
                          nConnectionsDropped_, nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection =
        static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer()
             && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
  TConnection* connection =
    static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() &&
         connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->stop();
  }
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {
  // init listen socket
  createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    int listenFd = (id == 0 ? serverSocket_ : -1);

    shared_ptr<TNonblockingIOThread> thread(
      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
                      port_, ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new PlatformThreadFactory(
#ifndef USE_BOOST_THREAD
      PlatformThreadFactory::OTHER,  // scheduler
      PlatformThreadFactory::NORMAL, // priority
      1,                             // stack size (MB)
#endif
      false                          // detached
    ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}
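
/*
 * Usage sketch (assumes a Thrift-generated MyServiceProcessor and handler;
 * adjust to the actual constructors declared in TNonblockingServer.h):
 *
 *   boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
 *   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *   TNonblockingServer server(processor, 9090);
 *   server.setNumIOThreads(4);   // optional; defaults to DEFAULT_IO_THREADS
 *   server.serve();              // runs the listener IO loop in this thread
 *
 * serve() returns only after stop() has broken every IO thread's event loop.
 */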
1252
Jake Farrellb0d95602011-12-06 01:17:26 +00001253TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1254 int number,
1255 int listenSocket,
1256 bool useHighPriority)
1257 : server_(server)
1258 , number_(number)
1259 , listenSocket_(listenSocket)
1260 , useHighPriority_(useHighPriority)
1261 , eventBase_(NULL) {
1262 notificationPipeFDs_[0] = -1;
1263 notificationPipeFDs_[1] = -1;
1264}

TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_) {
    event_base_free(eventBase_);
  }

  if (listenSocket_ >= 0) {
    if (0 != ::close(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
                          errno);
    }
    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
  }

  for (int i = 0; i < 2; ++i) {
    if (notificationPipeFDs_[i] >= 0) {
      if (0 != ::close(notificationPipeFDs_[i])) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            errno);
      }
      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
    }
  }
}

void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ",
                        EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 ||
      evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::close(notificationPipeFDs_[0]);
    ::close(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
  }
  for (int i = 0; i < 2; ++i) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
        fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
#endif
      ::close(notificationPipeFDs_[0]);
      ::close(notificationPipeFDs_[1]);
      throw TException("TNonblockingServer::createNotificationPipe() "
                       "FD_CLOEXEC");
    }
  }
}
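
// The notification pipe is a local socketpair used purely for cross-thread
// wakeups: a libevent loop only wakes on fd activity, so work completed on
// another thread is handed back to this loop by writing to the pipe (see
// notify() and notifyHandler() below). Both ends are made non-blocking and
// close-on-exec here.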

/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingIOThread::registerEvents() {
  if (listenSocket_ >= 0) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, 0)) {
      throw TException("TNonblockingServer::serve(): "
                       "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
                        number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the notification event to the loop
  if (-1 == event_add(&notificationEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): "
                     "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
                      number_);
}
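
// In summary, every IO thread listens on its own notification pipe, while
// only a thread holding a valid listenSocket_ (normally thread #0) also
// listens for new connections on the server socket.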

bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  int fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  const int kSize = sizeof(conn);
  if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
    return false;
  }

  return true;
}
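
// The notification "protocol" is simply the raw pointer value: exactly
// sizeof(TConnection*) bytes are written per notification, and
// notifyHandler() reads them back on the owning IO thread. A NULL pointer is
// the sentinel that tells the handler to return, which is how breakLoop()
// wakes a sleeping loop. A minimal sketch, using the names in this file:
//
//   ioThread->notify(connection);  // run connection->transition() on its loop
//   ioThread->notify(NULL);        // ask the loop's notify handler to exit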

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = 0;
    const int kSize = sizeof(connection);
    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == NULL) {
        // this is the command to stop our thread, exit the handler!
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
                          nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      // exit the loop
      break;
    } else { // nBytes < 0
      if (errno != EWOULDBLOCK && errno != EAGAIN) {
        GlobalOutput.perror(
          "TNonblocking: notifyHandler read() failed: ", errno);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf(
      "TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // sets a flag so that the loop exits on the next event
  event_base_loopbreak(eventBase_);

  // event_base_loopbreak() only causes the loop to exit the next time
  // it wakes up. We need to force it to wake up, in case there are
  // no real events it needs to process.
  //
  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(NULL);
  }
}
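
// Shutdown path, for reference: stop() calls breakLoop(false), which sets the
// loopbreak flag and (when called from another thread) pushes a NULL
// notification so the loop actually wakes up; event_base_loop() then returns
// inside run(), which unregisters its events via cleanupEvents() before
// logging completion.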

void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  memset(&sp, 0, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only matters relative to other SCHED_FIFO threads, so we
    // simply pick a priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) +
                          sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf(
      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
  }
#endif
}
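
// Caveat (not enforced here): switching to SCHED_FIFO typically requires
// elevated privileges (e.g. root or CAP_SYS_NICE on Linux). When the caller
// lacks them, pthread_setschedparam() fails and the error above is only
// logged, so the thread silently keeps the default scheduler.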

void TNonblockingIOThread::run() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == 0);
  eventBase_ = event_base_new();

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  registerEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
                      number_);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  // Run the libevent engine; this returns only after breakLoop() stops the
  // loop, and in the meantime it dispatches events to the registered handlers.
  event_base_loop(eventBase_, 0);

  if (useHighPriority_) {
    setCurrentThreadHighPriority(false);
  }

  // cleans up our registered events
  cleanupEvents();

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
                      number_);
}
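
// The event base created in run() is intentionally not freed here; it is
// released in ~TNonblockingIOThread(), after join(), once the thread is
// guaranteed to be done with it.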

void TNonblockingIOThread::cleanupEvents() {
  // unregister the listen event, if this thread was listening
  if (listenSocket_ >= 0) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
    }
  }

  event_del(&notificationEvent_);
}


void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to call join() more than once, and to join the
      // current thread, since the pthread implementation checks for deadlock.
      thread_->join();
    } catch(...) {
      // swallow everything
    }
  }
}

}}} // apache::thrift::server