blob: c8bfcb5c82f59a60b6b5a38f9e0335734d299b9a [file] [log] [blame]
Mark Slee2f6404d2006-10-10 01:37:40 +00001#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
2#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
3
Mark Slee4af6ed72006-10-25 19:02:49 +00004#include <Thrift.h>
5#include <server/TServer.h>
6#include <transport/TTransportUtils.h>
Mark Slee2f6404d2006-10-10 01:37:40 +00007#include <stack>
8#include <event.h>
9
Mark Slee2f6404d2006-10-10 01:37:40 +000010namespace facebook { namespace thrift { namespace server {
11
12using boost::shared_ptr;
13
14// Forward declaration of class
15class TConnection;
16
17/**
18 * This is a non-blocking server in C++ for high performance that operates a
19 * single IO thread. It assumes that all incoming requests are framed with a
20 * 4 byte length indicator and writes out responses using the same framing.
21 *
22 * It does not use the TServerTransport framework, but rather has socket
23 * operations hardcoded for use with select.
24 *
25 * @author Mark Slee <mcslee@facebook.com>
26 */
27class TNonblockingServer : public TServer {
28 private:
29
30 // Listen backlog
31 static const int LISTEN_BACKLOG = 1024;
32
33 // Server socket file descriptor
34 int serverSocket_;
35
36 // Port server runs on
37 int port_;
38
Mark Slee92f00fb2006-10-25 01:28:17 +000039 // Whether to frame responses
40 bool frameResponses_;
41
Mark Slee2f6404d2006-10-10 01:37:40 +000042 /**
43 * This is a stack of all the objects that have been created but that
44 * are NOT currently in use. When we close a connection, we place it on this
45 * stack so that the object can be reused later, rather than freeing the
46 * memory and reallocating a new object later.
47 */
48 std::stack<TConnection*> connectionStack_;
49
50 void handleEvent(int fd, short which);
51
52 public:
Aditya Agarwal1ea90522007-01-19 02:02:12 +000053 TNonblockingServer(shared_ptr<TProcessor> processor,
54 shared_ptr<TProtocolFactory> protocolFactory,
55 int port) :
56 TServer(processor, protocolFactory),
Mark Slee92f00fb2006-10-25 01:28:17 +000057 serverSocket_(0),
58 port_(port),
59 frameResponses_(true) {}
Aditya Agarwal1ea90522007-01-19 02:02:12 +000060
61 TNonblockingServer(shared_ptr<TProcessor> processor,
62 shared_ptr<TProtocolFactory> protocolFactory,
63 shared_ptr<TTransportFactory> transportFactory,
64 int port) :
65 TServer(processor, protocolFactory, transportFactory),
66 serverSocket_(0),
67 port_(port),
68 frameResponses_(true) {}
69
Mark Slee2f6404d2006-10-10 01:37:40 +000070 ~TNonblockingServer() {}
71
Mark Slee92f00fb2006-10-25 01:28:17 +000072 void setFrameResponses(bool frameResponses) {
73 frameResponses_ = frameResponses;
74 }
75
76 bool getFrameResponses() {
77 return frameResponses_;
78 }
79
Mark Slee2f6404d2006-10-10 01:37:40 +000080 TConnection* createConnection(int socket, short flags);
81
82 void returnConnection(TConnection* connection);
83
84 static void eventHandler(int fd, short which, void* v) {
85 ((TNonblockingServer*)v)->handleEvent(fd, which);
86 }
87
88 void serve();
89};
90
91/**
92 * Two states for sockets, recv and send mode
93 */
enum TSocketState {
  SOCKET_RECV = 0,  // socket is in receive mode
  SOCKET_SEND = 1   // socket is in send mode
};
98
/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back frame size
 *  5) send back data (if any)
 */
enum TAppState {
  APP_INIT            = 0,  // initialize
  APP_READ_FRAME_SIZE = 1,  // read frame size
  APP_READ_REQUEST    = 2,  // read frame of data
  APP_SEND_FRAME_SIZE = 3,  // send frame size
  APP_SEND_RESULT     = 4   // send back data
};
113
114/**
115 * Represents a connection that is handled via libevent. This connection
116 * essentially encapsulates a socket that has some associated libevent state.
117 */
118class TConnection {
119 private:
120
121 // Server handle
122 TNonblockingServer* server_;
123
124 // Socket handle
125 int socket_;
126
127 // Libevent object
128 struct event event_;
129
130 // Libevent flags
131 short eventFlags_;
132
133 // Socket mode
134 TSocketState socketState_;
135
136 // Application state
137 TAppState appState_;
138
139 // How much data needed to read
140 uint32_t readWant_;
141
142 // Where in the read buffer are we
143 uint32_t readBufferPos_;
144
145 // Read buffer
146 uint8_t* readBuffer_;
147
148 // Read buffer size
149 uint32_t readBufferSize_;
150
151 // Write buffer
152 uint8_t* writeBuffer_;
153
154 // Write buffer size
155 uint32_t writeBufferSize_;
156
157 // How far through writing are we?
158 uint32_t writeBufferPos_;
159
Mark Slee92f00fb2006-10-25 01:28:17 +0000160 // Frame size
161 int32_t frameSize_;
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 // Transport to read from
164 shared_ptr<TMemoryBuffer> inputTransport_;
165
166 // Transport that processor writes to
167 shared_ptr<TMemoryBuffer> outputTransport_;
168
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000169 // extra transport generated by transport factory (e.g. BufferedRouterTransport)
170 shared_ptr<TTransport> factoryInputTransport_;
171 // shared_ptr<TTransport> factoryOutputTransport_;
172
Mark Slee4af6ed72006-10-25 19:02:49 +0000173 // Protocol encoder
174 shared_ptr<TProtocol> outputProtocol_;
175
176 // Protocol decoder
177 shared_ptr<TProtocol> inputProtocol_;
178
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 // Go into read mode
180 void setRead() {
181 setFlags(EV_READ | EV_PERSIST);
182 }
183
184 // Go into write mode
185 void setWrite() {
186 setFlags(EV_WRITE | EV_PERSIST);
187 }
188
189 // Set event flags
190 void setFlags(short eventFlags);
191
192 // Libevent handlers
193 void workSocket();
194
195 // Close this client and reset
196 void close();
197
198 public:
199
200 // Constructor
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000201 TConnection(int socket, short eventFlags, TNonblockingServer *s,
202 shared_ptr<TTransportFactory> transportFactory) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 readBuffer_ = (uint8_t*)malloc(1024);
204 if (readBuffer_ == NULL) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000205 throw new facebook::thrift::TException("Out of memory.");
Mark Slee2f6404d2006-10-10 01:37:40 +0000206 }
207 readBufferSize_ = 1024;
208
209 // Allocate input and output tranpsorts
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000210 // these only need to be allocated once per TConnection (they don't need to be
211 // reallocated on init() call)
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
213 outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
Aditya Agarwal1ea90522007-01-19 02:02:12 +0000214
Mark Slee2f6404d2006-10-10 01:37:40 +0000215 init(socket, eventFlags, s);
216 }
217
218 // Initialize
219 void init(int socket, short eventFlags, TNonblockingServer *s);
220
221 // Transition into a new state
222 void transition();
223
224 // Handler wrapper
225 static void eventHandler(int fd, short which, void* v) {
226 assert(fd = ((TConnection*)v)->socket_);
227 ((TConnection*)v)->workSocket();
228 }
229};
230
231}}} // facebook::thrift::server
232
233#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_