blob: 1684b64a04587bf37fcad770574edceba51c3d4a [file] [log] [blame]
Gavin McDonald0b75e1a2010-10-28 02:12:01 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
23#include <Thrift.h>
24#include <server/TServer.h>
25#include <transport/TBufferTransports.h>
26#include <concurrency/ThreadManager.h>
27#include <stack>
28#include <string>
29#include <errno.h>
30#include <cstdlib>
31#include <event.h>
32
33namespace apache { namespace thrift { namespace server {
34
35using apache::thrift::transport::TMemoryBuffer;
36using apache::thrift::protocol::TProtocol;
37using apache::thrift::concurrency::Runnable;
38using apache::thrift::concurrency::ThreadManager;
39
40// Forward declaration of class
41class TConnection;
42
43/**
44 * This is a non-blocking server in C++ for high performance that operates a
45 * single IO thread. It assumes that all incoming requests are framed with a
46 * 4 byte length indicator and writes out responses using the same framing.
47 *
48 * It does not use the TServerTransport framework, but rather has socket
49 * operations hardcoded for use with select.
50 *
51 */
52class TNonblockingServer : public TServer {
53 private:
54
55 // Listen backlog
56 static const int LISTEN_BACKLOG = 1024;
57
58 // Default limit on size of idle connection pool
59 static const size_t CONNECTION_STACK_LIMIT = 1024;
60
61 // Maximum size of buffer allocated to idle connection
62 static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
63
64 // Server socket file descriptor
65 int serverSocket_;
66
67 // Port server runs on
68 int port_;
69
70 // For processing via thread pool, may be NULL
71 boost::shared_ptr<ThreadManager> threadManager_;
72
73 // Is thread pool processing?
74 bool threadPoolProcessing_;
75
76 // The event base for libevent
77 event_base* eventBase_;
78
79 // Event struct, for use with eventBase_
80 struct event serverEvent_;
81
82 // Number of TConnection object we've created
83 size_t numTConnections_;
84
85 // Limit for how many TConnection objects to cache
86 size_t connectionStackLimit_;
87
88 /**
89 * Max read buffer size for an idle connection. When we place an idle
90 * TConnection into connectionStack_, we insure that its read buffer is
91 * reduced to this size to insure that idle connections don't hog memory.
92 */
93 uint32_t idleBufferMemLimit_;
94
95 /**
96 * This is a stack of all the objects that have been created but that
97 * are NOT currently in use. When we close a connection, we place it on this
98 * stack so that the object can be reused later, rather than freeing the
99 * memory and reallocating a new object later.
100 */
101 std::stack<TConnection*> connectionStack_;
102
103 void handleEvent(int fd, short which);
104
105 public:
106 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
107 int port) :
108 TServer(processor),
109 serverSocket_(-1),
110 port_(port),
111 threadPoolProcessing_(false),
112 eventBase_(NULL),
113 numTConnections_(0),
114 connectionStackLimit_(CONNECTION_STACK_LIMIT),
115 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
116
117 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
118 boost::shared_ptr<TProtocolFactory> protocolFactory,
119 int port,
120 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
121 TServer(processor),
122 serverSocket_(-1),
123 port_(port),
124 threadManager_(threadManager),
125 eventBase_(NULL),
126 numTConnections_(0),
127 connectionStackLimit_(CONNECTION_STACK_LIMIT),
128 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
129 setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
130 setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
131 setInputProtocolFactory(protocolFactory);
132 setOutputProtocolFactory(protocolFactory);
133 setThreadManager(threadManager);
134 }
135
136 TNonblockingServer(boost::shared_ptr<TProcessor> processor,
137 boost::shared_ptr<TTransportFactory> inputTransportFactory,
138 boost::shared_ptr<TTransportFactory> outputTransportFactory,
139 boost::shared_ptr<TProtocolFactory> inputProtocolFactory,
140 boost::shared_ptr<TProtocolFactory> outputProtocolFactory,
141 int port,
142 boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
143 TServer(processor),
144 serverSocket_(0),
145 port_(port),
146 threadManager_(threadManager),
147 eventBase_(NULL),
148 numTConnections_(0),
149 connectionStackLimit_(CONNECTION_STACK_LIMIT),
150 idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
151 setInputTransportFactory(inputTransportFactory);
152 setOutputTransportFactory(outputTransportFactory);
153 setInputProtocolFactory(inputProtocolFactory);
154 setOutputProtocolFactory(outputProtocolFactory);
155 setThreadManager(threadManager);
156 }
157
158 ~TNonblockingServer() {}
159
160 void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
161 threadManager_ = threadManager;
162 threadPoolProcessing_ = (threadManager != NULL);
163 }
164
165 boost::shared_ptr<ThreadManager> getThreadManager() {
166 return threadManager_;
167 }
168
169 /**
170 * Get the maximum number of unused TConnection we will hold in reserve.
171 *
172 * @return the current limit on TConnection pool size.
173 */
174 size_t getConnectionStackLimit() const {
175 return connectionStackLimit_;
176 }
177
178 /**
179 * Set the maximum number of unused TConnection we will hold in reserve.
180 *
181 * @param sz the new limit for TConnection pool size.
182 */
183 void setConnectionStackLimit(size_t sz) {
184 connectionStackLimit_ = sz;
185 }
186
187 bool isThreadPoolProcessing() const {
188 return threadPoolProcessing_;
189 }
190
191 void addTask(boost::shared_ptr<Runnable> task) {
192 threadManager_->add(task);
193 }
194
195 event_base* getEventBase() const {
196 return eventBase_;
197 }
198
199 void incrementNumConnections() {
200 ++numTConnections_;
201 }
202
203 void decrementNumConnections() {
204 --numTConnections_;
205 }
206
207 size_t getNumConnections() {
208 return numTConnections_;
209 }
210
211 size_t getNumIdleConnections() {
212 return connectionStack_.size();
213 }
214
215 /**
216 * Get the maximum limit of memory allocated to idle TConnection objects.
217 *
218 * @return # bytes beyond which we will shrink buffers when idle.
219 */
220 size_t getIdleBufferMemLimit() const {
221 return idleBufferMemLimit_;
222 }
223
224 /**
225 * Set the maximum limit of memory allocated to idle TConnection objects.
226 * If a TConnection object goes idle with more than this much memory
227 * allocated to its buffer, we shrink it to this value.
228 *
229 * @param limit of bytes beyond which we will shrink buffers when idle.
230 */
231 void setIdleBufferMemLimit(size_t limit) {
232 idleBufferMemLimit_ = limit;
233 }
234
235 TConnection* createConnection(int socket, short flags);
236
237 void returnConnection(TConnection* connection);
238
239 static void eventHandler(int fd, short which, void* v) {
240 ((TNonblockingServer*)v)->handleEvent(fd, which);
241 }
242
243 void listenSocket();
244
245 void listenSocket(int fd);
246
247 void registerEvents(event_base* base);
248
249 void serve();
250};
251
252/**
253 * Two states for sockets, recv and send mode
254 */
255enum TSocketState {
256 SOCKET_RECV,
257 SOCKET_SEND
258};
259
260/**
261 * Four states for the nonblocking servr:
262 * 1) initialize
263 * 2) read 4 byte frame size
264 * 3) read frame of data
265 * 4) send back data (if any)
266 */
267enum TAppState {
268 APP_INIT,
269 APP_READ_FRAME_SIZE,
270 APP_READ_REQUEST,
271 APP_WAIT_TASK,
272 APP_SEND_RESULT
273};
274
275/**
276 * Represents a connection that is handled via libevent. This connection
277 * essentially encapsulates a socket that has some associated libevent state.
278 */
279class TConnection {
280 private:
281
282 class Task;
283
284 // Server handle
285 TNonblockingServer* server_;
286
287 // Socket handle
288 int socket_;
289
290 // Libevent object
291 struct event event_;
292
293 // Libevent flags
294 short eventFlags_;
295
296 // Socket mode
297 TSocketState socketState_;
298
299 // Application state
300 TAppState appState_;
301
302 // How much data needed to read
303 uint32_t readWant_;
304
305 // Where in the read buffer are we
306 uint32_t readBufferPos_;
307
308 // Read buffer
309 uint8_t* readBuffer_;
310
311 // Read buffer size
312 uint32_t readBufferSize_;
313
314 // Write buffer
315 uint8_t* writeBuffer_;
316
317 // Write buffer size
318 uint32_t writeBufferSize_;
319
320 // How far through writing are we?
321 uint32_t writeBufferPos_;
322
323 // How many times have we read since our last buffer reset?
324 uint32_t numReadsSinceReset_;
325
326 // How many times have we written since our last buffer reset?
327 uint32_t numWritesSinceReset_;
328
329 // Task handle
330 int taskHandle_;
331
332 // Task event
333 struct event taskEvent_;
334
335 // Transport to read from
336 boost::shared_ptr<TMemoryBuffer> inputTransport_;
337
338 // Transport that processor writes to
339 boost::shared_ptr<TMemoryBuffer> outputTransport_;
340
341 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
342 boost::shared_ptr<TTransport> factoryInputTransport_;
343 boost::shared_ptr<TTransport> factoryOutputTransport_;
344
345 // Protocol decoder
346 boost::shared_ptr<TProtocol> inputProtocol_;
347
348 // Protocol encoder
349 boost::shared_ptr<TProtocol> outputProtocol_;
350
351 // Go into read mode
352 void setRead() {
353 setFlags(EV_READ | EV_PERSIST);
354 }
355
356 // Go into write mode
357 void setWrite() {
358 setFlags(EV_WRITE | EV_PERSIST);
359 }
360
361 // Set socket idle
362 void setIdle() {
363 setFlags(0);
364 }
365
366 // Set event flags
367 void setFlags(short eventFlags);
368
369 // Libevent handlers
370 void workSocket();
371
372 // Close this client and reset
373 void close();
374
375 public:
376
377 // Constructor
378 TConnection(int socket, short eventFlags, TNonblockingServer *s) {
379 readBuffer_ = (uint8_t*)std::malloc(1024);
380 if (readBuffer_ == NULL) {
381 throw new apache::thrift::TException("Out of memory.");
382 }
383 readBufferSize_ = 1024;
384
385 numReadsSinceReset_ = 0;
386 numWritesSinceReset_ = 0;
387
388 // Allocate input and output tranpsorts
389 // these only need to be allocated once per TConnection (they don't need to be
390 // reallocated on init() call)
391 inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
392 outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
393
394 init(socket, eventFlags, s);
395 server_->incrementNumConnections();
396 }
397
398 ~TConnection() {
399 server_->decrementNumConnections();
400 }
401
402 /**
403 * Check read buffer against a given limit and shrink it if exceeded.
404 *
405 * @param limit we limit buffer size to.
406 */
407 void checkIdleBufferMemLimit(uint32_t limit);
408
409 // Initialize
410 void init(int socket, short eventFlags, TNonblockingServer *s);
411
412 // Transition into a new state
413 void transition();
414
415 // Handler wrapper
416 static void eventHandler(int fd, short /* which */, void* v) {
417 assert(fd == ((TConnection*)v)->socket_);
418 ((TConnection*)v)->workSocket();
419 }
420
421 // Handler wrapper for task block
422 static void taskHandler(int fd, short /* which */, void* v) {
423 assert(fd == ((TConnection*)v)->taskHandle_);
424 if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
425 GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
426 }
427 ((TConnection*)v)->transition();
428 }
429
430};
431
432}}} // apache::thrift::server
433
434#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_