blob: 8d72a1509ac9ff1d3898c3a9931a8c9f0f6be5dd [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#include "TNonblockingServer.h"
David Reisse11f3072008-10-07 21:39:19 +000021#include <concurrency/Exception.h>
David Reiss1c20c872010-03-09 05:20:14 +000022#include <transport/TSocket.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000023
Mark Sleee02385b2007-06-09 01:21:16 +000024#include <iostream>
Roger Meier30aae0c2011-07-08 12:23:31 +000025
26#ifdef HAVE_SYS_SOCKET_H
Mark Slee2f6404d2006-10-10 01:37:40 +000027#include <sys/socket.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000028#endif
29
30#ifdef HAVE_NETINET_IN_H
Mark Slee2f6404d2006-10-10 01:37:40 +000031#include <netinet/in.h>
32#include <netinet/tcp.h>
Bryan Duxbury76c43682011-08-24 21:26:48 +000033#include <arpa/inet.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000034#endif
35
36#ifdef HAVE_NETDB_H
Mark Sleefb4b5142007-11-20 01:27:08 +000037#include <netdb.h>
Roger Meier30aae0c2011-07-08 12:23:31 +000038#endif
39
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <fcntl.h>
41#include <errno.h>
42#include <assert.h>
43
David Reiss9b903442009-10-21 05:51:28 +000044#ifndef AF_LOCAL
45#define AF_LOCAL AF_UNIX
46#endif
47
T Jake Lucianib5e62212009-01-31 22:36:20 +000048namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000049
T Jake Lucianib5e62212009-01-31 22:36:20 +000050using namespace apache::thrift::protocol;
51using namespace apache::thrift::transport;
52using namespace apache::thrift::concurrency;
Mark Sleee02385b2007-06-09 01:21:16 +000053using namespace std;
David Reiss1c20c872010-03-09 05:20:14 +000054using apache::thrift::transport::TSocket;
55using apache::thrift::transport::TTransportException;
Mark Sleee02385b2007-06-09 01:21:16 +000056
/**
 * Low-level I/O phase of a connection's socket:
 *   SOCKET_RECV_FRAMING - collecting the 4-byte frame length
 *   SOCKET_RECV         - collecting the frame payload
 *   SOCKET_SEND         - draining the response buffer to the socket
 */
enum TSocketState {
  SOCKET_RECV_FRAMING = 0,
  SOCKET_RECV = 1,
  SOCKET_SEND = 2
};
63
/**
 * Six application-level states for a nonblocking-server connection:
 * 1) APP_INIT             - (re)initialize and prepare to read a new request
 * 2) APP_READ_FRAME_SIZE  - read the 4-byte frame size
 * 3) APP_READ_REQUEST     - read the frame of data
 * 4) APP_WAIT_TASK        - wait for a thread-pool task to finish processing
 * 5) APP_SEND_RESULT      - send back data (if any)
 * 6) APP_CLOSE_CONNECTION - force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
80
81/**
82 * Represents a connection that is handled via libevent. This connection
83 * essentially encapsulates a socket that has some associated libevent state.
84 */
85class TNonblockingServer::TConnection {
86 private:
87
88 /// Server handle
89 TNonblockingServer* server_;
90
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +000091 /// TProcessor
92 boost::shared_ptr<TProcessor> processor_;
93
Bryan Duxbury526fa8e2011-08-29 20:28:23 +000094 /// Object wrapping network socket
95 boost::shared_ptr<TSocket> tSocket_;
96
97 /// Libevent object
98 struct event event_;
99
100 /// Libevent flags
101 short eventFlags_;
102
103 /// Socket mode
104 TSocketState socketState_;
105
106 /// Application state
107 TAppState appState_;
108
109 /// How much data needed to read
110 uint32_t readWant_;
111
112 /// Where in the read buffer are we
113 uint32_t readBufferPos_;
114
115 /// Read buffer
116 uint8_t* readBuffer_;
117
118 /// Read buffer size
119 uint32_t readBufferSize_;
120
121 /// Write buffer
122 uint8_t* writeBuffer_;
123
124 /// Write buffer size
125 uint32_t writeBufferSize_;
126
127 /// How far through writing are we?
128 uint32_t writeBufferPos_;
129
130 /// Largest size of write buffer seen since buffer was constructed
131 size_t largestWriteBufferSize_;
132
133 /// Count of the number of calls for use with getResizeBufferEveryN().
134 int32_t callsForResize_;
135
136 /// Task handle
137 int taskHandle_;
138
139 /// Task event
140 struct event taskEvent_;
141
142 /// Transport to read from
143 boost::shared_ptr<TMemoryBuffer> inputTransport_;
144
145 /// Transport that processor writes to
146 boost::shared_ptr<TMemoryBuffer> outputTransport_;
147
148 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
149 boost::shared_ptr<TTransport> factoryInputTransport_;
150 boost::shared_ptr<TTransport> factoryOutputTransport_;
151
152 /// Protocol decoder
153 boost::shared_ptr<TProtocol> inputProtocol_;
154
155 /// Protocol encoder
156 boost::shared_ptr<TProtocol> outputProtocol_;
157
158 /// Server event handler, if any
159 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
160
161 /// Thrift call context, if any
162 void *connectionContext_;
163
164 /// Go into read mode
165 void setRead() {
166 setFlags(EV_READ | EV_PERSIST);
167 }
168
169 /// Go into write mode
170 void setWrite() {
171 setFlags(EV_WRITE | EV_PERSIST);
172 }
173
174 /// Set socket idle
175 void setIdle() {
176 setFlags(0);
177 }
178
179 /**
180 * Set event flags for this connection.
181 *
182 * @param eventFlags flags we pass to libevent for the connection.
183 */
184 void setFlags(short eventFlags);
185
186 /**
187 * Libevent handler called (via our static wrapper) when the connection
188 * socket had something happen. Rather than use the flags libevent passed,
189 * we use the connection state to determine whether we need to read or
190 * write the socket.
191 */
192 void workSocket();
193
194 /// Close this connection and free or reset its resources.
195 void close();
196
197 public:
198
199 class Task;
200
201 /// Constructor
202 TConnection(int socket, short eventFlags, TNonblockingServer *s,
203 const sockaddr* addr, socklen_t addrLen) {
204 readBuffer_ = NULL;
205 readBufferSize_ = 0;
206
207 // Allocate input and output transports
208 // these only need to be allocated once per TConnection (they don't need to be
209 // reallocated on init() call)
210 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
211 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
212 tSocket_.reset(new TSocket());
213
214 init(socket, eventFlags, s, addr, addrLen);
215 server_->incrementNumConnections();
216 }
217
218 ~TConnection() {
219 std::free(readBuffer_);
220 server_->decrementNumConnections();
221 }
222
223 /**
224 * Check buffers against any size limits and shrink it if exceeded.
225 *
226 * @param readLimit we reduce read buffer size to this (if nonzero).
227 * @param writeLimit if nonzero and write buffer is larger, replace it.
228 */
229 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
230
231 /// Initialize
232 void init(int socket, short eventFlags, TNonblockingServer *s,
233 const sockaddr* addr, socklen_t addrLen);
234
235 /**
236 * This is called when the application transitions from one state into
237 * another. This means that it has finished writing the data that it needed
238 * to, or finished receiving the data that it needed to.
239 */
240 void transition();
241
242 /**
243 * C-callable event handler for connection events. Provides a callback
244 * that libevent can understand which invokes connection_->workSocket().
245 *
246 * @param fd the descriptor the event occurred on.
247 * @param which the flags associated with the event.
248 * @param v void* callback arg where we placed TConnection's "this".
249 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000250 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000251 assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
252 ((TConnection*)v)->workSocket();
253 }
254
255 /**
256 * C-callable event handler for signaling task completion. Provides a
257 * callback that libevent can understand that will read a connection
258 * object's address from a pipe and call connection->transition() for
259 * that object.
260 *
261 * @param fd the descriptor the event occurred on.
262 */
Bryan Duxbury266b1732011-09-01 16:50:28 +0000263 static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000264 TConnection* connection;
265 ssize_t nBytes;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000266 while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000267 == sizeof(TConnection*)) {
268 connection->transition();
269 }
270 if (nBytes > 0) {
271 throw TException("TConnection::taskHandler unexpected partial read");
272 }
273 if (errno != EWOULDBLOCK && errno != EAGAIN) {
274 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
275 }
276 }
277
278 /**
279 * Notification to server that processing has ended on this request.
280 * Can be called either when processing is completed or when a waiting
281 * task has been preemptively terminated (on overload).
282 *
283 * @return true if successful, false if unable to notify (check errno).
284 */
285 bool notifyServer() {
286 TConnection* connection = this;
Bryan Duxbury266b1732011-09-01 16:50:28 +0000287 if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
288 sizeof(TConnection*), 0) != sizeof(TConnection*)) {
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000289 return false;
290 }
291
292 return true;
293 }
294
295 /// Force connection shutdown for this connection.
296 void forceClose() {
297 appState_ = APP_CLOSE_CONNECTION;
298 if (!notifyServer()) {
299 throw TException("TConnection::forceClose: failed write on notify pipe");
300 }
301 }
302
303 /// return the server this connection was initialized for.
304 TNonblockingServer* getServer() {
305 return server_;
306 }
307
308 /// get state of connection.
309 TAppState getState() {
310 return appState_;
311 }
312
313 /// return the TSocket transport wrapping this network connection
314 boost::shared_ptr<TSocket> getTSocket() const {
315 return tSocket_;
316 }
317
318 /// return the server event handler if any
319 boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
320 return serverEventHandler_;
321 }
322
323 /// return the Thrift connection context if any
324 void* getConnectionContext() {
325 return connectionContext_;
326 }
327
328};
329
330class TNonblockingServer::TConnection::Task: public Runnable {
Mark Sleee02385b2007-06-09 01:21:16 +0000331 public:
332 Task(boost::shared_ptr<TProcessor> processor,
333 boost::shared_ptr<TProtocol> input,
334 boost::shared_ptr<TProtocol> output,
David Reiss01fe1532010-03-09 05:19:25 +0000335 TConnection* connection) :
Mark Sleee02385b2007-06-09 01:21:16 +0000336 processor_(processor),
337 input_(input),
338 output_(output),
David Reiss105961d2010-10-06 17:10:17 +0000339 connection_(connection),
340 serverEventHandler_(connection_->getServerEventHandler()),
341 connectionContext_(connection_->getConnectionContext()) {}
Mark Sleee02385b2007-06-09 01:21:16 +0000342
343 void run() {
344 try {
David Reiss105961d2010-10-06 17:10:17 +0000345 for (;;) {
346 if (serverEventHandler_ != NULL) {
347 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
348 }
349 if (!processor_->process(input_, output_, connectionContext_) ||
350 !input_->getTransport()->peek()) {
Mark Sleee02385b2007-06-09 01:21:16 +0000351 break;
352 }
353 }
Bryan Duxbury1e987582011-08-25 17:33:03 +0000354 } catch (const TTransportException& ttx) {
355 GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
356 } catch (const bad_alloc&) {
357 GlobalOutput("TNonblockingServer caught bad_alloc exception.");
David Reiss28e88ec2010-03-09 05:19:27 +0000358 exit(-1);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000359 } catch (const std::exception& x) {
360 GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
361 typeid(x).name(), x.what());
Mark Sleee02385b2007-06-09 01:21:16 +0000362 } catch (...) {
Bryan Duxbury1e987582011-08-25 17:33:03 +0000363 GlobalOutput("TNonblockingServer uncaught exception.");
Mark Sleee02385b2007-06-09 01:21:16 +0000364 }
Mark Slee79b16942007-11-26 19:05:29 +0000365
David Reiss01fe1532010-03-09 05:19:25 +0000366 // Signal completion back to the libevent thread via a pipe
367 if (!connection_->notifyServer()) {
368 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
Mark Sleee02385b2007-06-09 01:21:16 +0000369 }
David Reiss01fe1532010-03-09 05:19:25 +0000370 }
371
372 TConnection* getTConnection() {
373 return connection_;
Mark Sleee02385b2007-06-09 01:21:16 +0000374 }
375
376 private:
377 boost::shared_ptr<TProcessor> processor_;
378 boost::shared_ptr<TProtocol> input_;
379 boost::shared_ptr<TProtocol> output_;
David Reiss01fe1532010-03-09 05:19:25 +0000380 TConnection* connection_;
David Reiss105961d2010-10-06 17:10:17 +0000381 boost::shared_ptr<TServerEventHandler> serverEventHandler_;
382 void* connectionContext_;
Mark Sleee02385b2007-06-09 01:21:16 +0000383};
Mark Slee5ea15f92007-03-05 22:55:59 +0000384
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000385void TNonblockingServer::TConnection::init(int socket, short eventFlags,
386 TNonblockingServer* s,
387 const sockaddr* addr,
388 socklen_t addrLen) {
David Reiss105961d2010-10-06 17:10:17 +0000389 tSocket_->setSocketFD(socket);
390 tSocket_->setCachedAddress(addr, addrLen);
391
Mark Slee2f6404d2006-10-10 01:37:40 +0000392 server_ = s;
393 appState_ = APP_INIT;
394 eventFlags_ = 0;
395
396 readBufferPos_ = 0;
397 readWant_ = 0;
398
399 writeBuffer_ = NULL;
400 writeBufferSize_ = 0;
401 writeBufferPos_ = 0;
David Reiss54bec5d2010-10-06 17:10:45 +0000402 largestWriteBufferSize_ = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000403
David Reiss89a12942010-10-06 17:10:52 +0000404 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 appState_ = APP_INIT;
David Reiss54bec5d2010-10-06 17:10:45 +0000406 callsForResize_ = 0;
Mark Slee79b16942007-11-26 19:05:29 +0000407
Mark Slee2f6404d2006-10-10 01:37:40 +0000408 // Set flags, which also registers the event
409 setFlags(eventFlags);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000410
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000411 // get input/transports
412 factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
413 factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000414
415 // Create protocol
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000416 inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
417 outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
David Reiss105961d2010-10-06 17:10:17 +0000418
419 // Set up for any server event handler
420 serverEventHandler_ = server_->getEventHandler();
421 if (serverEventHandler_ != NULL) {
422 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
423 } else {
424 connectionContext_ = NULL;
425 }
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000426
427 // Get the processor
428 processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000429}
430
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000431void TNonblockingServer::TConnection::workSocket() {
David Reiss105961d2010-10-06 17:10:17 +0000432 int got=0, left=0, sent=0;
Mark Sleeaaa23ed2007-01-30 19:52:05 +0000433 uint32_t fetch = 0;
Mark Slee2f6404d2006-10-10 01:37:40 +0000434
435 switch (socketState_) {
David Reiss89a12942010-10-06 17:10:52 +0000436 case SOCKET_RECV_FRAMING:
437 union {
438 uint8_t buf[sizeof(uint32_t)];
439 int32_t size;
440 } framing;
Mark Slee2f6404d2006-10-10 01:37:40 +0000441
David Reiss89a12942010-10-06 17:10:52 +0000442 // if we've already received some bytes we kept them here
443 framing.size = readWant_;
444 // determine size of this frame
445 try {
446 // Read from the socket
447 fetch = tSocket_->read(&framing.buf[readBufferPos_],
448 uint32_t(sizeof(framing.size) - readBufferPos_));
449 if (fetch == 0) {
450 // Whenever we get here it means a remote disconnect
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 close();
452 return;
453 }
David Reiss89a12942010-10-06 17:10:52 +0000454 readBufferPos_ += fetch;
455 } catch (TTransportException& te) {
456 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
457 close();
458
459 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 }
461
David Reiss89a12942010-10-06 17:10:52 +0000462 if (readBufferPos_ < sizeof(framing.size)) {
463 // more needed before frame size is known -- save what we have so far
464 readWant_ = framing.size;
465 return;
466 }
467
468 readWant_ = ntohl(framing.size);
469 if (static_cast<int>(readWant_) <= 0) {
470 GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
471 close();
472 return;
473 }
474 // size known; now get the rest of the frame
475 transition();
476 return;
477
478 case SOCKET_RECV:
479 // It is an error to be in this state if we already have all the data
480 assert(readBufferPos_ < readWant_);
481
David Reiss105961d2010-10-06 17:10:17 +0000482 try {
483 // Read from the socket
484 fetch = readWant_ - readBufferPos_;
485 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
486 }
487 catch (TTransportException& te) {
488 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
489 close();
Mark Slee79b16942007-11-26 19:05:29 +0000490
David Reiss105961d2010-10-06 17:10:17 +0000491 return;
492 }
493
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 if (got > 0) {
495 // Move along in the buffer
496 readBufferPos_ += got;
497
498 // Check that we did not overdo it
499 assert(readBufferPos_ <= readWant_);
Mark Slee79b16942007-11-26 19:05:29 +0000500
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 // We are done reading, move onto the next state
502 if (readBufferPos_ == readWant_) {
503 transition();
504 }
505 return;
Mark Slee2f6404d2006-10-10 01:37:40 +0000506 }
507
508 // Whenever we get down here it means a remote disconnect
509 close();
Mark Slee79b16942007-11-26 19:05:29 +0000510
Mark Slee2f6404d2006-10-10 01:37:40 +0000511 return;
512
513 case SOCKET_SEND:
514 // Should never have position past size
515 assert(writeBufferPos_ <= writeBufferSize_);
516
517 // If there is no data to send, then let us move on
518 if (writeBufferPos_ == writeBufferSize_) {
Mark Slee79b16942007-11-26 19:05:29 +0000519 GlobalOutput("WARNING: Send state with no data to send\n");
Mark Slee2f6404d2006-10-10 01:37:40 +0000520 transition();
521 return;
522 }
523
David Reiss105961d2010-10-06 17:10:17 +0000524 try {
525 left = writeBufferSize_ - writeBufferPos_;
526 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
527 }
528 catch (TTransportException& te) {
529 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
Mark Slee2f6404d2006-10-10 01:37:40 +0000530 close();
531 return;
532 }
533
534 writeBufferPos_ += sent;
535
536 // Did we overdo it?
537 assert(writeBufferPos_ <= writeBufferSize_);
538
Mark Slee79b16942007-11-26 19:05:29 +0000539 // We are done!
Mark Slee2f6404d2006-10-10 01:37:40 +0000540 if (writeBufferPos_ == writeBufferSize_) {
541 transition();
542 }
543
544 return;
545
546 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000547 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000548 assert(0);
549 }
550}
551
552/**
553 * This is called when the application transitions from one state into
554 * another. This means that it has finished writing the data that it needed
555 * to, or finished receiving the data that it needed to.
556 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000557void TNonblockingServer::TConnection::transition() {
558
559 int sz = 0;
560
Mark Slee2f6404d2006-10-10 01:37:40 +0000561 // Switch upon the state that we are currently in and move to a new state
562 switch (appState_) {
563
564 case APP_READ_REQUEST:
565 // We are done reading the request, package the read buffer into transport
566 // and get back some data from the dispatch function
567 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
David Reiss7197efb2010-10-06 17:10:43 +0000568 outputTransport_->resetBuffer();
David Reiss52cb7a72008-06-30 21:40:35 +0000569 // Prepend four bytes of blank space to the buffer so we can
570 // write the frame size there later.
571 outputTransport_->getWritePtr(4);
572 outputTransport_->wroteBytes(4);
Mark Slee79b16942007-11-26 19:05:29 +0000573
David Reiss01fe1532010-03-09 05:19:25 +0000574 server_->incrementActiveProcessors();
575
Mark Sleee02385b2007-06-09 01:21:16 +0000576 if (server_->isThreadPoolProcessing()) {
577 // We are setting up a Task to do this work and we will wait on it
Mark Slee79b16942007-11-26 19:05:29 +0000578
David Reiss01fe1532010-03-09 05:19:25 +0000579 // Create task and dispatch to the thread manager
580 boost::shared_ptr<Runnable> task =
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000581 boost::shared_ptr<Runnable>(new Task(processor_,
David Reiss01fe1532010-03-09 05:19:25 +0000582 inputProtocol_,
583 outputProtocol_,
584 this));
585 // The application is now waiting on the task to finish
586 appState_ = APP_WAIT_TASK;
Mark Slee2f6404d2006-10-10 01:37:40 +0000587
David Reisse11f3072008-10-07 21:39:19 +0000588 try {
589 server_->addTask(task);
590 } catch (IllegalStateException & ise) {
591 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
David Reissc53a5942008-10-07 23:55:24 +0000592 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
David Reisse11f3072008-10-07 21:39:19 +0000593 close();
594 }
Mark Slee402ee282007-08-23 01:43:20 +0000595
David Reiss01fe1532010-03-09 05:19:25 +0000596 // Set this connection idle so that libevent doesn't process more
597 // data on it while we're still waiting for the threadmanager to
598 // finish this task
599 setIdle();
600 return;
Mark Sleee02385b2007-06-09 01:21:16 +0000601 } else {
602 try {
603 // Invoke the processor
Bryan Duxbury6dd9cd02011-09-01 18:06:20 +0000604 processor_->process(inputProtocol_, outputProtocol_,
605 connectionContext_);
Bryan Duxbury1e987582011-08-25 17:33:03 +0000606 } catch (const TTransportException &ttx) {
607 GlobalOutput.printf("TNonblockingServer transport error in "
608 "process(): %s", ttx.what());
David Reiss01fe1532010-03-09 05:19:25 +0000609 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000610 close();
611 return;
Bryan Duxbury1e987582011-08-25 17:33:03 +0000612 } catch (const std::exception &x) {
613 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
614 typeid(x).name(), x.what());
David Reiss01fe1532010-03-09 05:19:25 +0000615 server_->decrementActiveProcessors();
Mark Slee79b16942007-11-26 19:05:29 +0000616 close();
Mark Sleee02385b2007-06-09 01:21:16 +0000617 return;
618 } catch (...) {
David Reiss01e55c12008-07-13 22:18:51 +0000619 GlobalOutput.printf("Server::process() unknown exception");
David Reiss01fe1532010-03-09 05:19:25 +0000620 server_->decrementActiveProcessors();
Mark Sleee02385b2007-06-09 01:21:16 +0000621 close();
622 return;
623 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000624 }
625
Mark Slee402ee282007-08-23 01:43:20 +0000626 // Intentionally fall through here, the call to process has written into
627 // the writeBuffer_
628
Mark Sleee02385b2007-06-09 01:21:16 +0000629 case APP_WAIT_TASK:
630 // We have now finished processing a task and the result has been written
631 // into the outputTransport_, so we grab its contents and place them into
632 // the writeBuffer_ for actual writing by the libevent thread
633
David Reiss01fe1532010-03-09 05:19:25 +0000634 server_->decrementActiveProcessors();
Mark Slee2f6404d2006-10-10 01:37:40 +0000635 // Get the result of the operation
636 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
637
638 // If the function call generated return data, then move into the send
639 // state and get going
David Reissaf787782008-07-03 20:29:34 +0000640 // 4 bytes were reserved for frame size
David Reiss52cb7a72008-06-30 21:40:35 +0000641 if (writeBufferSize_ > 4) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000642
643 // Move into write state
644 writeBufferPos_ = 0;
645 socketState_ = SOCKET_SEND;
Mark Slee92f00fb2006-10-25 01:28:17 +0000646
David Reissaf787782008-07-03 20:29:34 +0000647 // Put the frame size into the write buffer
648 int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
649 memcpy(writeBuffer_, &frameSize, 4);
Mark Slee2f6404d2006-10-10 01:37:40 +0000650
651 // Socket into write mode
David Reiss52cb7a72008-06-30 21:40:35 +0000652 appState_ = APP_SEND_RESULT;
Mark Slee2f6404d2006-10-10 01:37:40 +0000653 setWrite();
654
655 // Try to work the socket immediately
Mark Sleee02385b2007-06-09 01:21:16 +0000656 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000657
658 return;
659 }
660
David Reissc51986f2009-03-24 20:01:25 +0000661 // In this case, the request was oneway and we should fall through
Mark Slee2f6404d2006-10-10 01:37:40 +0000662 // right back into the read frame header state
Mark Slee92f00fb2006-10-25 01:28:17 +0000663 goto LABEL_APP_INIT;
664
Mark Slee2f6404d2006-10-10 01:37:40 +0000665 case APP_SEND_RESULT:
David Reiss54bec5d2010-10-06 17:10:45 +0000666 // it's now safe to perform buffer size housekeeping.
667 if (writeBufferSize_ > largestWriteBufferSize_) {
668 largestWriteBufferSize_ = writeBufferSize_;
669 }
670 if (server_->getResizeBufferEveryN() > 0
671 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
672 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
673 server_->getIdleWriteBufferLimit());
674 callsForResize_ = 0;
675 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000676
677 // N.B.: We also intentionally fall through here into the INIT state!
678
Mark Slee92f00fb2006-10-25 01:28:17 +0000679 LABEL_APP_INIT:
Mark Slee2f6404d2006-10-10 01:37:40 +0000680 case APP_INIT:
681
682 // Clear write buffer variables
683 writeBuffer_ = NULL;
684 writeBufferPos_ = 0;
685 writeBufferSize_ = 0;
686
Mark Slee2f6404d2006-10-10 01:37:40 +0000687 // Into read4 state we go
David Reiss89a12942010-10-06 17:10:52 +0000688 socketState_ = SOCKET_RECV_FRAMING;
Mark Slee2f6404d2006-10-10 01:37:40 +0000689 appState_ = APP_READ_FRAME_SIZE;
690
David Reiss89a12942010-10-06 17:10:52 +0000691 readBufferPos_ = 0;
692
Mark Slee2f6404d2006-10-10 01:37:40 +0000693 // Register read event
694 setRead();
David Reiss84e63ab2008-03-07 20:12:28 +0000695
Mark Slee2f6404d2006-10-10 01:37:40 +0000696 // Try to work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000697 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000698
699 return;
700
701 case APP_READ_FRAME_SIZE:
David Reiss89a12942010-10-06 17:10:52 +0000702 // We just read the request length
703 // Double the buffer size until it is big enough
704 if (readWant_ > readBufferSize_) {
705 if (readBufferSize_ == 0) {
706 readBufferSize_ = 1;
707 }
708 uint32_t newSize = readBufferSize_;
709 while (readWant_ > newSize) {
710 newSize *= 2;
711 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000712
David Reiss89a12942010-10-06 17:10:52 +0000713 uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
714 if (newBuffer == NULL) {
715 // nothing else to be done...
716 throw std::bad_alloc();
717 }
718 readBuffer_ = newBuffer;
719 readBufferSize_ = newSize;
Mark Slee2f6404d2006-10-10 01:37:40 +0000720 }
721
Mark Slee2f6404d2006-10-10 01:37:40 +0000722 readBufferPos_= 0;
723
724 // Move into read request state
David Reiss89a12942010-10-06 17:10:52 +0000725 socketState_ = SOCKET_RECV;
Mark Slee2f6404d2006-10-10 01:37:40 +0000726 appState_ = APP_READ_REQUEST;
727
728 // Work the socket right away
Mark Sleee02385b2007-06-09 01:21:16 +0000729 // workSocket();
Mark Slee2f6404d2006-10-10 01:37:40 +0000730
731 return;
732
David Reiss01fe1532010-03-09 05:19:25 +0000733 case APP_CLOSE_CONNECTION:
734 server_->decrementActiveProcessors();
735 close();
736 return;
737
Mark Slee2f6404d2006-10-10 01:37:40 +0000738 default:
David Reiss3bb5e052010-01-25 19:31:31 +0000739 GlobalOutput.printf("Unexpected Application State %d", appState_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000740 assert(0);
741 }
742}
743
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000744void TNonblockingServer::TConnection::setFlags(short eventFlags) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000745 // Catch the do nothing case
746 if (eventFlags_ == eventFlags) {
747 return;
748 }
749
750 // Delete a previously existing event
751 if (eventFlags_ != 0) {
752 if (event_del(&event_) == -1) {
boz6ded7752007-06-05 22:41:18 +0000753 GlobalOutput("TConnection::setFlags event_del");
Mark Slee2f6404d2006-10-10 01:37:40 +0000754 return;
755 }
756 }
757
758 // Update in memory structure
759 eventFlags_ = eventFlags;
760
Mark Slee402ee282007-08-23 01:43:20 +0000761 // Do not call event_set if there are no flags
762 if (!eventFlags_) {
763 return;
764 }
765
David Reiss01fe1532010-03-09 05:19:25 +0000766 /*
Mark Slee2f6404d2006-10-10 01:37:40 +0000767 * event_set:
768 *
769 * Prepares the event structure &event to be used in future calls to
770 * event_add() and event_del(). The event will be prepared to call the
Mark Sleee02385b2007-06-09 01:21:16 +0000771 * eventHandler using the 'sock' file descriptor to monitor events.
Mark Slee2f6404d2006-10-10 01:37:40 +0000772 *
773 * The events can be either EV_READ, EV_WRITE, or both, indicating
774 * that an application can read or write from the file respectively without
775 * blocking.
776 *
Mark Sleee02385b2007-06-09 01:21:16 +0000777 * The eventHandler will be called with the file descriptor that triggered
Mark Slee2f6404d2006-10-10 01:37:40 +0000778 * the event and the type of event which will be one of: EV_TIMEOUT,
779 * EV_SIGNAL, EV_READ, EV_WRITE.
780 *
781 * The additional flag EV_PERSIST makes an event_add() persistent until
782 * event_del() has been called.
783 *
784 * Once initialized, the &event struct can be used repeatedly with
785 * event_add() and event_del() and does not need to be reinitialized unless
Mark Sleee02385b2007-06-09 01:21:16 +0000786 * the eventHandler and/or the argument to it are to be changed. However,
Mark Slee2f6404d2006-10-10 01:37:40 +0000787 * when an ev structure has been added to libevent using event_add() the
788 * structure must persist until the event occurs (assuming EV_PERSIST
789 * is not set) or is removed using event_del(). You may not reuse the same
790 * ev structure for multiple monitored descriptors; each descriptor needs
791 * its own ev.
792 */
David Reiss105961d2010-10-06 17:10:17 +0000793 event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
794 TConnection::eventHandler, this);
Mark Slee79b16942007-11-26 19:05:29 +0000795 event_base_set(server_->getEventBase(), &event_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000796
797 // Add the event
798 if (event_add(&event_, 0) == -1) {
Mark Slee17496a02007-08-02 06:37:40 +0000799 GlobalOutput("TConnection::setFlags(): could not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +0000800 }
801}
802
803/**
804 * Closes a connection
805 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000806void TNonblockingServer::TConnection::close() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000807 // Delete the registered libevent
808 if (event_del(&event_) == -1) {
David Reiss105961d2010-10-06 17:10:17 +0000809 GlobalOutput.perror("TConnection::close() event_del", errno);
810 }
811
812 if (serverEventHandler_ != NULL) {
813 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000814 }
815
816 // Close the socket
David Reiss105961d2010-10-06 17:10:17 +0000817 tSocket_->close();
Mark Slee2f6404d2006-10-10 01:37:40 +0000818
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000819 // close any factory produced transports
820 factoryInputTransport_->close();
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000821 factoryOutputTransport_->close();
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000822
Mark Slee2f6404d2006-10-10 01:37:40 +0000823 // Give this object back to the server that owns it
824 server_->returnConnection(this);
825}
826
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000827void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
828 size_t readLimit,
829 size_t writeLimit) {
David Reiss54bec5d2010-10-06 17:10:45 +0000830 if (readLimit > 0 && readBufferSize_ > readLimit) {
David Reiss89a12942010-10-06 17:10:52 +0000831 free(readBuffer_);
832 readBuffer_ = NULL;
833 readBufferSize_ = 0;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000834 }
David Reiss54bec5d2010-10-06 17:10:45 +0000835
836 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
837 // just start over
David Reiss89a12942010-10-06 17:10:52 +0000838 outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
David Reiss54bec5d2010-10-06 17:10:45 +0000839 largestWriteBufferSize_ = 0;
840 }
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000841}
842
David Reiss8ede8182010-09-02 15:26:28 +0000843TNonblockingServer::~TNonblockingServer() {
844 // TODO: We currently leak any active TConnection objects.
845 // Since we're shutting down and destroying the event_base, the TConnection
846 // objects will never receive any additional callbacks. (And even if they
847 // did, it would be bad, since they keep a pointer around to the server,
848 // which is being destroyed.)
849
850 // Clean up unused TConnection objects in connectionStack_
851 while (!connectionStack_.empty()) {
852 TConnection* connection = connectionStack_.top();
853 connectionStack_.pop();
854 delete connection;
855 }
856
Roger Meierc1905582011-08-02 23:37:36 +0000857 if (eventBase_ && ownEventBase_) {
David Reiss8ede8182010-09-02 15:26:28 +0000858 event_base_free(eventBase_);
859 }
860
861 if (serverSocket_ >= 0) {
862 close(serverSocket_);
863 }
864}
865
Mark Slee2f6404d2006-10-10 01:37:40 +0000866/**
867 * Creates a new connection either by reusing an object off the stack or
868 * by allocating a new one entirely
869 */
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000870TNonblockingServer::TConnection* TNonblockingServer::createConnection(
871 int socket, short flags,
872 const sockaddr* addr,
873 socklen_t addrLen) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000874 // Check the stack
875 if (connectionStack_.empty()) {
David Reiss105961d2010-10-06 17:10:17 +0000876 return new TConnection(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000877 } else {
878 TConnection* result = connectionStack_.top();
879 connectionStack_.pop();
David Reiss105961d2010-10-06 17:10:17 +0000880 result->init(socket, flags, this, addr, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000881 return result;
882 }
883}
884
885/**
886 * Returns a connection to the stack
887 */
888void TNonblockingServer::returnConnection(TConnection* connection) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000889 if (connectionStackLimit_ &&
890 (connectionStack_.size() >= connectionStackLimit_)) {
891 delete connection;
892 } else {
David Reiss54bec5d2010-10-06 17:10:45 +0000893 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000894 connectionStack_.push(connection);
895 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000896}
897
898/**
David Reissa79e4882008-03-05 07:51:47 +0000899 * Server socket had something happen. We accept all waiting client
900 * connections on fd and assign TConnection objects to handle those requests.
Mark Slee2f6404d2006-10-10 01:37:40 +0000901 */
902void TNonblockingServer::handleEvent(int fd, short which) {
Roger Meier3b771a12010-11-17 22:11:26 +0000903 (void) which;
David Reiss3bb5e052010-01-25 19:31:31 +0000904 // Make sure that libevent didn't mess up the socket handles
Mark Slee2f6404d2006-10-10 01:37:40 +0000905 assert(fd == serverSocket_);
Mark Slee79b16942007-11-26 19:05:29 +0000906
Mark Slee2f6404d2006-10-10 01:37:40 +0000907 // Server socket accepted a new connection
908 socklen_t addrLen;
David Reiss105961d2010-10-06 17:10:17 +0000909 sockaddr_storage addrStorage;
910 sockaddr* addrp = (sockaddr*)&addrStorage;
911 addrLen = sizeof(addrStorage);
Mark Slee79b16942007-11-26 19:05:29 +0000912
Mark Slee2f6404d2006-10-10 01:37:40 +0000913 // Going to accept a new client socket
914 int clientSocket;
Mark Slee79b16942007-11-26 19:05:29 +0000915
Mark Slee2f6404d2006-10-10 01:37:40 +0000916 // Accept as many new clients as possible, even though libevent signaled only
917 // one, this helps us to avoid having to go back into the libevent engine so
918 // many times
David Reiss105961d2010-10-06 17:10:17 +0000919 while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
David Reiss01fe1532010-03-09 05:19:25 +0000920 // If we're overloaded, take action here
921 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
922 nConnectionsDropped_++;
923 nTotalConnectionsDropped_++;
924 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
925 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000926 return;
David Reiss01fe1532010-03-09 05:19:25 +0000927 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
928 if (!drainPendingTask()) {
929 // Nothing left to discard, so we drop connection instead.
930 close(clientSocket);
David Reiss83b8fda2010-03-09 05:19:34 +0000931 return;
David Reiss01fe1532010-03-09 05:19:25 +0000932 }
933 }
934 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000935 // Explicitly set this socket to NONBLOCK mode
936 int flags;
937 if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
938 fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
David Reiss01e55c12008-07-13 22:18:51 +0000939 GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000940 close(clientSocket);
941 return;
942 }
943
944 // Create a new TConnection for this client socket.
945 TConnection* clientConnection =
David Reiss105961d2010-10-06 17:10:17 +0000946 createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000947
948 // Fail fast if we could not create a TConnection object
949 if (clientConnection == NULL) {
David Reiss01e55c12008-07-13 22:18:51 +0000950 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
Mark Slee2f6404d2006-10-10 01:37:40 +0000951 close(clientSocket);
952 return;
953 }
954
955 // Put this client connection into the proper state
956 clientConnection->transition();
David Reiss3e7fca42009-09-19 01:59:13 +0000957
958 // addrLen is written by the accept() call, so needs to be set before the next call.
David Reiss105961d2010-10-06 17:10:17 +0000959 addrLen = sizeof(addrStorage);
Mark Slee2f6404d2006-10-10 01:37:40 +0000960 }
Mark Slee79b16942007-11-26 19:05:29 +0000961
Mark Slee2f6404d2006-10-10 01:37:40 +0000962 // Done looping accept, now we have to make sure the error is due to
963 // blocking. Any other error is a problem
964 if (errno != EAGAIN && errno != EWOULDBLOCK) {
David Reiss01e55c12008-07-13 22:18:51 +0000965 GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
Mark Slee2f6404d2006-10-10 01:37:40 +0000966 }
967}
968
/**
 * Creates a socket to listen on and binds it to the local port.
 *
 * Resolves the wildcard address for port_, preferring an IPv6 result when
 * one is returned. It then creates a TCP socket, clears IPV6_V6ONLY where
 * supported, sets SO_REUSEADDR and binds. Finally it delegates to
 * listenSocket(int) to finish configuration and start listening.
 *
 * @throws TException if address resolution, socket creation or bind fails.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  // Sized for any int value of port_, so sprintf cannot overflow even if
  // port_ is out of the valid TCP port range.
  char port[sizeof("-2147483648") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    // Fail loudly like every other error path here. Returning silently would
    // leave serverSocket_ unset while serve() goes on to registerEvents(),
    // which requires a valid listening socket.
    throw TException("TNonblockingServer::serve() getaddrinfo " +
                     string(gai_strerror(error)));
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry if no IPv6 result exists.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  // Accept IPv4-mapped connections on an IPv6 socket as well.
  if (res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
    }
  }
  #endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));

  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}
1033
1034/**
1035 * Takes a socket created by listenSocket() and sets various options on it
1036 * to prepare for use in the server.
1037 */
1038void TNonblockingServer::listenSocket(int s) {
Mark Slee2f6404d2006-10-10 01:37:40 +00001039 // Set socket to nonblocking mode
1040 int flags;
Mark Slee79b16942007-11-26 19:05:29 +00001041 if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
1042 fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
1043 close(s);
1044 throw TException("TNonblockingServer::serve() O_NONBLOCK");
Mark Slee2f6404d2006-10-10 01:37:40 +00001045 }
1046
1047 int one = 1;
1048 struct linger ling = {0, 0};
Mark Slee2f6404d2006-10-10 01:37:40 +00001049
1050 // Keepalive to ensure full result flushing
Roger Meier30aae0c2011-07-08 12:23:31 +00001051 setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001052
1053 // Turn linger off to avoid hung sockets
Roger Meier30aae0c2011-07-08 12:23:31 +00001054 setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
Mark Slee2f6404d2006-10-10 01:37:40 +00001055
1056 // Set TCP nodelay if available, MAC OS X Hack
1057 // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
1058 #ifndef TCP_NOPUSH
Roger Meier30aae0c2011-07-08 12:23:31 +00001059 setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
Mark Slee2f6404d2006-10-10 01:37:40 +00001060 #endif
1061
David Reiss1c20c872010-03-09 05:20:14 +00001062 #ifdef TCP_LOW_MIN_RTO
1063 if (TSocket::getUseLowMinRto()) {
Roger Meier30aae0c2011-07-08 12:23:31 +00001064 setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
David Reiss1c20c872010-03-09 05:20:14 +00001065 }
1066 #endif
1067
Mark Slee79b16942007-11-26 19:05:29 +00001068 if (listen(s, LISTEN_BACKLOG) == -1) {
1069 close(s);
1070 throw TException("TNonblockingServer::serve() listen");
Mark Slee2f6404d2006-10-10 01:37:40 +00001071 }
1072
Mark Slee79b16942007-11-26 19:05:29 +00001073 // Cool, this socket is good to go, set it as the serverSocket_
1074 serverSocket_ = s;
1075}
1076
David Reiss01fe1532010-03-09 05:19:25 +00001077void TNonblockingServer::createNotificationPipe() {
Roger Meier30aae0c2011-07-08 12:23:31 +00001078 if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1079 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1080 throw TException("can't create notification pipe");
David Reiss01fe1532010-03-09 05:19:25 +00001081 }
Roger Meier30aae0c2011-07-08 12:23:31 +00001082 if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
1083 evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
David Reiss83b8fda2010-03-09 05:19:34 +00001084 close(notificationPipeFDs_[0]);
1085 close(notificationPipeFDs_[1]);
1086 throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
1087 }
David Reiss01fe1532010-03-09 05:19:25 +00001088}
1089
Mark Slee79b16942007-11-26 19:05:29 +00001090/**
1091 * Register the core libevent events onto the proper base.
1092 */
Roger Meierc1905582011-08-02 23:37:36 +00001093void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
Mark Slee79b16942007-11-26 19:05:29 +00001094 assert(serverSocket_ != -1);
1095 assert(!eventBase_);
1096 eventBase_ = base;
Roger Meierc1905582011-08-02 23:37:36 +00001097 ownEventBase_ = ownEventBase;
Mark Slee79b16942007-11-26 19:05:29 +00001098
1099 // Print some libevent stats
David Reiss01e55c12008-07-13 22:18:51 +00001100 GlobalOutput.printf("libevent %s method %s",
Mark Slee79b16942007-11-26 19:05:29 +00001101 event_get_version(),
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001102 event_base_get_method(eventBase_));
Mark Slee2f6404d2006-10-10 01:37:40 +00001103
1104 // Register the server event
Mark Slee79b16942007-11-26 19:05:29 +00001105 event_set(&serverEvent_,
Mark Slee2f6404d2006-10-10 01:37:40 +00001106 serverSocket_,
1107 EV_READ | EV_PERSIST,
1108 TNonblockingServer::eventHandler,
1109 this);
Mark Slee79b16942007-11-26 19:05:29 +00001110 event_base_set(eventBase_, &serverEvent_);
Mark Slee2f6404d2006-10-10 01:37:40 +00001111
1112 // Add the event and start up the server
Mark Slee79b16942007-11-26 19:05:29 +00001113 if (-1 == event_add(&serverEvent_, 0)) {
1114 throw TException("TNonblockingServer::serve(): coult not event_add");
Mark Slee2f6404d2006-10-10 01:37:40 +00001115 }
David Reiss01fe1532010-03-09 05:19:25 +00001116 if (threadPoolProcessing_) {
1117 // Create an event to be notified when a task finishes
1118 event_set(&notificationEvent_,
1119 getNotificationRecvFD(),
1120 EV_READ | EV_PERSIST,
1121 TConnection::taskHandler,
1122 this);
David Reiss1c20c872010-03-09 05:20:14 +00001123
David Reiss01fe1532010-03-09 05:19:25 +00001124 // Attach to the base
1125 event_base_set(eventBase_, &notificationEvent_);
1126
1127 // Add the event and start up the server
1128 if (-1 == event_add(&notificationEvent_, 0)) {
1129 throw TException("TNonblockingServer::serve(): notification event_add fail");
1130 }
1131 }
1132}
1133
David Reiss068f4162010-03-09 05:19:45 +00001134void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
1135 threadManager_ = threadManager;
1136 if (threadManager != NULL) {
1137 threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
1138 threadPoolProcessing_ = true;
1139 } else {
1140 threadPoolProcessing_ = false;
1141 }
1142}
1143
David Reiss01fe1532010-03-09 05:19:25 +00001144bool TNonblockingServer::serverOverloaded() {
1145 size_t activeConnections = numTConnections_ - connectionStack_.size();
1146 if (numActiveProcessors_ > maxActiveProcessors_ ||
1147 activeConnections > maxConnections_) {
1148 if (!overloaded_) {
1149 GlobalOutput.printf("thrift non-blocking server overload condition");
1150 overloaded_ = true;
1151 }
1152 } else {
1153 if (overloaded_ &&
1154 (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
1155 (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1156 GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
1157 nConnectionsDropped_, nTotalConnectionsDropped_);
1158 nConnectionsDropped_ = 0;
1159 overloaded_ = false;
1160 }
1161 }
1162
1163 return overloaded_;
1164}
1165
1166bool TNonblockingServer::drainPendingTask() {
1167 if (threadManager_) {
1168 boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1169 if (task) {
1170 TConnection* connection =
1171 static_cast<TConnection::Task*>(task.get())->getTConnection();
1172 assert(connection && connection->getServer()
1173 && connection->getState() == APP_WAIT_TASK);
1174 connection->forceClose();
1175 return true;
1176 }
1177 }
1178 return false;
Mark Slee79b16942007-11-26 19:05:29 +00001179}
1180
David Reiss068f4162010-03-09 05:19:45 +00001181void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
1182 TConnection* connection =
1183 static_cast<TConnection::Task*>(task.get())->getTConnection();
1184 assert(connection && connection->getServer()
1185 && connection->getState() == APP_WAIT_TASK);
1186 connection->forceClose();
1187}
1188
Mark Slee79b16942007-11-26 19:05:29 +00001189/**
1190 * Main workhorse function, starts up the server listening on a port and
1191 * loops over the libevent handler.
1192 */
1193void TNonblockingServer::serve() {
1194 // Init socket
1195 listenSocket();
1196
David Reiss01fe1532010-03-09 05:19:25 +00001197 if (threadPoolProcessing_) {
1198 // Init task completion notification pipe
1199 createNotificationPipe();
1200 }
1201
Mark Slee79b16942007-11-26 19:05:29 +00001202 // Initialize libevent core
Bryan Duxbury37874ca2011-08-25 17:28:23 +00001203 registerEvents(static_cast<event_base*>(event_base_new()), true);
Mark Slee2f6404d2006-10-10 01:37:40 +00001204
Mark Sleeb4d3e7b2007-11-28 01:51:43 +00001205 // Run the preServe event
1206 if (eventHandler_ != NULL) {
1207 eventHandler_->preServe();
dweatherford58985992007-06-19 23:10:19 +00001208 }
1209
Bryan Duxbury76c43682011-08-24 21:26:48 +00001210 // Run libevent engine, invokes calls to eventHandler
1211 // Only returns if stop() is called.
Mark Slee79b16942007-11-26 19:05:29 +00001212 event_base_loop(eventBase_, 0);
Mark Slee2f6404d2006-10-10 01:37:40 +00001213}
1214
Bryan Duxbury76c43682011-08-24 21:26:48 +00001215void TNonblockingServer::stop() {
1216 if (!eventBase_) {
1217 return;
1218 }
1219
1220 // Call event_base_loopbreak() to tell libevent to exit the loop
1221 //
1222 // (The libevent documentation doesn't explicitly state that this function is
1223 // safe to call from another thread. However, all it does is set a variable,
1224 // in the event_base, so it should be fine.)
1225 event_base_loopbreak(eventBase_);
1226
1227 // event_base_loopbreak() only causes the loop to exit the next time it wakes
1228 // up. We need to force it to wake up, in case there are no real events
1229 // it needs to process.
1230 //
1231 // Attempt to connect to the server socket. If anything fails,
1232 // we'll just have to wait until libevent wakes up on its own.
1233 //
1234 // First create a socket
1235 int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
1236 if (fd < 0) {
1237 return;
1238 }
1239
1240 // Set up the address
1241 struct sockaddr_in addr;
1242 addr.sin_family = AF_INET;
1243 addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
1244 addr.sin_port = htons(port_);
1245
1246 // Finally do the connect().
1247 // We don't care about the return value;
1248 // we're just going to close the socket either way.
1249 connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
1250 close(fd);
1251}
1252
T Jake Lucianib5e62212009-01-31 22:36:20 +00001253}}} // apache::thrift::server