/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "TNonblockingServer.h"
#include <concurrency/Exception.h>

#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>

namespace apache { namespace thrift { namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using namespace std;

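/**
 * Runnable task that invokes the processor for a single connection on a
 * worker thread, then signals completion back to the libevent thread by
 * writing a byte to (and closing) its end of the notification socketpair.
 */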
class TConnection::Task: public Runnable {
 public:
  Task(boost::shared_ptr<TProcessor> processor,
       boost::shared_ptr<TProtocol> input,
       boost::shared_ptr<TProtocol> output,
       int taskHandle) :
    processor_(processor),
    input_(input),
    output_(output),
    taskHandle_(taskHandle) {}

  void run() {
    try {
      while (processor_->process(input_, output_)) {
        if (!input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (TTransportException& ttx) {
      cerr << "TNonblockingServer client died: " << ttx.what() << endl;
    } catch (TException& x) {
      cerr << "TNonblockingServer exception: " << x.what() << endl;
    } catch (...) {
      cerr << "TNonblockingServer uncaught exception." << endl;
    }

    // Signal completion back to the libevent thread via a socketpair
    int8_t b = 0;
    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
    }
    if (-1 == ::close(taskHandle_)) {
      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
    }
  }

 private:
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<TProtocol> input_;
  boost::shared_ptr<TProtocol> output_;
  int taskHandle_;
};

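/**
 * (Re)initializes this connection object for a freshly accepted socket:
 * resets the read/write buffer state, registers the requested libevent
 * flags, and builds the input/output transports and protocols from the
 * server's configured factories.
 */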
void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
  socket_ = socket;
  server_ = s;
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = NULL;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;

  socketState_ = SOCKET_RECV;
  appState_ = APP_INIT;

  taskHandle_ = -1;

  // Set flags, which also registers the event
  setFlags(eventFlags);

  // get input/transports
  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
}

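/**
 * Performs the nonblocking I/O for the connection's current socket state:
 * in SOCKET_RECV it reads as much of the expected frame as is available,
 * in SOCKET_SEND it writes as much of the pending response as the socket
 * will accept, and it calls transition() once the current state completes.
 */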
void TConnection::workSocket() {
  int flags = 0, got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      while (readWant_ > readBufferSize_) {
        readBufferSize_ *= 2;
      }
      readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
      if (readBuffer_ == NULL) {
        GlobalOutput("TConnection::workSocket() realloc");
        close();
        return;
      }
    }

    // Read from the socket
    fetch = readWant_ - readBufferPos_;
    got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    } else if (got == -1) {
      // Blocking errors are okay, just move on
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }

      if (errno != ECONNRESET) {
        GlobalOutput.perror("TConnection::workSocket() recv -1 ", errno);
      }
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send\n");
      transition();
      return;
    }

    flags = 0;
    #ifdef MSG_NOSIGNAL
    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
    // check for the EPIPE return condition and close the socket in that case
    flags |= MSG_NOSIGNAL;
    #endif // ifdef MSG_NOSIGNAL

    left = writeBufferSize_ - writeBufferPos_;
    sent = send(socket_, writeBuffer_ + writeBufferPos_, left, flags);

    if (sent <= 0) {
      // Blocking errors are okay, just move on
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return;
      }
      if (errno != EPIPE) {
        GlobalOutput.perror("TConnection::workSocket() send -1 ", errno);
      }
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TConnection::transition() {

  int sz = 0;

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    // If we've used these transport buffers enough times, reset them to avoid bloating

    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
    ++numReadsSinceReset_;
    if (numWritesSinceReset_ < 512) {
      outputTransport_->resetBuffer();
    } else {
      // reset the capacity of the output transport if we used it enough times that it might be bloated
      try {
        outputTransport_->resetBuffer(true);
        numWritesSinceReset_ = 0;
      } catch (TTransportException &ttx) {
        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
        close();
        return;
      }
    }

    // Prepend four bytes of blank space to the buffer so we can
    // write the frame size there later.
    outputTransport_->getWritePtr(4);
    outputTransport_->wroteBytes(4);

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it
      int sv[2];
      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
        // Now we will fall through to the APP_WAIT_TASK block with no response
      } else {
        // Create task and dispatch to the thread manager
        boost::shared_ptr<Runnable> task =
          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
                                               inputProtocol_,
                                               outputProtocol_,
                                               sv[1]));
        // The application is now waiting on the task to finish
        appState_ = APP_WAIT_TASK;

        // Create an event to be notified when the task finishes
        event_set(&taskEvent_,
                  taskHandle_ = sv[0],
                  EV_READ,
                  TConnection::taskHandler,
                  this);

        // Attach to the base
        event_base_set(server_->getEventBase(), &taskEvent_);

        // Add the event and start up the server
        if (-1 == event_add(&taskEvent_, 0)) {
          GlobalOutput("TNonblockingServer::serve(): could not event_add");
          return;
        }
        try {
          server_->addTask(task);
        } catch (IllegalStateException & ise) {
          // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
          GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
          close();
        }

        // Set this connection idle so that libevent doesn't process more
        // data on it while we're still waiting for the threadmanager to
        // finish this task
        setIdle();
        return;
      }
    } else {
      try {
        // Invoke the processor
        server_->getProcessor()->process(inputProtocol_, outputProtocol_);
      } catch (TTransportException &ttx) {
        GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
        close();
        return;
      } catch (TException &x) {
        GlobalOutput.printf("TException: Server::process() %s", x.what());
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        close();
        return;
      }
    }

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      // Try to work the socket immediately
      // workSocket();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:

    ++numWritesSinceReset_;

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // reset the input buffer if we used it enough times that it might be bloated
    if (numReadsSinceReset_ > 512) {
      void* new_buffer = std::realloc(readBuffer_, 1024);
      if (new_buffer == NULL) {
        GlobalOutput("TConnection::transition() realloc");
        close();
        return;
      }
      readBuffer_ = (uint8_t*) new_buffer;
      readBufferSize_ = 1024;
      numReadsSinceReset_ = 0;
    }

    // Clear write buffer variables
    writeBuffer_ = NULL;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Set up read buffer for getting 4 bytes
    readBufferPos_ = 0;
    readWant_ = 4;

    // Into read4 state we go
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_FRAME_SIZE;

    // Register read event
    setRead();

    // Try to work the socket right away
    // workSocket();

    return;

  case APP_READ_FRAME_SIZE:
    // We just read the request length, deserialize it
    sz = *(int32_t*)readBuffer_;
    sz = (int32_t)ntohl(sz);

    if (sz <= 0) {
      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
      close();
      return;
    }

    // Reset the read buffer
    readWant_ = (uint32_t)sz;
    readBufferPos_ = 0;

    // Move into read request state
    appState_ = APP_READ_REQUEST;

    // Work the socket right away
    // workSocket();

    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

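/**
 * Updates the libevent registration for this connection so that it matches
 * the requested read/write event flags, unregistering the previous event
 * first if one was installed.
 */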
void TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ != 0) {
    if (event_del(&event_) == -1) {
      GlobalOutput("TConnection::setFlags event_del");
      return;
    }
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /**
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, socket_, eventFlags_, TConnection::eventHandler, this);
  event_base_set(server_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, 0) == -1) {
    GlobalOutput("TConnection::setFlags(): could not event_add");
  }
}

/**
 * Closes a connection
 */
void TConnection::close() {
  // Delete the registered libevent
  if (event_del(&event_) == -1) {
    GlobalOutput("TConnection::close() event_del");
  }

  // Close the socket
  if (socket_ > 0) {
    ::close(socket_);
  }
  socket_ = 0;

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

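/**
 * Shrinks the read buffer back down to at most 'limit' bytes if it has grown
 * larger, so that an idle pooled connection does not keep holding a large
 * buffer.
 */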
void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
  if (readBufferSize_ > limit) {
    readBufferSize_ = limit;
    readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
    if (readBuffer_ == NULL) {
      GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
      close();
    }
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TConnection* TNonblockingServer::createConnection(int socket, short flags) {
  // Check the stack
  if (connectionStack_.empty()) {
    return new TConnection(socket, flags, this);
  } else {
    TConnection* result = connectionStack_.top();
    connectionStack_.pop();
    result->init(socket, flags, this);
    return result;
  }
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  if (connectionStackLimit_ &&
      (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
  } else {
    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(int fd, short which) {
  // Make sure that libevent did not hand us the wrong socket handle
  assert(fd == serverSocket_);

  // Server socket accepted a new connection
  socklen_t addrLen;
  struct sockaddr addr;
  addrLen = sizeof(addr);

  // Going to accept a new client socket
  int clientSocket;

  // Accept as many new clients as possible, even though libevent signaled only
  // one, this helps us to avoid having to go back into the libevent engine so
  // many times
  while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {

    // Explicitly set this socket to NONBLOCK mode
    int flags;
    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
      close(clientSocket);
      return;
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection =
      createConnection(clientSocket, EV_READ | EV_PERSIST);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == NULL) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      close(clientSocket);
      return;
    }

    // Put this client connection into the proper state
    clientConnection->transition();
  }

  // Done looping accept, now we have to make sure the error is due to
  // blocking. Any other error is a problem
  if (errno != EAGAIN && errno != EWOULDBLOCK) {
    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::listenSocket() {
  int s;
  struct addrinfo hints, *res, *res0;
  int error;

  char port[sizeof("65536") + 1];
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
  sprintf(port, "%d", port_);

  // Wildcard address
  error = getaddrinfo(NULL, port, &hints, &res0);
  if (error) {
    string errStr = "TNonblockingServer::serve() getaddrinfo " + string(gai_strerror(error));
    GlobalOutput(errStr.c_str());
    return;
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space.
  for (res = res0; res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
      break;
  }

  // Create the server socket
  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (s == -1) {
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() socket() -1");
  }

  #ifdef IPV6_V6ONLY
  int zero = 0;
  if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero))) {
    GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
  }
  #endif // #ifdef IPV6_V6ONLY

  int one = 1;

  // Set reuseaddr to avoid 2MSL delay on server restart
  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

  if (bind(s, res->ai_addr, res->ai_addrlen) == -1) {
    close(s);
    freeaddrinfo(res0);
    throw TException("TNonblockingServer::serve() bind");
  }

  // Done with the addr info
  freeaddrinfo(res0);

  // Set up this file descriptor for listening
  listenSocket(s);
}

/**
 * Takes a socket created by listenSocket() and sets various options on it
 * to prepare for use in the server.
 */
void TNonblockingServer::listenSocket(int s) {
  // Set socket to nonblocking mode
  int flags;
  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(s);
    throw TException("TNonblockingServer::serve() O_NONBLOCK");
  }

  int one = 1;
  struct linger ling = {0, 0};

  // Keepalive to ensure full result flushing
  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

  // Turn linger off to avoid hung sockets
  setsockopt(s, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));

  // Set TCP nodelay if available, MAC OS X Hack
  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
  #ifndef TCP_NOPUSH
  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one));
  #endif

  if (listen(s, LISTEN_BACKLOG) == -1) {
    close(s);
    throw TException("TNonblockingServer::serve() listen");
  }

  // Cool, this socket is good to go, set it as the serverSocket_
  serverSocket_ = s;
}

/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingServer::registerEvents(event_base* base) {
  assert(serverSocket_ != -1);
  assert(!eventBase_);
  eventBase_ = base;

  // Print some libevent stats
  GlobalOutput.printf("libevent %s method %s",
                      event_get_version(),
                      event_get_method());

  // Register the server event
  event_set(&serverEvent_,
            serverSocket_,
            EV_READ | EV_PERSIST,
            TNonblockingServer::eventHandler,
            this);
  event_base_set(eventBase_, &serverEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&serverEvent_, 0)) {
    throw TException("TNonblockingServer::serve(): could not event_add");
  }
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {
  // Init socket
  listenSocket();

  // Initialize libevent core
  registerEvents(static_cast<event_base*>(event_init()));

  // Run the preServe event
  if (eventHandler_ != NULL) {
    eventHandler_->preServe();
  }

  // Run libevent engine, never returns, invokes calls to eventHandler
  event_base_loop(eventBase_, 0);
}

}}} // apache::thrift::server
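
// A minimal usage sketch (not part of this translation unit): how a caller
// typically stands up a TNonblockingServer with a thread pool so that
// isThreadPoolProcessing() dispatches requests to worker threads. MyHandler
// and MyServiceProcessor are hypothetical generated/user classes; the
// ThreadManager and protocol-factory calls are the standard Thrift
// concurrency/protocol APIs this server builds on.
//
//   boost::shared_ptr<MyHandler> handler(new MyHandler());
//   boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
//   boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
//
//   boost::shared_ptr<ThreadManager> threadManager =
//     ThreadManager::newSimpleThreadManager(4);
//   threadManager->threadFactory(
//     boost::shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
//   threadManager->start();
//
//   TNonblockingServer server(processor, protocolFactory, 9090, threadManager);
//   server.serve();  // runs the libevent loop; does not return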