blob: 532d4ae962d27b761f63cd33c4f2e75e8f51c2df [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Mark Slee2f6404d2006-10-10 01:37:40 +000020#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
Roger Meier49ff8b12012-04-13 09:12:31 +000023#include <thrift/Thrift.h>
24#include <thrift/server/TServer.h>
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040025#include <thrift/transport/PlatformSocket.h>
Roger Meier49ff8b12012-04-13 09:12:31 +000026#include <thrift/transport/TBufferTransports.h>
27#include <thrift/transport/TSocket.h>
28#include <thrift/concurrency/ThreadManager.h>
David Reiss01fe1532010-03-09 05:19:25 +000029#include <climits>
Roger Meier49ff8b12012-04-13 09:12:31 +000030#include <thrift/concurrency/Thread.h>
31#include <thrift/concurrency/PlatformThreadFactory.h>
32#include <thrift/concurrency/Mutex.h>
Mark Slee2f6404d2006-10-10 01:37:40 +000033#include <stack>
Jake Farrellb0d95602011-12-06 01:17:26 +000034#include <vector>
David Reiss9b209552008-04-08 06:26:05 +000035#include <string>
David Reissd7a16f42008-02-19 22:47:29 +000036#include <cstdlib>
Bryan Duxbury266b1732011-09-01 16:50:28 +000037#ifdef HAVE_UNISTD_H
David Reiss5105b2e2009-05-21 02:28:27 +000038#include <unistd.h>
Bryan Duxbury266b1732011-09-01 16:50:28 +000039#endif
Mark Slee2f6404d2006-10-10 01:37:40 +000040#include <event.h>
41
Jake Farrellb0d95602011-12-06 01:17:26 +000042
43
T Jake Lucianib5e62212009-01-31 22:36:20 +000044namespace apache { namespace thrift { namespace server {
Mark Slee2f6404d2006-10-10 01:37:40 +000045
T Jake Lucianib5e62212009-01-31 22:36:20 +000046using apache::thrift::transport::TMemoryBuffer;
David Reiss105961d2010-10-06 17:10:17 +000047using apache::thrift::transport::TSocket;
T Jake Lucianib5e62212009-01-31 22:36:20 +000048using apache::thrift::protocol::TProtocol;
49using apache::thrift::concurrency::Runnable;
50using apache::thrift::concurrency::ThreadManager;
Roger Meier12d70532011-12-14 23:35:28 +000051using apache::thrift::concurrency::PlatformThreadFactory;
Jake Farrellb0d95602011-12-06 01:17:26 +000052using apache::thrift::concurrency::ThreadFactory;
53using apache::thrift::concurrency::Thread;
54using apache::thrift::concurrency::Mutex;
55using apache::thrift::concurrency::Guard;
Mark Slee2f6404d2006-10-10 01:37:40 +000056
// Expose the libevent version as MAJOR/MINOR/REL components alongside the
// packed LIBEVENT_VERSION_NUMBER (major << 24 | minor << 16 | rel << 8).
#ifndef LIBEVENT_VERSION_NUMBER
// No packed version number is defined: assume the latest 1.x series release.
#define LIBEVENT_VERSION_MAJOR 1
#define LIBEVENT_VERSION_MINOR 14
#define LIBEVENT_VERSION_REL 13
#define LIBEVENT_VERSION_NUMBER \
  ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
#else
// A packed version number is already defined: unpack its components.
#define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
#define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
#define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
#endif
68
69#if LIBEVENT_VERSION_NUMBER < 0x02000000
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -040070 typedef THRIFT_SOCKET evutil_socket_t;
Roger Meier30aae0c2011-07-08 12:23:31 +000071#endif
72
// Pointee type produced by cast_sockopt() / const_cast_sockopt():
// char on Windows, void everywhere else. An existing definition is kept.
#if !defined(SOCKOPT_CAST_T)
# if defined(_WIN32)
#  define SOCKOPT_CAST_T char
# else
#  define SOCKOPT_CAST_T void
# endif // _WIN32
#endif
80
81template<class T>
82inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
83 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
84}
85
86template<class T>
87inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
88 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
89}
90
/**
 * This is a non-blocking server in C++ for high performance that
 * operates a set of IO threads (by default only one). It assumes that
 * all incoming requests are framed with a 4 byte length indicator and
 * writes out responses using the same framing.
 *
 * It does not use the TServerTransport framework, but rather has socket
 * operations hardcoded for use with select.
 *
 */
David Reiss01fe1532010-03-09 05:19:25 +0000101
102
/// Actions the server can take when it detects an overload condition.
enum TOverloadAction {
  /// Don't handle overload.
  T_OVERLOAD_NO_ACTION = 0,
  /// Drop new connections immediately.
  T_OVERLOAD_CLOSE_ON_ACCEPT = 1,
  /// Drop some tasks from head of task queue.
  T_OVERLOAD_DRAIN_TASK_QUEUE = 2
};
109
Jake Farrellb0d95602011-12-06 01:17:26 +0000110class TNonblockingIOThread;
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112class TNonblockingServer : public TServer {
113 private:
Bryan Duxbury526fa8e2011-08-29 20:28:23 +0000114 class TConnection;
115
Jake Farrellb0d95602011-12-06 01:17:26 +0000116 friend class TNonblockingIOThread;
117 private:
David Reiss01fe1532010-03-09 05:19:25 +0000118 /// Listen backlog
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 static const int LISTEN_BACKLOG = 1024;
120
David Reiss01fe1532010-03-09 05:19:25 +0000121 /// Default limit on size of idle connection pool
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000122 static const size_t CONNECTION_STACK_LIMIT = 1024;
123
Roger Meier3781c242011-12-11 20:07:21 +0000124 /// Default limit on frame size
125 static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;
126
David Reiss01fe1532010-03-09 05:19:25 +0000127 /// Default limit on total number of connected sockets
128 static const int MAX_CONNECTIONS = INT_MAX;
129
130 /// Default limit on connections in handler/task processing
131 static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
132
David Reiss89a12942010-10-06 17:10:52 +0000133 /// Default size of write buffer
134 static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
135
David Reiss54bec5d2010-10-06 17:10:45 +0000136 /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
137 static const int IDLE_READ_BUFFER_LIMIT = 1024;
138
139 /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
140 static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
141
142 /// # of calls before resizing oversized buffers (0 = check only on close)
143 static const int RESIZE_BUFFER_EVERY_N = 512;
144
Jake Farrellb0d95602011-12-06 01:17:26 +0000145 /// # of IO threads to use by default
146 static const int DEFAULT_IO_THREADS = 1;
147
Jake Farrellb0d95602011-12-06 01:17:26 +0000148 /// # of IO threads this server will use
149 size_t numIOThreads_;
150
151 /// Whether to set high scheduling priority for IO threads
152 bool useHighPriorityIOThreads_;
153
David Reiss01fe1532010-03-09 05:19:25 +0000154 /// Server socket file descriptor
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400155 THRIFT_SOCKET serverSocket_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156
David Reiss01fe1532010-03-09 05:19:25 +0000157 /// Port server runs on
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 int port_;
159
Roger Meier6f2a5032013-07-08 23:35:25 +0200160 /// The optional user-provided event-base (for single-thread servers)
161 event_base* userEventBase_;
162
David Reiss01fe1532010-03-09 05:19:25 +0000163 /// For processing via thread pool, may be NULL
Mark Sleee02385b2007-06-09 01:21:16 +0000164 boost::shared_ptr<ThreadManager> threadManager_;
165
David Reiss01fe1532010-03-09 05:19:25 +0000166 /// Is thread pool processing?
Mark Sleee02385b2007-06-09 01:21:16 +0000167 bool threadPoolProcessing_;
168
Jake Farrellb0d95602011-12-06 01:17:26 +0000169 // Factory to create the IO threads
Roger Meier12d70532011-12-14 23:35:28 +0000170 boost::shared_ptr<PlatformThreadFactory> ioThreadFactory_;
Mark Slee79b16942007-11-26 19:05:29 +0000171
Jake Farrellb0d95602011-12-06 01:17:26 +0000172 // Vector of IOThread objects that will handle our IO
173 std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
Mark Slee79b16942007-11-26 19:05:29 +0000174
Jake Farrellb0d95602011-12-06 01:17:26 +0000175 // Index of next IO Thread to be used (for round-robin)
Roger Meierd0cdecf2011-12-08 19:34:01 +0000176 uint32_t nextIOThread_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000177
178 // Synchronizes access to connection stack and similar data
179 Mutex connMutex_;
David Reiss01fe1532010-03-09 05:19:25 +0000180
181 /// Number of TConnection object we've created
David Reiss1997f102008-04-29 00:29:41 +0000182 size_t numTConnections_;
183
David Reiss9e8073c2010-03-09 05:19:39 +0000184 /// Number of Connections processing or waiting to process
David Reiss01fe1532010-03-09 05:19:25 +0000185 size_t numActiveProcessors_;
186
187 /// Limit for how many TConnection objects to cache
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000188 size_t connectionStackLimit_;
189
David Reiss01fe1532010-03-09 05:19:25 +0000190 /// Limit for number of connections processing or waiting to process
191 size_t maxActiveProcessors_;
192
193 /// Limit for number of open connections
194 size_t maxConnections_;
195
Roger Meier3781c242011-12-11 20:07:21 +0000196 /// Limit for frame size
197 size_t maxFrameSize_;
198
David Reiss068f4162010-03-09 05:19:45 +0000199 /// Time in milliseconds before an unperformed task expires (0 == infinite).
200 int64_t taskExpireTime_;
201
David Reiss01fe1532010-03-09 05:19:25 +0000202 /**
203 * Hysteresis for overload state. This is the fraction of the overload
204 * value that needs to be reached before the overload state is cleared;
205 * must be <= 1.0.
206 */
207 double overloadHysteresis_;
208
209 /// Action to take when we're overloaded.
210 TOverloadAction overloadAction_;
211
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000212 /**
David Reiss89a12942010-10-06 17:10:52 +0000213 * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
214 * and found to be exceeded, reinitialized) to this size.
215 */
216 size_t writeBufferDefaultSize_;
217
218 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000219 * Max read buffer size for an idle TConnection. When we place an idle
220 * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
David Reiss89a12942010-10-06 17:10:52 +0000221 * we will free the buffer (such that it will be reinitialized by the next
222 * received frame) if it has exceeded this limit. 0 disables this check.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000223 */
David Reiss54bec5d2010-10-06 17:10:45 +0000224 size_t idleReadBufferLimit_;
225
  /**
   * Max write buffer size for an idle connection.  When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is <= to this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
   * idle connections don't hog memory. 0 disables this check.
   */
233 size_t idleWriteBufferLimit_;
234
235 /**
236 * Every N calls we check the buffer size limits on a connected TConnection.
237 * 0 disables (i.e. the checks are only done when a connection closes).
238 */
239 int32_t resizeBufferEveryN_;
David Reiss01fe1532010-03-09 05:19:25 +0000240
241 /// Set if we are currently in an overloaded state.
242 bool overloaded_;
243
244 /// Count of connections dropped since overload started
245 uint32_t nConnectionsDropped_;
246
247 /// Count of connections dropped on overload since server started
248 uint64_t nTotalConnectionsDropped_;
249
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 /**
251 * This is a stack of all the objects that have been created but that
252 * are NOT currently in use. When we close a connection, we place it on this
253 * stack so that the object can be reused later, rather than freeing the
254 * memory and reallocating a new object later.
255 */
256 std::stack<TConnection*> connectionStack_;
257
  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unclosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
264 std::vector<TConnection*> activeConnections_;
265
266 /**
David Reiss01fe1532010-03-09 05:19:25 +0000267 * Called when server socket had something happen. We accept all waiting
268 * client connections on listen socket fd and assign TConnection objects
269 * to handle those requests.
270 *
271 * @param fd the listen socket.
272 * @param which the event flag that triggered the handler.
273 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400274 void handleEvent(THRIFT_SOCKET fd, short which);
Mark Slee2f6404d2006-10-10 01:37:40 +0000275
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000276 void init(int port) {
Roger Meier0be9ffa2013-07-19 21:10:01 +0200277 serverSocket_ = THRIFT_INVALID_SOCKET;
Jake Farrellb0d95602011-12-06 01:17:26 +0000278 numIOThreads_ = DEFAULT_IO_THREADS;
279 nextIOThread_ = 0;
280 useHighPriorityIOThreads_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000281 port_ = port;
Roger Meier6f2a5032013-07-08 23:35:25 +0200282 userEventBase_ = NULL;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000283 threadPoolProcessing_ = false;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000284 numTConnections_ = 0;
285 numActiveProcessors_ = 0;
286 connectionStackLimit_ = CONNECTION_STACK_LIMIT;
287 maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
288 maxConnections_ = MAX_CONNECTIONS;
Roger Meier3781c242011-12-11 20:07:21 +0000289 maxFrameSize_ = MAX_FRAME_SIZE;
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000290 taskExpireTime_ = 0;
291 overloadHysteresis_ = 0.8;
292 overloadAction_ = T_OVERLOAD_NO_ACTION;
293 writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
294 idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
295 idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
296 resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
297 overloaded_ = false;
298 nConnectionsDropped_ = 0;
299 nTotalConnectionsDropped_ = 0;
300 }
Mark Sleef9373392007-01-24 19:41:57 +0000301
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000302 public:
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000303 template<typename ProcessorFactory>
304 TNonblockingServer(
305 const boost::shared_ptr<ProcessorFactory>& processorFactory,
306 int port,
307 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
308 TServer(processorFactory) {
309 init(port);
310 }
311
312 template<typename Processor>
313 TNonblockingServer(const boost::shared_ptr<Processor>& processor,
314 int port,
315 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000316 TServer(processor) {
317 init(port);
318 }
319
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000320 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000321 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000322 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000323 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
324 int port,
325 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000326 boost::shared_ptr<ThreadManager>(),
327 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
328 TServer(processorFactory) {
329
330 init(port);
331
332 setInputProtocolFactory(protocolFactory);
333 setOutputProtocolFactory(protocolFactory);
334 setThreadManager(threadManager);
335 }
336
337 template<typename Processor>
338 TNonblockingServer(
339 const boost::shared_ptr<Processor>& processor,
340 const boost::shared_ptr<TProtocolFactory>& protocolFactory,
341 int port,
342 const boost::shared_ptr<ThreadManager>& threadManager =
343 boost::shared_ptr<ThreadManager>(),
344 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000345 TServer(processor) {
346
347 init(port);
348
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000349 setInputProtocolFactory(protocolFactory);
350 setOutputProtocolFactory(protocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000351 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000352 }
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000353
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000354 template<typename ProcessorFactory>
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000355 TNonblockingServer(
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000356 const boost::shared_ptr<ProcessorFactory>& processorFactory,
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000357 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
358 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
359 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
360 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
361 int port,
362 const boost::shared_ptr<ThreadManager>& threadManager =
Bryan Duxbury7a9fb812011-09-01 18:31:53 +0000363 boost::shared_ptr<ThreadManager>(),
364 THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) :
365 TServer(processorFactory) {
366
367 init(port);
368
369 setInputTransportFactory(inputTransportFactory);
370 setOutputTransportFactory(outputTransportFactory);
371 setInputProtocolFactory(inputProtocolFactory);
372 setOutputProtocolFactory(outputProtocolFactory);
373 setThreadManager(threadManager);
374 }
375
376 template<typename Processor>
377 TNonblockingServer(
378 const boost::shared_ptr<Processor>& processor,
379 const boost::shared_ptr<TTransportFactory>& inputTransportFactory,
380 const boost::shared_ptr<TTransportFactory>& outputTransportFactory,
381 const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory,
382 const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory,
383 int port,
384 const boost::shared_ptr<ThreadManager>& threadManager =
385 boost::shared_ptr<ThreadManager>(),
386 THRIFT_OVERLOAD_IF(Processor, TProcessor)) :
Bryan Duxburyc7fed1f2011-08-29 18:01:24 +0000387 TServer(processor) {
388
389 init(port);
390
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000391 setInputTransportFactory(inputTransportFactory);
392 setOutputTransportFactory(outputTransportFactory);
393 setInputProtocolFactory(inputProtocolFactory);
394 setOutputProtocolFactory(outputProtocolFactory);
Mark Sleee02385b2007-06-09 01:21:16 +0000395 setThreadManager(threadManager);
Aditya Agarwal9abb0d62007-01-24 22:53:54 +0000396 }
Mark Slee79b16942007-11-26 19:05:29 +0000397
David Reiss8ede8182010-09-02 15:26:28 +0000398 ~TNonblockingServer();
Mark Slee2f6404d2006-10-10 01:37:40 +0000399
David Reiss068f4162010-03-09 05:19:45 +0000400 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
Mark Sleee02385b2007-06-09 01:21:16 +0000401
David Reiss1997f102008-04-29 00:29:41 +0000402 boost::shared_ptr<ThreadManager> getThreadManager() {
403 return threadManager_;
404 }
405
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000406 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000407 * Sets the number of IO threads used by this server. Can only be used before
408 * the call to serve() and has no effect afterwards. We always use a
409 * PosixThreadFactory for the IO worker threads, because they must joinable
410 * for clean shutdown.
411 */
412 void setNumIOThreads(size_t numThreads) {
413 numIOThreads_ = numThreads;
414 }
415
416 /** Return whether the IO threads will get high scheduling priority */
417 bool useHighPriorityIOThreads() const {
418 return useHighPriorityIOThreads_;
419 }
420
421 /** Set whether the IO threads will get high scheduling priority. */
422 void setUseHighPriorityIOThreads(bool val) {
423 useHighPriorityIOThreads_ = val;
424 }
425
426 /** Return the number of IO threads used by this server. */
427 size_t getNumIOThreads() const {
428 return numIOThreads_;
429 }
430
431 /**
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000432 * Get the maximum number of unused TConnection we will hold in reserve.
433 *
434 * @return the current limit on TConnection pool size.
435 */
David Reiss260fa932009-04-02 23:51:39 +0000436 size_t getConnectionStackLimit() const {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000437 return connectionStackLimit_;
438 }
439
440 /**
441 * Set the maximum number of unused TConnection we will hold in reserve.
442 *
443 * @param sz the new limit for TConnection pool size.
444 */
David Reiss260fa932009-04-02 23:51:39 +0000445 void setConnectionStackLimit(size_t sz) {
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000446 connectionStackLimit_ = sz;
447 }
448
Mark Slee79b16942007-11-26 19:05:29 +0000449 bool isThreadPoolProcessing() const {
Mark Sleee02385b2007-06-09 01:21:16 +0000450 return threadPoolProcessing_;
451 }
452
453 void addTask(boost::shared_ptr<Runnable> task) {
David Reiss068f4162010-03-09 05:19:45 +0000454 threadManager_->add(task, 0LL, taskExpireTime_);
Mark Sleee02385b2007-06-09 01:21:16 +0000455 }
456
David Reiss01fe1532010-03-09 05:19:25 +0000457 /**
458 * Return the count of sockets currently connected to.
459 *
460 * @return count of connected sockets.
461 */
462 size_t getNumConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000463 return numTConnections_;
464 }
465
David Reiss01fe1532010-03-09 05:19:25 +0000466 /**
Roger Meierec8027f2012-04-11 21:43:25 +0000467 * Return the count of sockets currently connected to.
468 *
469 * @return count of connected sockets.
470 */
471 size_t getNumActiveConnections() const {
472 return getNumConnections() - getNumIdleConnections();
473 }
474
475 /**
David Reiss01fe1532010-03-09 05:19:25 +0000476 * Return the count of connection objects allocated but not in use.
477 *
478 * @return count of idle connection objects.
479 */
480 size_t getNumIdleConnections() const {
David Reiss1997f102008-04-29 00:29:41 +0000481 return connectionStack_.size();
482 }
483
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000484 /**
David Reiss01fe1532010-03-09 05:19:25 +0000485 * Return count of number of connections which are currently processing.
486 * This is defined as a connection where all data has been received and
487 * either assigned a task (when threading) or passed to a handler (when
488 * not threading), and where the handler has not yet returned.
489 *
490 * @return # of connections currently processing.
491 */
492 size_t getNumActiveProcessors() const {
493 return numActiveProcessors_;
494 }
495
496 /// Increment the count of connections currently processing.
497 void incrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000498 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000499 ++numActiveProcessors_;
500 }
501
502 /// Decrement the count of connections currently processing.
503 void decrementActiveProcessors() {
Jake Farrellb0d95602011-12-06 01:17:26 +0000504 Guard g(connMutex_);
David Reiss01fe1532010-03-09 05:19:25 +0000505 if (numActiveProcessors_ > 0) {
506 --numActiveProcessors_;
507 }
508 }
509
510 /**
511 * Get the maximum # of connections allowed before overload.
512 *
513 * @return current setting.
514 */
515 size_t getMaxConnections() const {
516 return maxConnections_;
517 }
518
519 /**
520 * Set the maximum # of connections allowed before overload.
521 *
522 * @param maxConnections new setting for maximum # of connections.
523 */
524 void setMaxConnections(size_t maxConnections) {
525 maxConnections_ = maxConnections;
526 }
527
528 /**
529 * Get the maximum # of connections waiting in handler/task before overload.
530 *
531 * @return current setting.
532 */
533 size_t getMaxActiveProcessors() const {
534 return maxActiveProcessors_;
535 }
536
537 /**
538 * Set the maximum # of connections waiting in handler/task before overload.
539 *
540 * @param maxActiveProcessors new setting for maximum # of active processes.
541 */
542 void setMaxActiveProcessors(size_t maxActiveProcessors) {
543 maxActiveProcessors_ = maxActiveProcessors;
544 }
545
546 /**
Roger Meier3781c242011-12-11 20:07:21 +0000547 * Get the maximum allowed frame size.
548 *
549 * If a client tries to send a message larger than this limit,
550 * its connection will be closed.
551 *
552 * @return Maxium frame size, in bytes.
553 */
554 size_t getMaxFrameSize() const {
555 return maxFrameSize_;
556 }
557
558 /**
559 * Set the maximum allowed frame size.
560 *
561 * @param maxFrameSize The new maximum frame size.
562 */
563 void setMaxFrameSize(size_t maxFrameSize) {
564 maxFrameSize_ = maxFrameSize;
565 }
566
567 /**
David Reiss01fe1532010-03-09 05:19:25 +0000568 * Get fraction of maximum limits before an overload condition is cleared.
569 *
570 * @return hysteresis fraction
571 */
572 double getOverloadHysteresis() const {
573 return overloadHysteresis_;
574 }
575
576 /**
577 * Set fraction of maximum limits before an overload condition is cleared.
578 * A good value would probably be between 0.5 and 0.9.
579 *
580 * @param hysteresisFraction fraction <= 1.0.
581 */
582 void setOverloadHysteresis(double hysteresisFraction) {
583 if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
584 overloadHysteresis_ = hysteresisFraction;
585 }
586 }
587
588 /**
589 * Get the action the server will take on overload.
590 *
591 * @return a TOverloadAction enum value for the currently set action.
592 */
593 TOverloadAction getOverloadAction() const {
594 return overloadAction_;
595 }
596
597 /**
598 * Set the action the server is to take on overload.
599 *
600 * @param overloadAction a TOverloadAction enum value for the action.
601 */
602 void setOverloadAction(TOverloadAction overloadAction) {
603 overloadAction_ = overloadAction;
604 }
605
606 /**
David Reiss068f4162010-03-09 05:19:45 +0000607 * Get the time in milliseconds after which a task expires (0 == infinite).
608 *
609 * @return a 64-bit time in milliseconds.
610 */
611 int64_t getTaskExpireTime() const {
612 return taskExpireTime_;
613 }
614
615 /**
616 * Set the time in milliseconds after which a task expires (0 == infinite).
617 *
618 * @param taskExpireTime a 64-bit time in milliseconds.
619 */
620 void setTaskExpireTime(int64_t taskExpireTime) {
621 taskExpireTime_ = taskExpireTime;
622 }
623
624 /**
David Reiss01fe1532010-03-09 05:19:25 +0000625 * Determine if the server is currently overloaded.
626 * This function checks the maximums for open connections and connections
627 * currently in processing, and sets an overload condition if they are
628 * exceeded. The overload will persist until both values are below the
629 * current hysteresis fraction of their maximums.
630 *
631 * @return true if an overload condition exists, false if not.
632 */
633 bool serverOverloaded();
634
635 /** Pop and discard next task on threadpool wait queue.
636 *
637 * @return true if a task was discarded, false if the wait queue was empty.
638 */
639 bool drainPendingTask();
640
641 /**
David Reiss89a12942010-10-06 17:10:52 +0000642 * Get the starting size of a TConnection object's write buffer.
643 *
644 * @return # bytes we initialize a TConnection object's write buffer to.
645 */
646 size_t getWriteBufferDefaultSize() const {
647 return writeBufferDefaultSize_;
648 }
649
650 /**
651 * Set the starting size of a TConnection object's write buffer.
652 *
653 * @param size # bytes we initialize a TConnection object's write buffer to.
654 */
655 void setWriteBufferDefaultSize(size_t size) {
656 writeBufferDefaultSize_ = size;
657 }
658
659 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000660 * Get the maximum size of read buffer allocated to idle TConnection objects.
661 *
David Reiss89a12942010-10-06 17:10:52 +0000662 * @return # bytes beyond which we will dealloc idle buffer.
David Reiss54bec5d2010-10-06 17:10:45 +0000663 */
664 size_t getIdleReadBufferLimit() const {
665 return idleReadBufferLimit_;
666 }
667
668 /**
669 * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
670 * Get the maximum size of read buffer allocated to idle TConnection objects.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000671 *
David Reiss89a12942010-10-06 17:10:52 +0000672 * @return # bytes beyond which we will dealloc idle buffer.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000673 */
674 size_t getIdleBufferMemLimit() const {
David Reiss54bec5d2010-10-06 17:10:45 +0000675 return idleReadBufferLimit_;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000676 }
677
678 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000679 * Set the maximum size read buffer allocated to idle TConnection objects.
680 * If a TConnection object is found (either on connection close or between
681 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000682 * allocated to its read buffer, we free it and allow it to be reinitialized
683 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000684 *
685 * @param limit of bytes beyond which we will shrink buffers when checked.
686 */
687 void setIdleReadBufferLimit(size_t limit) {
688 idleReadBufferLimit_ = limit;
689 }
690
691 /**
692 * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
693 * Set the maximum size read buffer allocated to idle TConnection objects.
694 * If a TConnection object is found (either on connection close or between
695 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000696 * allocated to its read buffer, we free it and allow it to be reinitialized
697 * on the next received frame.
David Reiss54bec5d2010-10-06 17:10:45 +0000698 *
699 * @param limit of bytes beyond which we will shrink buffers when checked.
700 */
701 void setIdleBufferMemLimit(size_t limit) {
702 idleReadBufferLimit_ = limit;
703 }
704
Jake Farrellb0d95602011-12-06 01:17:26 +0000705
David Reiss54bec5d2010-10-06 17:10:45 +0000706
707 /**
708 * Get the maximum size of write buffer allocated to idle TConnection objects.
709 *
710 * @return # bytes beyond which we will reallocate buffers when checked.
711 */
712 size_t getIdleWriteBufferLimit() const {
713 return idleWriteBufferLimit_;
714 }
715
716 /**
717 * Set the maximum size write buffer allocated to idle TConnection objects.
718 * If a TConnection object is found (either on connection close or between
719 * calls when resizeBufferEveryN_ is set) with more than this much memory
David Reiss89a12942010-10-06 17:10:52 +0000720 * allocated to its write buffer, we destroy and construct that buffer with
721 * writeBufferDefaultSize_ bytes.
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000722 *
723 * @param limit of bytes beyond which we will shrink buffers when idle.
724 */
David Reiss54bec5d2010-10-06 17:10:45 +0000725 void setIdleWriteBufferLimit(size_t limit) {
726 idleWriteBufferLimit_ = limit;
Kevin Clarkcbcd63a2009-03-19 03:50:05 +0000727 }
728
David Reiss01fe1532010-03-09 05:19:25 +0000729 /**
David Reiss54bec5d2010-10-06 17:10:45 +0000730 * Get # of calls made between buffer size checks. 0 means disabled.
731 *
732 * @return # of calls between buffer size checks.
733 */
734 int32_t getResizeBufferEveryN() const {
735 return resizeBufferEveryN_;
736 }
737
738 /**
739 * Check buffer sizes every "count" calls. This allows buffer limits
740 * to be enforced for persistant connections with a controllable degree
741 * of overhead. 0 disables checks except at connection close.
742 *
743 * @param count the number of calls between checks, or 0 to disable
744 */
745 void setResizeBufferEveryN(int32_t count) {
746 resizeBufferEveryN_ = count;
747 }
748
/**
 * Main workhorse of the server: starts the server listening on a port
 * and then loops over the libevent handler.
 */
void serve();
David Reiss54bec5d2010-10-06 17:10:45 +0000754
/**
 * Asks the server to terminate gracefully.
 * This may be called from any thread.
 */
void stop();
David Reiss54bec5d2010-10-06 17:10:45 +0000759
/**
 * Creates the socket the server listens on and binds it to the local port.
 */
void createAndListenOnSocket();
762
763 /**
764 * Takes a socket created by createAndListenOnSocket() and sets various
765 * options on it to prepare for use in the server.
766 *
767 * @param fd descriptor of socket to be initialized/
768 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400769 void listenSocket(THRIFT_SOCKET fd);
Roger Meier6f2a5032013-07-08 23:35:25 +0200770
771 /**
772 * Register the optional user-provided event-base (for single-thread servers)
773 *
774 * This method should be used when the server is running in a single-thread
775 * mode, and the event base is provided by the user (i.e., the caller).
776 *
777 * @param user_event_base the user-provided event-base. The user is
778 * responsible for freeing the event base memory.
779 */
780 void registerEvents(event_base* user_event_base);
781
782 /**
783 * Returns the optional user-provided event-base (for single-thread servers).
784 */
785 event_base* getUserEventBase() const { return userEventBase_; }
786
787 private:
788 /**
789 * Callback function that the threadmanager calls when a task reaches
790 * its expiration time. It is needed to clean up the expired connection.
791 *
792 * @param task the runnable associated with the expired task.
793 */
794 void expireClose(boost::shared_ptr<Runnable> task);
795
David Reiss54bec5d2010-10-06 17:10:45 +0000796 /**
David Reiss01fe1532010-03-09 05:19:25 +0000797 * Return an initialized connection object. Creates or recovers from
798 * pool a TConnection and initializes it with the provided socket FD
799 * and flags.
800 *
801 * @param socket FD of socket associated with this connection.
David Reiss105961d2010-10-06 17:10:17 +0000802 * @param addr the sockaddr of the client
803 * @param addrLen the length of addr
David Reiss01fe1532010-03-09 05:19:25 +0000804 * @return pointer to initialized TConnection object.
805 */
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400806 TConnection* createConnection(THRIFT_SOCKET socket, const sockaddr* addr,
Jake Farrellb0d95602011-12-06 01:17:26 +0000807 socklen_t addrLen);
Mark Slee2f6404d2006-10-10 01:37:40 +0000808
David Reiss01fe1532010-03-09 05:19:25 +0000809 /**
810 * Returns a connection to pool or deletion. If the connection pool
811 * (a stack) isn't full, place the connection object on it, otherwise
812 * just delete it.
813 *
814 * @param connection the TConection being returned.
815 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000816 void returnConnection(TConnection* connection);
Jake Farrellb0d95602011-12-06 01:17:26 +0000817};
Mark Slee2f6404d2006-10-10 01:37:40 +0000818
Jake Farrellb0d95602011-12-06 01:17:26 +0000819class TNonblockingIOThread : public Runnable {
820 public:
821 // Creates an IO thread and sets up the event base. The listenSocket should
822 // be a valid FD on which listen() has already been called. If the
823 // listenSocket is < 0, accepting will not be done.
824 TNonblockingIOThread(TNonblockingServer* server,
825 int number,
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400826 THRIFT_SOCKET listenSocket,
Jake Farrellb0d95602011-12-06 01:17:26 +0000827 bool useHighPriority);
828
829 ~TNonblockingIOThread();
830
831 // Returns the event-base for this thread.
832 event_base* getEventBase() const { return eventBase_; }
833
834 // Returns the server for this thread.
835 TNonblockingServer* getServer() const { return server_; }
836
837 // Returns the number of this IO thread.
838 int getThreadNumber() const { return number_; }
839
840 // Returns the thread id associated with this object. This should
841 // only be called after the thread has been started.
Roger Meier12d70532011-12-14 23:35:28 +0000842 Thread::id_t getThreadId() const { return threadId_; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000843
844 // Returns the send-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000845 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000846
847 // Returns the read-fd for task complete notifications.
Roger Meier12d70532011-12-14 23:35:28 +0000848 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
Jake Farrellb0d95602011-12-06 01:17:26 +0000849
850 // Returns the actual thread object associated with this IO thread.
851 boost::shared_ptr<Thread> getThread() const { return thread_; }
852
853 // Sets the actual thread object associated with this IO thread.
854 void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
855
856 // Used by TConnection objects to indicate processing has finished.
857 bool notify(TNonblockingServer::TConnection* conn);
858
859 // Enters the event loop and does not return until a call to stop().
860 virtual void run();
861
862 // Exits the event loop as soon as possible.
863 void stop();
864
865 // Ensures that the event-loop thread is fully finished and shut down.
866 void join();
867
Roger Meier6f2a5032013-07-08 23:35:25 +0200868 /// Registers the events for the notification & listen sockets
869 void registerEvents();
870
Jake Farrellb0d95602011-12-06 01:17:26 +0000871 private:
David Reiss01fe1532010-03-09 05:19:25 +0000872 /**
Jake Farrellb0d95602011-12-06 01:17:26 +0000873 * C-callable event handler for signaling task completion. Provides a
874 * callback that libevent can understand that will read a connection
875 * object's address from a pipe and call connection->transition() for
876 * that object.
David Reiss068f4162010-03-09 05:19:45 +0000877 *
Jake Farrellb0d95602011-12-06 01:17:26 +0000878 * @param fd the descriptor the event occurred on.
David Reiss068f4162010-03-09 05:19:45 +0000879 */
Roger Meier12d70532011-12-14 23:35:28 +0000880 static void notifyHandler(evutil_socket_t fd, short which, void* v);
David Reiss068f4162010-03-09 05:19:45 +0000881
882 /**
David Reiss01fe1532010-03-09 05:19:25 +0000883 * C-callable event handler for listener events. Provides a callback
884 * that libevent can understand which invokes server->handleEvent().
885 *
886 * @param fd the descriptor the event occured on.
887 * @param which the flags associated with the event.
888 * @param v void* callback arg where we placed TNonblockingServer's "this".
889 */
Jake Farrellb0d95602011-12-06 01:17:26 +0000890 static void listenHandler(evutil_socket_t fd, short which, void* v) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000891 ((TNonblockingServer*)v)->handleEvent(fd, which);
892 }
893
Jake Farrellb0d95602011-12-06 01:17:26 +0000894 /// Exits the loop ASAP in case of shutdown or error.
895 void breakLoop(bool error);
Mark Slee79b16942007-11-26 19:05:29 +0000896
David Reiss01fe1532010-03-09 05:19:25 +0000897 /// Create the pipe used to notify I/O process of task completion.
898 void createNotificationPipe();
899
Jake Farrellb0d95602011-12-06 01:17:26 +0000900 /// Unregisters our events for notification and listen sockets.
901 void cleanupEvents();
David Reiss01fe1532010-03-09 05:19:25 +0000902
Jake Farrellb0d95602011-12-06 01:17:26 +0000903 /// Sets (or clears) high priority scheduling status for the current thread.
904 void setCurrentThreadHighPriority(bool value);
David Reiss01fe1532010-03-09 05:19:25 +0000905
Jake Farrellb0d95602011-12-06 01:17:26 +0000906 private:
907 /// associated server
908 TNonblockingServer* server_;
Mark Slee79b16942007-11-26 19:05:29 +0000909
Jake Farrellb0d95602011-12-06 01:17:26 +0000910 /// thread number (for debugging).
911 const int number_;
Bryan Duxbury76c43682011-08-24 21:26:48 +0000912
Jake Farrellb0d95602011-12-06 01:17:26 +0000913 /// The actual physical thread id.
Roger Meier12d70532011-12-14 23:35:28 +0000914 Thread::id_t threadId_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000915
916 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400917 THRIFT_SOCKET listenSocket_;
Jake Farrellb0d95602011-12-06 01:17:26 +0000918
919 /// Sets a high scheduling priority when running
920 bool useHighPriority_;
921
922 /// pointer to eventbase to be used for looping
923 event_base* eventBase_;
924
Roger Meier6f2a5032013-07-08 23:35:25 +0200925 /// Set to true if this class is responsible for freeing the event base
926 /// memory.
927 bool ownEventBase_;
928
Jake Farrellb0d95602011-12-06 01:17:26 +0000929 /// Used with eventBase_ for connection events (only in listener thread)
930 struct event serverEvent_;
931
932 /// Used with eventBase_ for task completion notification
933 struct event notificationEvent_;
934
935 /// File descriptors for pipe used for task completion notification.
Roger Meier12d70532011-12-14 23:35:28 +0000936 evutil_socket_t notificationPipeFDs_[2];
Jake Farrellb0d95602011-12-06 01:17:26 +0000937
938 /// Actual IO Thread
939 boost::shared_ptr<Thread> thread_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000940};
941
T Jake Lucianib5e62212009-01-31 22:36:20 +0000942}}} // apache::thrift::server
Mark Slee2f6404d2006-10-10 01:37:40 +0000943
Jake Farrellb0d95602011-12-06 01:17:26 +0000944#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_