blob: 585aa79bc2515047e4d54d9a78145f608b9b68e5 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/Thrift.h>
24#include <thrift/server/TServer.h>
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040025#include <thrift/transport/PlatformSocket.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000026#include <thrift/transport/TBufferTransports.h>
27#include <thrift/transport/TSocket.h>
28#include <thrift/concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000029#include <climits>
Roger Meier49ff8b12012-04-13 09:12:31 +000030#include <thrift/concurrency/Thread.h>
31#include <thrift/concurrency/PlatformThreadFactory.h>
32#include <thrift/concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000033#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000034#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000035#include <string>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Roger Meier12d70532011-12-14 23:35:28 +000051using apache::thrift::concurrency::PlatformThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000052using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
Roger Meier30aae0c2011-07-08 12:23:31 +000057#ifdef LIBEVENT_VERSION_NUMBER
58#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
59#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
60#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
61#else
62// assume latest version 1 series
63#define LIBEVENT_VERSION_MAJOR 1
64#define LIBEVENT_VERSION_MINOR 14
65#define LIBEVENT_VERSION_REL 13
66#define LIBEVENT_VERSION_NUMBER ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
67#endif
68
69#if LIBEVENT_VERSION_NUMBER < 0x02000000
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040070 typedef THRIFT_SOCKET evutil_socket_t;
Roger Meier30aae0c2011-07-08 12:23:31 +000071#endif
72
73#ifndef SOCKOPT_CAST_T
Roger Meier84e4a3c2011-09-16 20:58:44 +000074# ifndef _WIN32
75# define SOCKOPT_CAST_T void
76# else
77# define SOCKOPT_CAST_T char
78# endif // _WIN32
Roger Meier30aae0c2011-07-08 12:23:31 +000079#endif
80
81template<class T>
82inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
83 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
84}
85
86template<class T>
87inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
88 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
89}
90
Mark Slee2f6404d2006-10-10 01:37:40 +000091/**
Jake Farrellb0d95602011-12-06 01:17:26 +000092 * This is a non-blocking server in C++ for high performance that
93 * operates a set of IO threads (by default only one). It assumes that
94 * all incoming requests are framed with a 4 byte length indicator and
95 * writes out responses using the same framing.
Mark Slee2f6404d2006-10-10 01:37:40 +000096 *
97 * It does not use the TServerTransport framework, but rather has socket
98 * operations hardcoded for use with select.
99 *
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
103/// Overload condition actions.
104enum TOverloadAction {
105 T_OVERLOAD_NO_ACTION, ///< Don't handle overload */
106 T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
107 T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop some tasks from head of task queue */
108};
109
Jake Farrellb0d95602011-12-06 01:17:26 +0000110class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
Roger Meier3781c242011-12-11 20:07:21 +0000124 /// Default limit on frame size
125 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// Default limit on total number of connected sockets
128 static const int MAX_CONNECTIONS = INT_MAX;
129
130 /// Default limit on connections in handler/task processing
131 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
132
David Reiss89a12942010-10-06 17:10:52 +0000133 /// Default size of write buffer
134 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
135
David Reiss54bec5d2010-10-06 17:10:45 +0000136 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_READ_BUFFER_LIMIT = 1024;
138
139 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
140 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
141
142 /// # of calls before resizing oversized buffers (0 = check only on close)
143 static const int RESIZE_BUFFER_EVERY_N = 512;
144
Jake Farrellb0d95602011-12-06 01:17:26 +0000145 /// # of IO threads to use by default
146 static const int DEFAULT_IO_THREADS = 1;
147
148 /// File descriptor of an invalid socket
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400149 static const THRIFT_SOCKET INVALID_SOCKET_VALUE = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000150
151 /// # of IO threads this server will use
152 size_t numIOThreads_;
153
154 /// Whether to set high scheduling priority for IO threads
155 bool useHighPriorityIOThreads_;
156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Server socket file descriptor
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400158 THRIFT_SOCKET serverSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000159
David Reiss01fe1532010-03-09 05:19:25 +0000160 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 int port_;
162
Roger Meier6f2a5032013-07-08 23:35:25 +0200163 /// The optional user-provided event-base (for single-thread servers)
164 event_base* userEventBase_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000167 boost::shared_ptr<ThreadManager> threadManager_;
168
David Reiss01fe1532010-03-09 05:19:25 +0000169 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000170 bool threadPoolProcessing_;
171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Factory to create the IO threads
Roger Meier12d70532011-12-14 23:35:28 +0000173 boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000174
Jake Farrellb0d95602011-12-06 01:17:26 +0000175 // Vector of IOThread objects that will handle our IO
176 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000177
Jake Farrellb0d95602011-12-06 01:17:26 +0000178 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000179 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000180
181 // Synchronizes access to connection stack and similar data
182 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000183
184 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000185 size_t numTConnections_;
186
David Reiss9e8073c2010-03-09 05:19:39 +0000187 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000188 size_t numActiveProcessors_;
189
190 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000191 size_t connectionStackLimit_;
192
David Reiss01fe1532010-03-09 05:19:25 +0000193 /// Limit for number of connections processing or waiting to process
194 size_t maxActiveProcessors_;
195
196 /// Limit for number of open connections
197 size_t maxConnections_;
198
Roger Meier3781c242011-12-11 20:07:21 +0000199 /// Limit for frame size
200 size_t maxFrameSize_;
201
David Reiss068f4162010-03-09 05:19:45 +0000202 /// Time in milliseconds before an unperformed task expires (0 == infinite).
203 int64_t taskExpireTime_;
204
David Reiss01fe1532010-03-09 05:19:25 +0000205 /**
206 * Hysteresis for overload state. This is the fraction of the overload
207 * value that needs to be reached before the overload state is cleared;
208 * must be <= 1.0.
209 */
210 double overloadHysteresis_;
211
212 /// Action to take when we're overloaded.
213 TOverloadAction overloadAction_;
214
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000215 /**
David Reiss89a12942010-10-06 17:10:52 +0000216 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
217 * and found to be exceeded, reinitialized) to this size.
218 */
219 size_t writeBufferDefaultSize_;
220
221 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000222 * Max read buffer size for an idle TConnection. When we place an idle
223 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000224 * we will free the buffer (such that it will be reinitialized by the next
225 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000226 */
David Reiss54bec5d2010-10-06 17:10:45 +0000227 size_t idleReadBufferLimit_;
228
229 /**
230 * Max write buffer size for an idle connection. When we place an idle
231 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
232 * we insure that its write buffer is <= to this size; otherwise we
David Reiss89a12942010-10-06 17:10:52 +0000233 * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
234 * idle connections don't hog memory. 0 disables this check.
David Reiss54bec5d2010-10-06 17:10:45 +0000235 */
236 size_t idleWriteBufferLimit_;
237
238 /**
239 * Every N calls we check the buffer size limits on a connected TConnection.
240 * 0 disables (i.e. the checks are only done when a connection closes).
241 */
242 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000243
244 /// Set if we are currently in an overloaded state.
245 bool overloaded_;
246
247 /// Count of connections dropped since overload started
248 uint32_t nConnectionsDropped_;
249
250 /// Count of connections dropped on overload since server started
251 uint64_t nTotalConnectionsDropped_;
252
Mark Slee2f6404d2006-10-10 01:37:40 +0000253 /**
254 * This is a stack of all the objects that have been created but that
255 * are NOT currently in use. When we close a connection, we place it on this
256 * stack so that the object can be reused later, rather than freeing the
257 * memory and reallocating a new object later.
258 */
259 std::stack<TConnection*> connectionStack_;
260
David Reiss01fe1532010-03-09 05:19:25 +0000261 /**
Roger Meier0c04fcc2013-03-22 19:52:08 +0100262 * This container holds pointers to all active connections. This container
263 * allows the server to clean up unlcosed connection objects at destruction,
264 * which in turn allows their transports, protocols, processors and handlers
265 * to deallocate and clean up correctly.
266 */
267 std::vector<TConnection*> activeConnections_;
268
269 /**
David Reiss01fe1532010-03-09 05:19:25 +0000270 * Called when server socket had something happen. We accept all waiting
271 * client connections on listen socket fd and assign TConnection objects
272 * to handle those requests.
273 *
274 * @param fd the listen socket.
275 * @param which the event flag that triggered the handler.
276 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400277 void handleEvent(THRIFT_SOCKET fd, short which);
Mark Slee2f6404d2006-10-10 01:37:40 +0000278
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000279 void init(int port) {
280 serverSocket_ = -1;
Jake Farrellb0d95602011-12-06 01:17:26 +0000281 numIOThreads_ = DEFAULT_IO_THREADS;
282 nextIOThread_ = 0;
283 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000284 port_ = port;
Roger Meier6f2a5032013-07-08 23:35:25 +0200285 userEventBase_ = NULL;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000286 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000287 numTConnections_ = 0;
288 numActiveProcessors_ = 0;
289 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
290 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
291 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000292 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000293 taskExpireTime_ = 0;
294 overloadHysteresis_ = 0.8;
295 overloadAction_ = T_OVERLOAD_NO_ACTION;
296 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
297 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
298 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
299 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
300 overloaded_ = false;
301 nConnectionsDropped_ = 0;
302 nTotalConnectionsDropped_ = 0;
303 }
Mark Sleef9373392007-01-24 19:41:57 +0000304
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000305 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000306 template<typename ProcessorFactory>
307 TNonblockingServer(
308 const boost::shared_ptr<ProcessorFactory>& processorFactory,
309 int port,
310 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
311 TServer(processorFactory) {
312 init(port);
313 }
314
315 template<typename Processor>
316 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
317 int port,
318 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000319 TServer(processor) {
320 init(port);
321 }
322
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000323 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000324 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000325 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000326 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
327 int port,
328 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000329 boost::shared_ptr<ThreadManager>(),
330 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
331 TServer(processorFactory) {
332
333 init(port);
334
335 setInputProtocolFactory(protocolFactory);
336 setOutputProtocolFactory(protocolFactory);
337 setThreadManager(threadManager);
338 }
339
340 template<typename Processor>
341 TNonblockingServer(
342 const boost::shared_ptr<Processor>& processor,
343 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
344 int port,
345 const boost::shared_ptr<ThreadManager>& threadManager =
346 boost::shared_ptr<ThreadManager>(),
347 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000348 TServer(processor) {
349
350 init(port);
351
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000352 setInputProtocolFactory(protocolFactory);
353 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000354 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000355 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000356
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000357 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000358 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000359 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000360 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
361 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
362 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
363 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
364 int port,
365 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000366 boost::shared_ptr<ThreadManager>(),
367 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
368 TServer(processorFactory) {
369
370 init(port);
371
372 setInputTransportFactory(inputTransportFactory);
373 setOutputTransportFactory(outputTransportFactory);
374 setInputProtocolFactory(inputProtocolFactory);
375 setOutputProtocolFactory(outputProtocolFactory);
376 setThreadManager(threadManager);
377 }
378
379 template<typename Processor>
380 TNonblockingServer(
381 const boost::shared_ptr<Processor>& processor,
382 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
383 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
384 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
385 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
386 int port,
387 const boost::shared_ptr<ThreadManager>& threadManager =
388 boost::shared_ptr<ThreadManager>(),
389 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000390 TServer(processor) {
391
392 init(port);
393
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000394 setInputTransportFactory(inputTransportFactory);
395 setOutputTransportFactory(outputTransportFactory);
396 setInputProtocolFactory(inputProtocolFactory);
397 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000398 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000399 }
Mark Slee79b16942007-11-26 19:05:29 +0000400
David Reiss8ede8182010-09-02 15:26:28 +0000401 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000402
David Reiss068f4162010-03-09 05:19:45 +0000403 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000404
David Reiss1997f102008-04-29 00:29:41 +0000405 boost::shared_ptr<ThreadManager> getThreadManager() {
406 return threadManager_;
407 }
408
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000409 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000410 * Sets the number of IO threads used by this server. Can only be used before
411 * the call to serve() and has no effect afterwards. We always use a
412 * PosixThreadFactory for the IO worker threads, because they must joinable
413 * for clean shutdown.
414 */
415 void setNumIOThreads(size_t numThreads) {
416 numIOThreads_ = numThreads;
417 }
418
419 /** Return whether the IO threads will get high scheduling priority */
420 bool useHighPriorityIOThreads() const {
421 return useHighPriorityIOThreads_;
422 }
423
424 /** Set whether the IO threads will get high scheduling priority. */
425 void setUseHighPriorityIOThreads(bool val) {
426 useHighPriorityIOThreads_ = val;
427 }
428
429 /** Return the number of IO threads used by this server. */
430 size_t getNumIOThreads() const {
431 return numIOThreads_;
432 }
433
434 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000435 * Get the maximum number of unused TConnection we will hold in reserve.
436 *
437 * @return the current limit on TConnection pool size.
438 */
David Reiss260fa932009-04-02 23:51:39 +0000439 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000440 return connectionStackLimit_;
441 }
442
443 /**
444 * Set the maximum number of unused TConnection we will hold in reserve.
445 *
446 * @param sz the new limit for TConnection pool size.
447 */
David Reiss260fa932009-04-02 23:51:39 +0000448 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000449 connectionStackLimit_ = sz;
450 }
451
Mark Slee79b16942007-11-26 19:05:29 +0000452 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000453 return threadPoolProcessing_;
454 }
455
456 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000457 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000458 }
459
David Reiss01fe1532010-03-09 05:19:25 +0000460 /**
461 * Return the count of sockets currently connected to.
462 *
463 * @return count of connected sockets.
464 */
465 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000466 return numTConnections_;
467 }
468
David Reiss01fe1532010-03-09 05:19:25 +0000469 /**
Roger Meierec8027f2012-04-11 21:43:25 +0000470 * Return the count of sockets currently connected to.
471 *
472 * @return count of connected sockets.
473 */
474 size_t getNumActiveConnections() const {
475 return getNumConnections() - getNumIdleConnections();
476 }
477
478 /**
David Reiss01fe1532010-03-09 05:19:25 +0000479 * Return the count of connection objects allocated but not in use.
480 *
481 * @return count of idle connection objects.
482 */
483 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000484 return connectionStack_.size();
485 }
486
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000487 /**
David Reiss01fe1532010-03-09 05:19:25 +0000488 * Return count of number of connections which are currently processing.
489 * This is defined as a connection where all data has been received and
490 * either assigned a task (when threading) or passed to a handler (when
491 * not threading), and where the handler has not yet returned.
492 *
493 * @return # of connections currently processing.
494 */
495 size_t getNumActiveProcessors() const {
496 return numActiveProcessors_;
497 }
498
499 /// Increment the count of connections currently processing.
500 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000501 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000502 ++numActiveProcessors_;
503 }
504
505 /// Decrement the count of connections currently processing.
506 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000507 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000508 if (numActiveProcessors_ > 0) {
509 --numActiveProcessors_;
510 }
511 }
512
513 /**
514 * Get the maximum # of connections allowed before overload.
515 *
516 * @return current setting.
517 */
518 size_t getMaxConnections() const {
519 return maxConnections_;
520 }
521
522 /**
523 * Set the maximum # of connections allowed before overload.
524 *
525 * @param maxConnections new setting for maximum # of connections.
526 */
527 void setMaxConnections(size_t maxConnections) {
528 maxConnections_ = maxConnections;
529 }
530
531 /**
532 * Get the maximum # of connections waiting in handler/task before overload.
533 *
534 * @return current setting.
535 */
536 size_t getMaxActiveProcessors() const {
537 return maxActiveProcessors_;
538 }
539
540 /**
541 * Set the maximum # of connections waiting in handler/task before overload.
542 *
543 * @param maxActiveProcessors new setting for maximum # of active processes.
544 */
545 void setMaxActiveProcessors(size_t maxActiveProcessors) {
546 maxActiveProcessors_ = maxActiveProcessors;
547 }
548
549 /**
Roger Meier3781c242011-12-11 20:07:21 +0000550 * Get the maximum allowed frame size.
551 *
552 * If a client tries to send a message larger than this limit,
553 * its connection will be closed.
554 *
555 * @return Maxium frame size, in bytes.
556 */
557 size_t getMaxFrameSize() const {
558 return maxFrameSize_;
559 }
560
561 /**
562 * Set the maximum allowed frame size.
563 *
564 * @param maxFrameSize The new maximum frame size.
565 */
566 void setMaxFrameSize(size_t maxFrameSize) {
567 maxFrameSize_ = maxFrameSize;
568 }
569
570 /**
David Reiss01fe1532010-03-09 05:19:25 +0000571 * Get fraction of maximum limits before an overload condition is cleared.
572 *
573 * @return hysteresis fraction
574 */
575 double getOverloadHysteresis() const {
576 return overloadHysteresis_;
577 }
578
579 /**
580 * Set fraction of maximum limits before an overload condition is cleared.
581 * A good value would probably be between 0.5 and 0.9.
582 *
583 * @param hysteresisFraction fraction <= 1.0.
584 */
585 void setOverloadHysteresis(double hysteresisFraction) {
586 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
587 overloadHysteresis_ = hysteresisFraction;
588 }
589 }
590
591 /**
592 * Get the action the server will take on overload.
593 *
594 * @return a TOverloadAction enum value for the currently set action.
595 */
596 TOverloadAction getOverloadAction() const {
597 return overloadAction_;
598 }
599
600 /**
601 * Set the action the server is to take on overload.
602 *
603 * @param overloadAction a TOverloadAction enum value for the action.
604 */
605 void setOverloadAction(TOverloadAction overloadAction) {
606 overloadAction_ = overloadAction;
607 }
608
609 /**
David Reiss068f4162010-03-09 05:19:45 +0000610 * Get the time in milliseconds after which a task expires (0 == infinite).
611 *
612 * @return a 64-bit time in milliseconds.
613 */
614 int64_t getTaskExpireTime() const {
615 return taskExpireTime_;
616 }
617
618 /**
619 * Set the time in milliseconds after which a task expires (0 == infinite).
620 *
621 * @param taskExpireTime a 64-bit time in milliseconds.
622 */
623 void setTaskExpireTime(int64_t taskExpireTime) {
624 taskExpireTime_ = taskExpireTime;
625 }
626
627 /**
David Reiss01fe1532010-03-09 05:19:25 +0000628 * Determine if the server is currently overloaded.
629 * This function checks the maximums for open connections and connections
630 * currently in processing, and sets an overload condition if they are
631 * exceeded. The overload will persist until both values are below the
632 * current hysteresis fraction of their maximums.
633 *
634 * @return true if an overload condition exists, false if not.
635 */
636 bool serverOverloaded();
637
638 /** Pop and discard next task on threadpool wait queue.
639 *
640 * @return true if a task was discarded, false if the wait queue was empty.
641 */
642 bool drainPendingTask();
643
644 /**
David Reiss89a12942010-10-06 17:10:52 +0000645 * Get the starting size of a TConnection object's write buffer.
646 *
647 * @return # bytes we initialize a TConnection object's write buffer to.
648 */
649 size_t getWriteBufferDefaultSize() const {
650 return writeBufferDefaultSize_;
651 }
652
653 /**
654 * Set the starting size of a TConnection object's write buffer.
655 *
656 * @param size # bytes we initialize a TConnection object's write buffer to.
657 */
658 void setWriteBufferDefaultSize(size_t size) {
659 writeBufferDefaultSize_ = size;
660 }
661
662 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000663 * Get the maximum size of read buffer allocated to idle TConnection objects.
664 *
David Reiss89a12942010-10-06 17:10:52 +0000665 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000666 */
667 size_t getIdleReadBufferLimit() const {
668 return idleReadBufferLimit_;
669 }
670
671 /**
672 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
673 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000674 *
David Reiss89a12942010-10-06 17:10:52 +0000675 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000676 */
677 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000678 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000679 }
680
681 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000682 * Set the maximum size read buffer allocated to idle TConnection objects.
683 * If a TConnection object is found (either on connection close or between
684 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000685 * allocated to its read buffer, we free it and allow it to be reinitialized
686 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000687 *
688 * @param limit of bytes beyond which we will shrink buffers when checked.
689 */
690 void setIdleReadBufferLimit(size_t limit) {
691 idleReadBufferLimit_ = limit;
692 }
693
694 /**
695 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
696 * Set the maximum size read buffer allocated to idle TConnection objects.
697 * If a TConnection object is found (either on connection close or between
698 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000699 * allocated to its read buffer, we free it and allow it to be reinitialized
700 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000701 *
702 * @param limit of bytes beyond which we will shrink buffers when checked.
703 */
704 void setIdleBufferMemLimit(size_t limit) {
705 idleReadBufferLimit_ = limit;
706 }
707
Jake Farrellb0d95602011-12-06 01:17:26 +0000708
David Reiss54bec5d2010-10-06 17:10:45 +0000709
710 /**
711 * Get the maximum size of write buffer allocated to idle TConnection objects.
712 *
713 * @return # bytes beyond which we will reallocate buffers when checked.
714 */
715 size_t getIdleWriteBufferLimit() const {
716 return idleWriteBufferLimit_;
717 }
718
719 /**
720 * Set the maximum size write buffer allocated to idle TConnection objects.
721 * If a TConnection object is found (either on connection close or between
722 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000723 * allocated to its write buffer, we destroy and construct that buffer with
724 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000725 *
726 * @param limit of bytes beyond which we will shrink buffers when idle.
727 */
David Reiss54bec5d2010-10-06 17:10:45 +0000728 void setIdleWriteBufferLimit(size_t limit) {
729 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000730 }
731
David Reiss01fe1532010-03-09 05:19:25 +0000732 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000733 * Get # of calls made between buffer size checks. 0 means disabled.
734 *
735 * @return # of calls between buffer size checks.
736 */
737 int32_t getResizeBufferEveryN() const {
738 return resizeBufferEveryN_;
739 }
740
741 /**
742 * Check buffer sizes every "count" calls. This allows buffer limits
743 * to be enforced for persistant connections with a controllable degree
744 * of overhead. 0 disables checks except at connection close.
745 *
746 * @param count the number of calls between checks, or 0 to disable
747 */
748 void setResizeBufferEveryN(int32_t count) {
749 resizeBufferEveryN_ = count;
750 }
751
Jake Farrellb0d95602011-12-06 01:17:26 +0000752 /**
753 * Main workhorse function, starts up the server listening on a port and
754 * loops over the libevent handler.
755 */
756 void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000757
Jake Farrellb0d95602011-12-06 01:17:26 +0000758 /**
759 * Causes the server to terminate gracefully (can be called from any thread).
760 */
761 void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000762
Jake Farrellb0d95602011-12-06 01:17:26 +0000763 /// Creates a socket to listen on and binds it to the local port.
764 void createAndListenOnSocket();
765
766 /**
767 * Takes a socket created by createAndListenOnSocket() and sets various
768 * options on it to prepare for use in the server.
769 *
770 * @param fd descriptor of socket to be initialized/
771 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400772 void listenSocket(THRIFT_SOCKET fd);
Roger Meier6f2a5032013-07-08 23:35:25 +0200773
774 /**
775 * Register the optional user-provided event-base (for single-thread servers)
776 *
777 * This method should be used when the server is running in a single-thread
778 * mode, and the event base is provided by the user (i.e., the caller).
779 *
780 * @param user_event_base the user-provided event-base. The user is
781 * responsible for freeing the event base memory.
782 */
783 void registerEvents(event_base* user_event_base);
784
785 /**
786 * Returns the optional user-provided event-base (for single-thread servers).
787 */
788 event_base* getUserEventBase() const { return userEventBase_; }
789
790 private:
791 /**
792 * Callback function that the threadmanager calls when a task reaches
793 * its expiration time. It is needed to clean up the expired connection.
794 *
795 * @param task the runnable associated with the expired task.
796 */
797 void expireClose(boost::shared_ptr<Runnable> task);
798
David Reiss54bec5d2010-10-06 17:10:45 +0000799 /**
David Reiss01fe1532010-03-09 05:19:25 +0000800 * Return an initialized connection object. Creates or recovers from
801 * pool a TConnection and initializes it with the provided socket FD
802 * and flags.
803 *
804 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000805 * @param addr the sockaddr of the client
806 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000807 * @return pointer to initialized TConnection object.
808 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400809 TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
Jake Farrellb0d95602011-12-06 01:17:26 +0000810 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000811
David Reiss01fe1532010-03-09 05:19:25 +0000812 /**
813 * Returns a connection to pool or deletion. If the connection pool
814 * (a stack) isn't full, place the connection object on it, otherwise
815 * just delete it.
816 *
817 * @param connection the TConection being returned.
818 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000819 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000820};
Mark Slee2f6404d2006-10-10 01:37:40 +0000821
Jake Farrellb0d95602011-12-06 01:17:26 +0000822class TNonblockingIOThread : public Runnable {
823 public:
824 // Creates an IO thread and sets up the event base. The listenSocket should
825 // be a valid FD on which listen() has already been called. If the
826 // listenSocket is < 0, accepting will not be done.
827 TNonblockingIOThread(TNonblockingServer* server,
828 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400829 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +0000830 bool useHighPriority);
831
832 ~TNonblockingIOThread();
833
834 // Returns the event-base for this thread.
835 event_base* getEventBase() const { return eventBase_; }
836
837 // Returns the server for this thread.
838 TNonblockingServer* getServer() const { return server_; }
839
840 // Returns the number of this IO thread.
841 int getThreadNumber() const { return number_; }
842
843 // Returns the thread id associated with this object. This should
844 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000845 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000846
847 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000848 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000849
850 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000851 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000852
853 // Returns the actual thread object associated with this IO thread.
854 boost::shared_ptr<Thread> getThread() const { return thread_; }
855
856 // Sets the actual thread object associated with this IO thread.
857 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
858
859 // Used by TConnection objects to indicate processing has finished.
860 bool notify(TNonblockingServer::TConnection* conn);
861
862 // Enters the event loop and does not return until a call to stop().
863 virtual void run();
864
865 // Exits the event loop as soon as possible.
866 void stop();
867
868 // Ensures that the event-loop thread is fully finished and shut down.
869 void join();
870
Roger Meier6f2a5032013-07-08 23:35:25 +0200871 /// Registers the events for the notification & listen sockets
872 void registerEvents();
873
Jake Farrellb0d95602011-12-06 01:17:26 +0000874 private:
David Reiss01fe1532010-03-09 05:19:25 +0000875 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000876 * C-callable event handler for signaling task completion. Provides a
877 * callback that libevent can understand that will read a connection
878 * object's address from a pipe and call connection->transition() for
879 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000880 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000881 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000882 */
Roger Meier12d70532011-12-14 23:35:28 +0000883 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000884
885 /**
David Reiss01fe1532010-03-09 05:19:25 +0000886 * C-callable event handler for listener events. Provides a callback
887 * that libevent can understand which invokes server->handleEvent().
888 *
889 * @param fd the descriptor the event occured on.
890 * @param which the flags associated with the event.
891 * @param v void* callback arg where we placed TNonblockingServer's "this".
892 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000893 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000894 ((TNonblockingServer*)v)->handleEvent(fd, which);
895 }
896
Jake Farrellb0d95602011-12-06 01:17:26 +0000897 /// Exits the loop ASAP in case of shutdown or error.
898 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000899
David Reiss01fe1532010-03-09 05:19:25 +0000900 /// Create the pipe used to notify I/O process of task completion.
901 void createNotificationPipe();
902
Jake Farrellb0d95602011-12-06 01:17:26 +0000903 /// Unregisters our events for notification and listen sockets.
904 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000905
Jake Farrellb0d95602011-12-06 01:17:26 +0000906 /// Sets (or clears) high priority scheduling status for the current thread.
907 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000908
Jake Farrellb0d95602011-12-06 01:17:26 +0000909 private:
910 /// associated server
911 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000912
Jake Farrellb0d95602011-12-06 01:17:26 +0000913 /// thread number (for debugging).
914 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000915
Jake Farrellb0d95602011-12-06 01:17:26 +0000916 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000917 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000918
919 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400920 THRIFT_SOCKET listenSocket_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000921
922 /// Sets a high scheduling priority when running
923 bool useHighPriority_;
924
925 /// pointer to eventbase to be used for looping
926 event_base* eventBase_;
927
Roger Meier6f2a5032013-07-08 23:35:25 +0200928 /// Set to true if this class is responsible for freeing the event base
929 /// memory.
930 bool ownEventBase_;
931
Jake Farrellb0d95602011-12-06 01:17:26 +0000932 /// Used with eventBase_ for connection events (only in listener thread)
933 struct event serverEvent_;
934
935 /// Used with eventBase_ for task completion notification
936 struct event notificationEvent_;
937
938 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000939 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000940
941 /// Actual IO Thread
942 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000943};
944
T Jake Lucianib5e62212009-01-31 22:36:20 +0000945}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000946
Jake Farrellb0d95602011-12-06 01:17:26 +0000947#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_