blob: 9b47aa539d72f651c4b2db6b3e8cc43454df7687 [file] [log] [blame]
Gavin McDonald0b75e1a2010-10-28 02:12:01 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <cstring>
21#include <sys/socket.h>
22#include <sys/poll.h>
23#include <sys/types.h>
24#include <netinet/in.h>
25#include <netinet/tcp.h>
26#include <netdb.h>
27#include <fcntl.h>
28#include <errno.h>
29
30#include "TSocket.h"
31#include "TServerSocket.h"
32#include <boost/shared_ptr.hpp>
33
34namespace apache { namespace thrift { namespace transport {
35
36using namespace std;
37using boost::shared_ptr;
38
39TServerSocket::TServerSocket(int port) :
40 port_(port),
41 serverSocket_(-1),
42 acceptBacklog_(1024),
43 sendTimeout_(0),
44 recvTimeout_(0),
45 retryLimit_(0),
46 retryDelay_(0),
47 tcpSendBuffer_(0),
48 tcpRecvBuffer_(0),
49 intSock1_(-1),
50 intSock2_(-1) {}
51
52TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
53 port_(port),
54 serverSocket_(-1),
55 acceptBacklog_(1024),
56 sendTimeout_(sendTimeout),
57 recvTimeout_(recvTimeout),
58 retryLimit_(0),
59 retryDelay_(0),
60 tcpSendBuffer_(0),
61 tcpRecvBuffer_(0),
62 intSock1_(-1),
63 intSock2_(-1) {}
64
65TServerSocket::~TServerSocket() {
66 close();
67}
68
69void TServerSocket::setSendTimeout(int sendTimeout) {
70 sendTimeout_ = sendTimeout;
71}
72
73void TServerSocket::setRecvTimeout(int recvTimeout) {
74 recvTimeout_ = recvTimeout;
75}
76
77void TServerSocket::setRetryLimit(int retryLimit) {
78 retryLimit_ = retryLimit;
79}
80
81void TServerSocket::setRetryDelay(int retryDelay) {
82 retryDelay_ = retryDelay;
83}
84
85void TServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
86 tcpSendBuffer_ = tcpSendBuffer;
87}
88
89void TServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
90 tcpRecvBuffer_ = tcpRecvBuffer;
91}
92
93void TServerSocket::listen() {
94 int sv[2];
95 if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
96 GlobalOutput.perror("TServerSocket::listen() socketpair() ", errno);
97 intSock1_ = -1;
98 intSock2_ = -1;
99 } else {
100 intSock1_ = sv[1];
101 intSock2_ = sv[0];
102 }
103
104 struct addrinfo hints, *res, *res0;
105 int error;
106 char port[sizeof("65536") + 1];
107 std::memset(&hints, 0, sizeof(hints));
108 hints.ai_family = PF_UNSPEC;
109 hints.ai_socktype = SOCK_STREAM;
110 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
111 sprintf(port, "%d", port_);
112
113 // Wildcard address
114 error = getaddrinfo(NULL, port, &hints, &res0);
115 if (error) {
116 GlobalOutput.printf("getaddrinfo %d: %s", error, gai_strerror(error));
117 close();
118 throw TTransportException(TTransportException::NOT_OPEN, "Could not resolve host for server socket.");
119 }
120
121 // Pick the ipv6 address first since ipv4 addresses can be mapped
122 // into ipv6 space.
123 for (res = res0; res; res = res->ai_next) {
124 if (res->ai_family == AF_INET6 || res->ai_next == NULL)
125 break;
126 }
127
128 serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
129 if (serverSocket_ == -1) {
130 int errno_copy = errno;
131 GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
132 close();
133 throw TTransportException(TTransportException::NOT_OPEN, "Could not create server socket.", errno_copy);
134 }
135
136 // Set reusaddress to prevent 2MSL delay on accept
137 int one = 1;
138 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
139 &one, sizeof(one))) {
140 int errno_copy = errno;
141 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_REUSEADDR ", errno_copy);
142 close();
143 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_REUSEADDR", errno_copy);
144 }
145
146 // Set TCP buffer sizes
147 if (tcpSendBuffer_ > 0) {
148 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_SNDBUF,
149 &tcpSendBuffer_, sizeof(tcpSendBuffer_))) {
150 int errno_copy = errno;
151 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
152 close();
153 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_SNDBUF", errno_copy);
154 }
155 }
156
157 if (tcpRecvBuffer_ > 0) {
158 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_RCVBUF,
159 &tcpRecvBuffer_, sizeof(tcpRecvBuffer_))) {
160 int errno_copy = errno;
161 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
162 close();
163 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_RCVBUF", errno_copy);
164 }
165 }
166
167 // Defer accept
168 #ifdef TCP_DEFER_ACCEPT
169 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
170 &one, sizeof(one))) {
171 int errno_copy = errno;
172 GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_DEFER_ACCEPT ", errno_copy);
173 close();
174 throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_DEFER_ACCEPT", errno_copy);
175 }
176 #endif // #ifdef TCP_DEFER_ACCEPT
177
178 #ifdef IPV6_V6ONLY
179 int zero = 0;
180 if (-1 == setsockopt(serverSocket_, IPPROTO_IPV6, IPV6_V6ONLY,
181 &zero, sizeof(zero))) {
182 GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY ", errno);
183 }
184 #endif // #ifdef IPV6_V6ONLY
185
186 // Turn linger off, don't want to block on calls to close
187 struct linger ling = {0, 0};
188 if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
189 &ling, sizeof(ling))) {
190 int errno_copy = errno;
191 GlobalOutput.perror("TServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
192 close();
193 throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
194 }
195
196 // TCP Nodelay, speed over bandwidth
197 if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
198 &one, sizeof(one))) {
199 int errno_copy = errno;
200 GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
201 close();
202 throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
203 }
204
205 // Set NONBLOCK on the accept socket
206 int flags = fcntl(serverSocket_, F_GETFL, 0);
207 if (flags == -1) {
208 int errno_copy = errno;
209 GlobalOutput.perror("TServerSocket::listen() fcntl() F_GETFL ", errno_copy);
210 throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
211 }
212
213 if (-1 == fcntl(serverSocket_, F_SETFL, flags | O_NONBLOCK)) {
214 int errno_copy = errno;
215 GlobalOutput.perror("TServerSocket::listen() fcntl() O_NONBLOCK ", errno_copy);
216 throw TTransportException(TTransportException::NOT_OPEN, "fcntl() failed", errno_copy);
217 }
218
219 // prepare the port information
220 // we may want to try to bind more than once, since SO_REUSEADDR doesn't
221 // always seem to work. The client can configure the retry variables.
222 int retries = 0;
223 do {
224 if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
225 break;
226 }
227
228 // use short circuit evaluation here to only sleep if we need to
229 } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
230
231 // free addrinfo
232 freeaddrinfo(res0);
233
234 // throw an error if we failed to bind properly
235 if (retries > retryLimit_) {
236 char errbuf[1024];
237 sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
238 GlobalOutput(errbuf);
239 close();
240 throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
241 }
242
243 // Call listen
244 if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
245 int errno_copy = errno;
246 GlobalOutput.perror("TServerSocket::listen() listen() ", errno_copy);
247 close();
248 throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
249 }
250
251 // The socket is now listening!
252}
253
254shared_ptr<TTransport> TServerSocket::acceptImpl() {
255 if (serverSocket_ < 0) {
256 throw TTransportException(TTransportException::NOT_OPEN, "TServerSocket not listening");
257 }
258
259 struct pollfd fds[2];
260
261 int maxEintrs = 5;
262 int numEintrs = 0;
263
264 while (true) {
265 std::memset(fds, 0 , sizeof(fds));
266 fds[0].fd = serverSocket_;
267 fds[0].events = POLLIN;
268 if (intSock2_ >= 0) {
269 fds[1].fd = intSock2_;
270 fds[1].events = POLLIN;
271 }
272 int ret = poll(fds, 2, -1);
273
274 if (ret < 0) {
275 // error cases
276 if (errno == EINTR && (numEintrs++ < maxEintrs)) {
277 // EINTR needs to be handled manually and we can tolerate
278 // a certain number
279 continue;
280 }
281 int errno_copy = errno;
282 GlobalOutput.perror("TServerSocket::acceptImpl() poll() ", errno_copy);
283 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
284 } else if (ret > 0) {
285 // Check for an interrupt signal
286 if (intSock2_ >= 0 && (fds[1].revents & POLLIN)) {
287 int8_t buf;
288 if (-1 == recv(intSock2_, &buf, sizeof(int8_t), 0)) {
289 GlobalOutput.perror("TServerSocket::acceptImpl() recv() interrupt ", errno);
290 }
291 throw TTransportException(TTransportException::INTERRUPTED);
292 }
293
294 // Check for the actual server socket being ready
295 if (fds[0].revents & POLLIN) {
296 break;
297 }
298 } else {
299 GlobalOutput("TServerSocket::acceptImpl() poll 0");
300 throw TTransportException(TTransportException::UNKNOWN);
301 }
302 }
303
304 struct sockaddr_storage clientAddress;
305 int size = sizeof(clientAddress);
306 int clientSocket = ::accept(serverSocket_,
307 (struct sockaddr *) &clientAddress,
308 (socklen_t *) &size);
309
310 if (clientSocket < 0) {
311 int errno_copy = errno;
312 GlobalOutput.perror("TServerSocket::acceptImpl() ::accept() ", errno_copy);
313 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
314 }
315
316 // Make sure client socket is blocking
317 int flags = fcntl(clientSocket, F_GETFL, 0);
318 if (flags == -1) {
319 int errno_copy = errno;
320 GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_GETFL ", errno_copy);
321 throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_GETFL)", errno_copy);
322 }
323
324 if (-1 == fcntl(clientSocket, F_SETFL, flags & ~O_NONBLOCK)) {
325 int errno_copy = errno;
326 GlobalOutput.perror("TServerSocket::acceptImpl() fcntl() F_SETFL ~O_NONBLOCK ", errno_copy);
327 throw TTransportException(TTransportException::UNKNOWN, "fcntl(F_SETFL)", errno_copy);
328 }
329
330 shared_ptr<TSocket> client(new TSocket(clientSocket));
331 if (sendTimeout_ > 0) {
332 client->setSendTimeout(sendTimeout_);
333 }
334 if (recvTimeout_ > 0) {
335 client->setRecvTimeout(recvTimeout_);
336 }
337
338 return client;
339}
340
341void TServerSocket::interrupt() {
342 if (intSock1_ >= 0) {
343 int8_t byte = 0;
344 if (-1 == send(intSock1_, &byte, sizeof(int8_t), 0)) {
345 GlobalOutput.perror("TServerSocket::interrupt() send() ", errno);
346 }
347 }
348}
349
350void TServerSocket::close() {
351 if (serverSocket_ >= 0) {
352 shutdown(serverSocket_, SHUT_RDWR);
353 ::close(serverSocket_);
354 }
355 if (intSock1_ >= 0) {
356 ::close(intSock1_);
357 }
358 if (intSock2_ >= 0) {
359 ::close(intSock2_);
360 }
361 serverSocket_ = -1;
362 intSock1_ = -1;
363 intSock2_ = -1;
364}
365
366}}} // apache::thrift::transport