blob: b547649663c31a7da9413e0e8a2f0c3ad8de4ff9 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Mark Slee4af6ed72006-10-25 19:02:49 +000023#include <Thrift.h>
24#include <server/TServer.h>
David Reiss28f298d2008-05-01 06:17:36 +000025#include <transport/TBufferTransports.h>
Mark Sleee02385b2007-06-09 01:21:16 +000026#include <concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000027#include <climits>
Mark Slee2f6404d2006-10-10 01:37:40 +000028#include <stack>
David Reiss9b209552008-04-08 06:26:05 +000029#include <string>
30#include <errno.h>
David Reissd7a16f42008-02-19 22:47:29 +000031#include <cstdlib>
David Reiss5105b2e2009-05-21 02:28:27 +000032#include <unistd.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000033#include <event.h>
34
T Jake Lucianib5e62212009-01-31 22:36:20 +000035namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000036
T Jake Lucianib5e62212009-01-31 22:36:20 +000037using apache::thrift::transport::TMemoryBuffer;
38using apache::thrift::protocol::TProtocol;
39using apache::thrift::concurrency::Runnable;
40using apache::thrift::concurrency::ThreadManager;
Mark Slee2f6404d2006-10-10 01:37:40 +000041
42// Forward declaration of class
43class TConnection;
44
45/**
46 * This is a non-blocking server in C++ for high performance that operates a
47 * single IO thread. It assumes that all incoming requests are framed with a
48 * 4 byte length indicator and writes out responses using the same framing.
49 *
50 * It does not use the TServerTransport framework, but rather has socket
51 * operations hardcoded for use with select.
52 *
Mark Slee2f6404d2006-10-10 01:37:40 +000053 */
David Reiss01fe1532010-03-09 05:19:25 +000054
55
56/// Overload condition actions.
57enum TOverloadAction {
58 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
59 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
60 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
61};
62
Mark Slee2f6404d2006-10-10 01:37:40 +000063class TNonblockingServer : public TServer {
64 private:
David Reiss01fe1532010-03-09 05:19:25 +000065 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +000066 static const int LISTEN_BACKLOG = 1024;
67
David Reiss01fe1532010-03-09 05:19:25 +000068 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000069 static const size_t CONNECTION_STACK_LIMIT = 1024;
70
David Reiss01fe1532010-03-09 05:19:25 +000071 /// Maximum size of buffer allocated to idle connection
Kevin Clarkcbcd63a2009-03-19 03:50:05 +000072 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
73
David Reiss01fe1532010-03-09 05:19:25 +000074 /// Default limit on total number of connected sockets
75 static const int MAX_CONNECTIONS = INT_MAX;
76
77 /// Default limit on connections in handler/task processing
78 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
79
80 /// Server socket file descriptor
Mark Slee2f6404d2006-10-10 01:37:40 +000081 int serverSocket_;
82
David Reiss01fe1532010-03-09 05:19:25 +000083 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +000084 int port_;
85
David Reiss01fe1532010-03-09 05:19:25 +000086 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +000087 boost::shared_ptr<ThreadManager> threadManager_;
88
David Reiss01fe1532010-03-09 05:19:25 +000089 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +000090 bool threadPoolProcessing_;
91
David Reiss01fe1532010-03-09 05:19:25 +000092 /// The event base for libevent
Mark Slee79b16942007-11-26 19:05:29 +000093 event_base* eventBase_;
94
David Reiss01fe1532010-03-09 05:19:25 +000095 /// Event struct, used with eventBase_ for connection events
Mark Slee79b16942007-11-26 19:05:29 +000096 struct event serverEvent_;
97
David Reiss01fe1532010-03-09 05:19:25 +000098 /// Event struct, used with eventBase_ for task completion notification
99 struct event notificationEvent_;
100
101 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000102 size_t numTConnections_;
103
David Reiss01fe1532010-03-09 05:19:25 +0000104 /// Number of Connections processing or waiting to process
105 size_t numActiveProcessors_;
106
107 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000108 size_t connectionStackLimit_;
109
David Reiss01fe1532010-03-09 05:19:25 +0000110 /// Limit for number of connections processing or waiting to process
111 size_t maxActiveProcessors_;
112
113 /// Limit for number of open connections
114 size_t maxConnections_;
115
116 /**
117 * Hysteresis for overload state. This is the fraction of the overload
118 * value that needs to be reached before the overload state is cleared;
119 * must be <= 1.0.
120 */
121 double overloadHysteresis_;
122
123 /// Action to take when we're overloaded.
124 TOverloadAction overloadAction_;
125
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000126 /**
127 * Max read buffer size for an idle connection. When we place an idle
128 * TConnection into connectionStack_, we insure that its read buffer is
129 * reduced to this size to insure that idle connections don't hog memory.
130 */
David Reiss01fe1532010-03-09 05:19:25 +0000131 size_t idleBufferMemLimit_;
132
133 /// Set if we are currently in an overloaded state.
134 bool overloaded_;
135
136 /// Count of connections dropped since overload started
137 uint32_t nConnectionsDropped_;
138
139 /// Count of connections dropped on overload since server started
140 uint64_t nTotalConnectionsDropped_;
141
142 /// File descriptors for pipe used for task completion notification.
143 int notificationPipeFDs_[2];
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000144
Mark Slee2f6404d2006-10-10 01:37:40 +0000145 /**
146 * This is a stack of all the objects that have been created but that
147 * are NOT currently in use. When we close a connection, we place it on this
148 * stack so that the object can be reused later, rather than freeing the
149 * memory and reallocating a new object later.
150 */
151 std::stack<TConnection*> connectionStack_;
152
David Reiss01fe1532010-03-09 05:19:25 +0000153 /**
154 * Called when server socket had something happen. We accept all waiting
155 * client connections on listen socket fd and assign TConnection objects
156 * to handle those requests.
157 *
158 * @param fd the listen socket.
159 * @param which the event flag that triggered the handler.
160 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 void handleEvent(int fd, short which);
162
163 public:
Mark Slee5ea15f92007-03-05 22:55:59 +0000164 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Sleef9373392007-01-24 19:41:57 +0000165 int port) :
166 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000167 serverSocket_(-1),
Mark Sleef9373392007-01-24 19:41:57 +0000168 port_(port),
dweatherford58985992007-06-19 23:10:19 +0000169 threadPoolProcessing_(false),
David Reiss1997f102008-04-29 00:29:41 +0000170 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000171 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000172 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000173 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000174 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
175 maxConnections_(MAX_CONNECTIONS),
176 overloadHysteresis_(0.8),
177 overloadAction_(T_OVERLOAD_NO_ACTION),
178 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
179 overloaded_(false),
180 nConnectionsDropped_(0),
181 nTotalConnectionsDropped_(0) {}
Mark Sleef9373392007-01-24 19:41:57 +0000182
Mark Slee79b16942007-11-26 19:05:29 +0000183 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
Mark Slee5ea15f92007-03-05 22:55:59 +0000184 boost::shared_ptr<TProtocolFactory> protocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000185 int port,
186 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000187 TServer(processor),
Mark Slee79b16942007-11-26 19:05:29 +0000188 serverSocket_(-1),
Mark Slee92f00fb2006-10-25 01:28:17 +0000189 port_(port),
Mark Slee79b16942007-11-26 19:05:29 +0000190 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000191 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000192 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000193 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000194 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000195 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
196 maxConnections_(MAX_CONNECTIONS),
197 overloadHysteresis_(0.8),
198 overloadAction_(T_OVERLOAD_NO_ACTION),
199 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
200 overloaded_(false),
201 nConnectionsDropped_(0),
202 nTotalConnectionsDropped_(0) {
Mark Slee5ea15f92007-03-05 22:55:59 +0000203 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
204 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000205 setInputProtocolFactory(protocolFactory);
206 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000207 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000208 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000209
Mark Slee5ea15f92007-03-05 22:55:59 +0000210 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
211 boost::shared_ptr<TTransportFactory> inputTransportFactory,
212 boost::shared_ptr<TTransportFactory> outputTransportFactory,
213 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
214 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
Mark Sleee02385b2007-06-09 01:21:16 +0000215 int port,
216 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000217 TServer(processor),
David Reiss01fe1532010-03-09 05:19:25 +0000218 serverSocket_(-1),
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000219 port_(port),
Mark Slee5d1784a2007-12-05 23:20:54 +0000220 threadManager_(threadManager),
David Reiss1997f102008-04-29 00:29:41 +0000221 eventBase_(NULL),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000222 numTConnections_(0),
David Reiss01fe1532010-03-09 05:19:25 +0000223 numActiveProcessors_(0),
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000224 connectionStackLimit_(CONNECTION_STACK_LIMIT),
David Reiss01fe1532010-03-09 05:19:25 +0000225 maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
226 maxConnections_(MAX_CONNECTIONS),
227 overloadHysteresis_(0.8),
228 overloadAction_(T_OVERLOAD_NO_ACTION),
229 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
230 overloaded_(false),
231 nConnectionsDropped_(0),
232 nTotalConnectionsDropped_(0) {
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000233 setInputTransportFactory(inputTransportFactory);
234 setOutputTransportFactory(outputTransportFactory);
235 setInputProtocolFactory(inputProtocolFactory);
236 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000237 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000238 }
Mark Slee79b16942007-11-26 19:05:29 +0000239
Mark Slee2f6404d2006-10-10 01:37:40 +0000240 ~TNonblockingServer() {}
241
Mark Sleee02385b2007-06-09 01:21:16 +0000242 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
243 threadManager_ = threadManager;
244 threadPoolProcessing_ = (threadManager != NULL);
245 }
246
David Reiss1997f102008-04-29 00:29:41 +0000247 boost::shared_ptr<ThreadManager> getThreadManager() {
248 return threadManager_;
249 }
250
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000251 /**
252 * Get the maximum number of unused TConnection we will hold in reserve.
253 *
254 * @return the current limit on TConnection pool size.
255 */
David Reiss260fa932009-04-02 23:51:39 +0000256 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000257 return connectionStackLimit_;
258 }
259
260 /**
261 * Set the maximum number of unused TConnection we will hold in reserve.
262 *
263 * @param sz the new limit for TConnection pool size.
264 */
David Reiss260fa932009-04-02 23:51:39 +0000265 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000266 connectionStackLimit_ = sz;
267 }
268
Mark Slee79b16942007-11-26 19:05:29 +0000269 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000270 return threadPoolProcessing_;
271 }
272
273 void addTask(boost::shared_ptr<Runnable> task) {
274 threadManager_->add(task);
275 }
276
Mark Slee79b16942007-11-26 19:05:29 +0000277 event_base* getEventBase() const {
278 return eventBase_;
279 }
280
David Reiss01fe1532010-03-09 05:19:25 +0000281 /// Increment our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000282 void incrementNumConnections() {
283 ++numTConnections_;
284 }
285
David Reiss01fe1532010-03-09 05:19:25 +0000286 /// Decrement our count of the number of connected sockets.
David Reissc17fe6b2008-04-29 00:29:43 +0000287 void decrementNumConnections() {
288 --numTConnections_;
David Reiss1997f102008-04-29 00:29:41 +0000289 }
290
David Reiss01fe1532010-03-09 05:19:25 +0000291 /**
292 * Return the count of sockets currently connected to.
293 *
294 * @return count of connected sockets.
295 */
296 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000297 return numTConnections_;
298 }
299
David Reiss01fe1532010-03-09 05:19:25 +0000300 /**
301 * Return the count of connection objects allocated but not in use.
302 *
303 * @return count of idle connection objects.
304 */
305 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000306 return connectionStack_.size();
307 }
308
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000309 /**
David Reiss01fe1532010-03-09 05:19:25 +0000310 * Return count of number of connections which are currently processing.
311 * This is defined as a connection where all data has been received and
312 * either assigned a task (when threading) or passed to a handler (when
313 * not threading), and where the handler has not yet returned.
314 *
315 * @return # of connections currently processing.
316 */
317 size_t getNumActiveProcessors() const {
318 return numActiveProcessors_;
319 }
320
321 /// Increment the count of connections currently processing.
322 void incrementActiveProcessors() {
323 ++numActiveProcessors_;
324 }
325
326 /// Decrement the count of connections currently processing.
327 void decrementActiveProcessors() {
328 if (numActiveProcessors_ > 0) {
329 --numActiveProcessors_;
330 }
331 }
332
333 /**
334 * Get the maximum # of connections allowed before overload.
335 *
336 * @return current setting.
337 */
338 size_t getMaxConnections() const {
339 return maxConnections_;
340 }
341
342 /**
343 * Set the maximum # of connections allowed before overload.
344 *
345 * @param maxConnections new setting for maximum # of connections.
346 */
347 void setMaxConnections(size_t maxConnections) {
348 maxConnections_ = maxConnections;
349 }
350
351 /**
352 * Get the maximum # of connections waiting in handler/task before overload.
353 *
354 * @return current setting.
355 */
356 size_t getMaxActiveProcessors() const {
357 return maxActiveProcessors_;
358 }
359
360 /**
361 * Set the maximum # of connections waiting in handler/task before overload.
362 *
363 * @param maxActiveProcessors new setting for maximum # of active processes.
364 */
365 void setMaxActiveProcessors(size_t maxActiveProcessors) {
366 maxActiveProcessors_ = maxActiveProcessors;
367 }
368
369 /**
370 * Get fraction of maximum limits before an overload condition is cleared.
371 *
372 * @return hysteresis fraction
373 */
374 double getOverloadHysteresis() const {
375 return overloadHysteresis_;
376 }
377
378 /**
379 * Set fraction of maximum limits before an overload condition is cleared.
380 * A good value would probably be between 0.5 and 0.9.
381 *
382 * @param hysteresisFraction fraction <= 1.0.
383 */
384 void setOverloadHysteresis(double hysteresisFraction) {
385 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
386 overloadHysteresis_ = hysteresisFraction;
387 }
388 }
389
390 /**
391 * Get the action the server will take on overload.
392 *
393 * @return a TOverloadAction enum value for the currently set action.
394 */
395 TOverloadAction getOverloadAction() const {
396 return overloadAction_;
397 }
398
399 /**
400 * Set the action the server is to take on overload.
401 *
402 * @param overloadAction a TOverloadAction enum value for the action.
403 */
404 void setOverloadAction(TOverloadAction overloadAction) {
405 overloadAction_ = overloadAction;
406 }
407
408 /**
409 * Determine if the server is currently overloaded.
410 * This function checks the maximums for open connections and connections
411 * currently in processing, and sets an overload condition if they are
412 * exceeded. The overload will persist until both values are below the
413 * current hysteresis fraction of their maximums.
414 *
415 * @return true if an overload condition exists, false if not.
416 */
417 bool serverOverloaded();
418
419 /** Pop and discard next task on threadpool wait queue.
420 *
421 * @return true if a task was discarded, false if the wait queue was empty.
422 */
423 bool drainPendingTask();
424
425 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000426 * Get the maximum limit of memory allocated to idle TConnection objects.
427 *
428 * @return # bytes beyond which we will shrink buffers when idle.
429 */
430 size_t getIdleBufferMemLimit() const {
431 return idleBufferMemLimit_;
432 }
433
434 /**
435 * Set the maximum limit of memory allocated to idle TConnection objects.
436 * If a TConnection object goes idle with more than this much memory
437 * allocated to its buffer, we shrink it to this value.
438 *
439 * @param limit of bytes beyond which we will shrink buffers when idle.
440 */
441 void setIdleBufferMemLimit(size_t limit) {
442 idleBufferMemLimit_ = limit;
443 }
444
David Reiss01fe1532010-03-09 05:19:25 +0000445 /**
446 * Return an initialized connection object. Creates or recovers from
447 * pool a TConnection and initializes it with the provided socket FD
448 * and flags.
449 *
450 * @param socket FD of socket associated with this connection.
451 * @param flags initial lib_event flags for this connection.
452 * @return pointer to initialized TConnection object.
453 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 TConnection* createConnection(int socket, short flags);
455
David Reiss01fe1532010-03-09 05:19:25 +0000456 /**
457 * Returns a connection to pool or deletion. If the connection pool
458 * (a stack) isn't full, place the connection object on it, otherwise
459 * just delete it.
460 *
461 * @param connection the TConection being returned.
462 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000463 void returnConnection(TConnection* connection);
464
David Reiss01fe1532010-03-09 05:19:25 +0000465 /**
466 * C-callable event handler for listener events. Provides a callback
467 * that libevent can understand which invokes server->handleEvent().
468 *
469 * @param fd the descriptor the event occured on.
470 * @param which the flags associated with the event.
471 * @param v void* callback arg where we placed TNonblockingServer's "this".
472 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000473 static void eventHandler(int fd, short which, void* v) {
474 ((TNonblockingServer*)v)->handleEvent(fd, which);
475 }
476
David Reiss01fe1532010-03-09 05:19:25 +0000477 /// Creates a socket to listen on and binds it to the local port.
Mark Slee79b16942007-11-26 19:05:29 +0000478 void listenSocket();
479
David Reiss01fe1532010-03-09 05:19:25 +0000480 /**
481 * Takes a socket created by listenSocket() and sets various options on it
482 * to prepare for use in the server.
483 *
484 * @param fd descriptor of socket to be initialized/
485 */
Mark Slee79b16942007-11-26 19:05:29 +0000486 void listenSocket(int fd);
487
David Reiss01fe1532010-03-09 05:19:25 +0000488 /// Create the pipe used to notify I/O process of task completion.
489 void createNotificationPipe();
490
491 /**
492 * Get notification pipe send descriptor.
493 *
494 * @return write fd for pipe.
495 */
496 int getNotificationSendFD() const {
497 return notificationPipeFDs_[1];
498 }
499
500 /**
501 * Get notification pipe receive descriptor.
502 *
503 * @return read fd of pipe.
504 */
505 int getNotificationRecvFD() const {
506 return notificationPipeFDs_[0];
507 }
508
509 /**
510 * Register the core libevent events onto the proper base.
511 *
512 * @param base pointer to the event base to be initialized.
513 */
Mark Slee79b16942007-11-26 19:05:29 +0000514 void registerEvents(event_base* base);
515
David Reiss01fe1532010-03-09 05:19:25 +0000516 /**
517 * Main workhorse function, starts up the server listening on a port and
518 * loops over the libevent handler.
519 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000520 void serve();
521};
522
David Reiss01fe1532010-03-09 05:19:25 +0000523/// Two states for sockets, recv and send mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000524enum TSocketState {
525 SOCKET_RECV,
526 SOCKET_SEND
527};
528
529/**
David Reiss01fe1532010-03-09 05:19:25 +0000530 * Five states for the nonblocking servr:
Mark Slee2f6404d2006-10-10 01:37:40 +0000531 * 1) initialize
532 * 2) read 4 byte frame size
533 * 3) read frame of data
534 * 4) send back data (if any)
David Reiss01fe1532010-03-09 05:19:25 +0000535 * 5) force immediate connection close
Mark Slee2f6404d2006-10-10 01:37:40 +0000536 */
537enum TAppState {
538 APP_INIT,
539 APP_READ_FRAME_SIZE,
540 APP_READ_REQUEST,
Mark Sleee02385b2007-06-09 01:21:16 +0000541 APP_WAIT_TASK,
David Reiss01fe1532010-03-09 05:19:25 +0000542 APP_SEND_RESULT,
543 APP_CLOSE_CONNECTION
Mark Slee2f6404d2006-10-10 01:37:40 +0000544};
545
546/**
547 * Represents a connection that is handled via libevent. This connection
548 * essentially encapsulates a socket that has some associated libevent state.
549 */
550class TConnection {
551 private:
552
David Reiss01fe1532010-03-09 05:19:25 +0000553 /// Starting size for new connection buffer
554 static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
Mark Sleee02385b2007-06-09 01:21:16 +0000555
David Reiss01fe1532010-03-09 05:19:25 +0000556 /// Server handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000557 TNonblockingServer* server_;
558
David Reiss01fe1532010-03-09 05:19:25 +0000559 /// Socket handle
Mark Slee2f6404d2006-10-10 01:37:40 +0000560 int socket_;
561
David Reiss01fe1532010-03-09 05:19:25 +0000562 /// Libevent object
Mark Slee2f6404d2006-10-10 01:37:40 +0000563 struct event event_;
564
David Reiss01fe1532010-03-09 05:19:25 +0000565 /// Libevent flags
Mark Slee2f6404d2006-10-10 01:37:40 +0000566 short eventFlags_;
567
David Reiss01fe1532010-03-09 05:19:25 +0000568 /// Socket mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000569 TSocketState socketState_;
570
David Reiss01fe1532010-03-09 05:19:25 +0000571 /// Application state
Mark Slee2f6404d2006-10-10 01:37:40 +0000572 TAppState appState_;
573
David Reiss01fe1532010-03-09 05:19:25 +0000574 /// How much data needed to read
Mark Slee2f6404d2006-10-10 01:37:40 +0000575 uint32_t readWant_;
576
David Reiss01fe1532010-03-09 05:19:25 +0000577 /// Where in the read buffer are we
Mark Slee2f6404d2006-10-10 01:37:40 +0000578 uint32_t readBufferPos_;
579
David Reiss01fe1532010-03-09 05:19:25 +0000580 /// Read buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000581 uint8_t* readBuffer_;
582
David Reiss01fe1532010-03-09 05:19:25 +0000583 /// Read buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000584 uint32_t readBufferSize_;
585
David Reiss01fe1532010-03-09 05:19:25 +0000586 /// Write buffer
Mark Slee2f6404d2006-10-10 01:37:40 +0000587 uint8_t* writeBuffer_;
Mark Slee79b16942007-11-26 19:05:29 +0000588
David Reiss01fe1532010-03-09 05:19:25 +0000589 /// Write buffer size
Mark Slee2f6404d2006-10-10 01:37:40 +0000590 uint32_t writeBufferSize_;
591
David Reiss01fe1532010-03-09 05:19:25 +0000592 /// How far through writing are we?
Mark Slee2f6404d2006-10-10 01:37:40 +0000593 uint32_t writeBufferPos_;
594
David Reiss01fe1532010-03-09 05:19:25 +0000595 /// How many times have we read since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000596 uint32_t numReadsSinceReset_;
597
David Reiss01fe1532010-03-09 05:19:25 +0000598 /// How many times have we written since our last buffer reset?
Kevin Clark5ace1782009-03-04 21:10:58 +0000599 uint32_t numWritesSinceReset_;
600
David Reiss01fe1532010-03-09 05:19:25 +0000601 /// Task handle
Mark Sleee02385b2007-06-09 01:21:16 +0000602 int taskHandle_;
603
David Reiss01fe1532010-03-09 05:19:25 +0000604 /// Task event
Mark Sleee02385b2007-06-09 01:21:16 +0000605 struct event taskEvent_;
606
David Reiss01fe1532010-03-09 05:19:25 +0000607 /// Transport to read from
Mark Slee5ea15f92007-03-05 22:55:59 +0000608 boost::shared_ptr<TMemoryBuffer> inputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000609
David Reiss01fe1532010-03-09 05:19:25 +0000610 /// Transport that processor writes to
Mark Slee5ea15f92007-03-05 22:55:59 +0000611 boost::shared_ptr<TMemoryBuffer> outputTransport_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000612
David Reiss01fe1532010-03-09 05:19:25 +0000613 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
Mark Slee5ea15f92007-03-05 22:55:59 +0000614 boost::shared_ptr<TTransport> factoryInputTransport_;
615 boost::shared_ptr<TTransport> factoryOutputTransport_;
Mark Slee4af6ed72006-10-25 19:02:49 +0000616
David Reiss01fe1532010-03-09 05:19:25 +0000617 /// Protocol decoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000618 boost::shared_ptr<TProtocol> inputProtocol_;
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000619
David Reiss01fe1532010-03-09 05:19:25 +0000620 /// Protocol encoder
Mark Slee5ea15f92007-03-05 22:55:59 +0000621 boost::shared_ptr<TProtocol> outputProtocol_;
Mark Slee79b16942007-11-26 19:05:29 +0000622
David Reiss01fe1532010-03-09 05:19:25 +0000623 /// Go into read mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000624 void setRead() {
625 setFlags(EV_READ | EV_PERSIST);
626 }
627
David Reiss01fe1532010-03-09 05:19:25 +0000628 /// Go into write mode
Mark Slee2f6404d2006-10-10 01:37:40 +0000629 void setWrite() {
630 setFlags(EV_WRITE | EV_PERSIST);
631 }
632
David Reiss01fe1532010-03-09 05:19:25 +0000633 /// Set socket idle
Mark Slee402ee282007-08-23 01:43:20 +0000634 void setIdle() {
635 setFlags(0);
636 }
637
David Reiss01fe1532010-03-09 05:19:25 +0000638 /**
639 * Set event flags for this connection.
640 *
641 * @param eventFlags flags we pass to libevent for the connection.
642 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000643 void setFlags(short eventFlags);
644
David Reiss01fe1532010-03-09 05:19:25 +0000645 /**
646 * Libevent handler called (via our static wrapper) when the connection
647 * socket had something happen. Rather than use the flags libevent passed,
648 * we use the connection state to determine whether we need to read or
649 * write the socket.
650 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000651 void workSocket();
652
David Reiss01fe1532010-03-09 05:19:25 +0000653 /// Close this connection and free or reset its resources.
Mark Slee2f6404d2006-10-10 01:37:40 +0000654 void close();
655
656 public:
657
David Reiss01fe1532010-03-09 05:19:25 +0000658 class Task;
659
660 /// Constructor
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000661 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
David Reiss01fe1532010-03-09 05:19:25 +0000662 readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
Mark Slee2f6404d2006-10-10 01:37:40 +0000663 if (readBuffer_ == NULL) {
T Jake Lucianib5e62212009-01-31 22:36:20 +0000664 throw new apache::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000665 }
David Reiss01fe1532010-03-09 05:19:25 +0000666 readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
Mark Slee79b16942007-11-26 19:05:29 +0000667
Kevin Clark5ace1782009-03-04 21:10:58 +0000668 numReadsSinceReset_ = 0;
669 numWritesSinceReset_ = 0;
670
Mark Slee2f6404d2006-10-10 01:37:40 +0000671 // Allocate input and output tranpsorts
Mark Slee79b16942007-11-26 19:05:29 +0000672 // these only need to be allocated once per TConnection (they don't need to be
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000673 // reallocated on init() call)
Mark Slee5ea15f92007-03-05 22:55:59 +0000674 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
675 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Mark Slee79b16942007-11-26 19:05:29 +0000676
Mark Slee2f6404d2006-10-10 01:37:40 +0000677 init(socket, eventFlags, s);
David Reiss1997f102008-04-29 00:29:41 +0000678 server_->incrementNumConnections();
679 }
680
681 ~TConnection() {
David Reissc17fe6b2008-04-29 00:29:43 +0000682 server_->decrementNumConnections();
Mark Slee2f6404d2006-10-10 01:37:40 +0000683 }
684
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000685 /**
686 * Check read buffer against a given limit and shrink it if exceeded.
687 *
688 * @param limit we limit buffer size to.
689 */
David Reiss01fe1532010-03-09 05:19:25 +0000690 void checkIdleBufferMemLimit(size_t limit);
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000691
David Reiss01fe1532010-03-09 05:19:25 +0000692 /// Initialize
Mark Slee2f6404d2006-10-10 01:37:40 +0000693 void init(int socket, short eventFlags, TNonblockingServer *s);
694
David Reiss01fe1532010-03-09 05:19:25 +0000695 /**
696 * This is called when the application transitions from one state into
697 * another. This means that it has finished writing the data that it needed
698 * to, or finished receiving the data that it needed to.
699 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000700 void transition();
701
David Reiss01fe1532010-03-09 05:19:25 +0000702 /**
703 * C-callable event handler for connection events. Provides a callback
704 * that libevent can understand which invokes connection_->workSocket().
705 *
706 * @param fd the descriptor the event occured on.
707 * @param which the flags associated with the event.
708 * @param v void* callback arg where we placed TConnection's "this".
709 */
Mark Sleea8de4892008-02-09 00:02:26 +0000710 static void eventHandler(int fd, short /* which */, void* v) {
Mark Sleee02385b2007-06-09 01:21:16 +0000711 assert(fd == ((TConnection*)v)->socket_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000712 ((TConnection*)v)->workSocket();
713 }
Mark Slee79b16942007-11-26 19:05:29 +0000714
David Reiss01fe1532010-03-09 05:19:25 +0000715 /**
716 * C-callable event handler for signaling task completion. Provides a
717 * callback that libevent can understand that will read a connection
718 * object's address from a pipe and call connection->transition() for
719 * that object.
720 *
721 * @param fd the descriptor the event occured on.
722 */
723 static void taskHandler(int fd, short /* which */, void* /* v */) {
724 TConnection* connection;
David Reiss83b8fda2010-03-09 05:19:34 +0000725 ssize_t nBytes;
726 while ((nBytes = read(fd, (void*)&connection, sizeof(TConnection*)))
727 == sizeof(TConnection*)) {
728 connection->transition();
Mark Sleee02385b2007-06-09 01:21:16 +0000729 }
David Reiss83b8fda2010-03-09 05:19:34 +0000730 if (nBytes > 0) {
731 throw TException("TConnection::taskHandler unexpected partial read");
732 }
733 if (errno != EWOULDBLOCK && errno != EAGAIN) {
734 GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
735 }
Mark Sleee02385b2007-06-09 01:21:16 +0000736 }
737
David Reiss01fe1532010-03-09 05:19:25 +0000738 /**
739 * Notification to server that processing has ended on this request.
740 * Can be called either when processing is completed or when a waiting
741 * task has been preemptively terminated (on overload).
742 *
743 * @return true if successful, false if unable to notify (check errno).
744 */
745 bool notifyServer() {
746 TConnection* connection = this;
747 if (write(server_->getNotificationSendFD(), (const void*)&connection,
748 sizeof(TConnection*)) != sizeof(TConnection*)) {
749 return false;
750 }
751
752 return true;
753 }
754
755 /// Force connection shutdown for this connection.
756 void forceClose() {
757 appState_ = APP_CLOSE_CONNECTION;
758 if (!notifyServer()) {
759 throw TException("TConnection::forceClose: failed write on notify pipe");
760 }
761 }
762
763 /// return the server this connection was initialized for.
764 TNonblockingServer* getServer() {
765 return server_;
766 }
767
768 /// get state of connection.
769 TAppState getState() {
770 return appState_;
771 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000772};
773
T Jake Lucianib5e62212009-01-31 22:36:20 +0000774}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000775
776#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_