blob: e3615caf243d966a6a55aa9f520166f134139487 [file] [log] [blame]
#include <config.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/select.h>
#include <stdio.h>
#include <string.h>

#include <string>

#include "concurrency/Monitor.h"
#include "TSocket.h"
#include "TTransportException.h"
Mark Sleee8540632006-05-30 09:24:40 +000015
Marc Slemko6f038a72006-08-03 18:58:09 +000016namespace facebook { namespace thrift { namespace transport {
17
Mark Sleee8540632006-05-30 09:24:40 +000018using namespace std;
Mark Slee29050782006-09-29 00:12:30 +000019using namespace facebook::thrift::concurrency;
Mark Sleee8540632006-05-30 09:24:40 +000020
Mark Slee29050782006-09-29 00:12:30 +000021// Global var to track total socket sys calls
Mark Slee8d7e1f62006-06-07 06:48:56 +000022uint32_t g_socket_syscalls = 0;
23
24/**
25 * TSocket implementation.
26 *
27 * @author Mark Slee <mcslee@facebook.com>
28 */
29
Mark Sleee8540632006-05-30 09:24:40 +000030// Mutex to protect syscalls to netdb
Mark Slee29050782006-09-29 00:12:30 +000031static Monitor s_netdb_monitor;
Mark Sleee8540632006-05-30 09:24:40 +000032
33// TODO(mcslee): Make this an option to the socket class
34#define MAX_RECV_RETRIES 20
Mark Slee29050782006-09-29 00:12:30 +000035
36TSocket::TSocket(string host, int port) :
37 host_(host),
38 port_(port),
39 socket_(0),
40 connTimeout_(0),
41 sendTimeout_(0),
42 recvTimeout_(0),
43 lingerOn_(1),
44 lingerVal_(0),
45 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000046 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
47 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Sleee8540632006-05-30 09:24:40 +000048}
49
Aditya Agarwalebc99e02007-01-15 23:14:58 +000050TSocket::TSocket() :
51 host_(""),
52 port_(0),
53 socket_(0),
54 connTimeout_(0),
55 sendTimeout_(0),
56 recvTimeout_(0),
57 lingerOn_(1),
58 lingerVal_(0),
59 noDelay_(1) {
60 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
61 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
62}
63
Mark Slee29050782006-09-29 00:12:30 +000064TSocket::TSocket(int socket) :
65 host_(""),
66 port_(0),
67 socket_(socket),
68 connTimeout_(0),
69 sendTimeout_(0),
70 recvTimeout_(0),
71 lingerOn_(1),
72 lingerVal_(0),
73 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000074 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
75 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +000076}
77
Mark Sleee8540632006-05-30 09:24:40 +000078TSocket::~TSocket() {
79 close();
80}
81
Mark Slee8d7e1f62006-06-07 06:48:56 +000082bool TSocket::isOpen() {
83 return (socket_ > 0);
84}
85
Mark Sleeb9ff32a2006-11-16 01:00:24 +000086bool TSocket::peek() {
87 if (!isOpen()) {
88 return false;
89 }
90 uint8_t buf;
91 int r = recv(socket_, &buf, 1, MSG_PEEK);
92 if (r == -1) {
93 perror("TSocket::peek()");
94 close();
95 throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno);
96 }
97 return (r > 0);
98}
99
Mark Slee8d7e1f62006-06-07 06:48:56 +0000100void TSocket::open() {
Mark Sleee8540632006-05-30 09:24:40 +0000101 // Create socket
102 socket_ = socket(AF_INET, SOCK_STREAM, 0);
103 if (socket_ == -1) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000104 perror("TSocket::open() socket");
105 close();
106 throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000107 }
Mark Slee29050782006-09-29 00:12:30 +0000108
109 // Send timeout
110 if (sendTimeout_ > 0) {
111 setSendTimeout(sendTimeout_);
112 }
113
114 // Recv timeout
115 if (recvTimeout_ > 0) {
116 setRecvTimeout(recvTimeout_);
117 }
118
119 // Linger
120 setLinger(lingerOn_, lingerVal_);
121
122 // No delay
123 setNoDelay(noDelay_);
124
Mark Slee8d7e1f62006-06-07 06:48:56 +0000125 // Lookup the hostname
Mark Sleee8540632006-05-30 09:24:40 +0000126 struct sockaddr_in addr;
127 addr.sin_family = AF_INET;
128 addr.sin_port = htons(port_);
129
Mark Sleee8540632006-05-30 09:24:40 +0000130 {
Mark Slee29050782006-09-29 00:12:30 +0000131 // Scope lock on host entry lookup
132 Synchronized s(s_netdb_monitor);
Mark Sleee8540632006-05-30 09:24:40 +0000133 struct hostent *host_entry = gethostbyname(host_.c_str());
134
135 if (host_entry == NULL) {
Mark Slee29050782006-09-29 00:12:30 +0000136 perror("TSocket: dns error: failed call to gethostbyname.");
Mark Sleee8540632006-05-30 09:24:40 +0000137 close();
Mark Slee8d7e1f62006-06-07 06:48:56 +0000138 throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
Mark Sleee8540632006-05-30 09:24:40 +0000139 }
140
141 addr.sin_port = htons(port_);
142 memcpy(&addr.sin_addr.s_addr,
143 host_entry->h_addr_list[0],
144 host_entry->h_length);
145 }
Mark Slee29050782006-09-29 00:12:30 +0000146
147 // Set the socket to be non blocking for connect if a timeout exists
148 int flags = fcntl(socket_, F_GETFL, 0);
149 if (connTimeout_ > 0) {
150 fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
151 } else {
152 fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
153 }
154
155 // Conn timeout
156 struct timeval c = {(int)(connTimeout_/1000),
157 (int)((connTimeout_%1000)*1000)};
Mark Sleee8540632006-05-30 09:24:40 +0000158
159 // Connect the socket
160 int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
161
Mark Slee29050782006-09-29 00:12:30 +0000162 if (ret == 0) {
163 goto done;
164 }
165
166 if (errno != EINPROGRESS) {
Mark Sleee8540632006-05-30 09:24:40 +0000167 close();
Mark Slee29050782006-09-29 00:12:30 +0000168 char buff[1024];
169 sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
170 perror(buff);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000171 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000172 }
173
Mark Slee29050782006-09-29 00:12:30 +0000174 fd_set fds;
175 FD_ZERO(&fds);
176 FD_SET(socket_, &fds);
177 ret = select(socket_+1, NULL, &fds, NULL, &c);
178
179 if (ret > 0) {
180 // Ensure connected
181 int val;
182 socklen_t lon;
183 lon = sizeof(int);
184 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
185 if (ret2 == -1) {
186 close();
187 perror("TSocket::open() getsockopt SO_ERROR");
188 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
189 }
190 if (val == 0) {
191 goto done;
192 }
193 close();
194 perror("TSocket::open() SO_ERROR was set");
195 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
196 } else if (ret == 0) {
197 close();
198 perror("TSocket::open() timeed out");
199 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
200 } else {
201 close();
202 perror("TSocket::open() select error");
203 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
204 }
205
206 done:
207 // Set socket back to normal mode (blocking)
208 fcntl(socket_, F_SETFL, flags);
Mark Sleee8540632006-05-30 09:24:40 +0000209}
210
211void TSocket::close() {
212 if (socket_ > 0) {
213 shutdown(socket_, SHUT_RDWR);
214 ::close(socket_);
215 }
216 socket_ = 0;
217}
218
Mark Slee8d7e1f62006-06-07 06:48:56 +0000219uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
220 if (socket_ <= 0) {
221 throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
222 }
Mark Sleee8540632006-05-30 09:24:40 +0000223
Mark Sleee8540632006-05-30 09:24:40 +0000224 uint32_t retries = 0;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000225
226 try_again:
227 // Read from the socket
228 int got = recv(socket_, buf, len, 0);
229 ++g_socket_syscalls;
230
231 // Check for error on read
232 if (got < 0) {
233 perror("TSocket::read()");
234
235 // If temporarily out of resources, sleep a bit and try again
236 if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
237 usleep(50);
238 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000239 }
240
Mark Slee8d7e1f62006-06-07 06:48:56 +0000241 // If interrupted, try again
242 if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
243 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000244 }
245
Mark Slee8d7e1f62006-06-07 06:48:56 +0000246 // If we disconnect with no linger time
247 if (errno == ECONNRESET) {
248 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
249 }
250
251 // This ish isn't open
252 if (errno == ENOTCONN) {
253 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
254 }
255
256 // Timed out!
257 if (errno == ETIMEDOUT) {
258 throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
259 }
260
261 // Some other error, whatevz
262 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
263 }
264
265 // The remote host has closed the socket
266 if (got == 0) {
267 close();
268 return 0;
Mark Sleee8540632006-05-30 09:24:40 +0000269 }
270
271 // Pack data into string
Mark Slee8d7e1f62006-06-07 06:48:56 +0000272 return got;
Mark Sleee8540632006-05-30 09:24:40 +0000273}
274
Mark Slee8d7e1f62006-06-07 06:48:56 +0000275void TSocket::write(const uint8_t* buf, uint32_t len) {
276 if (socket_ <= 0) {
277 throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
278 }
279
Mark Sleee8540632006-05-30 09:24:40 +0000280 uint32_t sent = 0;
281
Mark Slee8d7e1f62006-06-07 06:48:56 +0000282 while (sent < len) {
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000283
284 int flags = 0;
Mark Slee29050782006-09-29 00:12:30 +0000285 #ifdef MSG_NOSIGNAL
Mark Slee8d7e1f62006-06-07 06:48:56 +0000286 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
287 // check for the EPIPE return condition and close the socket in that case
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000288 flags |= MSG_NOSIGNAL;
Mark Slee29050782006-09-29 00:12:30 +0000289 #endif // ifdef MSG_NOSIGNAL
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000290
291 int b = send(socket_, buf + sent, len - sent, flags);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000292 ++g_socket_syscalls;
293
Mark Sleee8540632006-05-30 09:24:40 +0000294 // Fail on a send error
295 if (b < 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000296 if (errno == EPIPE) {
297 close();
298 throw TTransportException(TTX_NOT_OPEN, "EPIPE");
299 }
300
301 if (errno == ECONNRESET) {
302 close();
303 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
304 }
305
306 if (errno == ENOTCONN) {
307 close();
308 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
309 }
310
311 perror("TSocket::write() send < 0");
312 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000313 }
314
315 // Fail on blocked send
316 if (b == 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000317 throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
Mark Sleee8540632006-05-30 09:24:40 +0000318 }
Mark Sleee8540632006-05-30 09:24:40 +0000319 sent += b;
320 }
321}
322
Aditya Agarwalebc99e02007-01-15 23:14:58 +0000323void TSocket::setHost(string host) {
324 host_ = host;
325}
326
327void TSocket::setPort(int port) {
328 port_ = port;
329}
330
Mark Slee8d7e1f62006-06-07 06:48:56 +0000331void TSocket::setLinger(bool on, int linger) {
Mark Slee29050782006-09-29 00:12:30 +0000332 lingerOn_ = on;
333 lingerVal_ = linger;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000334 if (socket_ <= 0) {
335 return;
336 }
337
Mark Slee29050782006-09-29 00:12:30 +0000338 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
339 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
340 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000341 perror("TSocket::setLinger()");
Mark Sleee8540632006-05-30 09:24:40 +0000342 }
Mark Sleee8540632006-05-30 09:24:40 +0000343}
344
Mark Slee8d7e1f62006-06-07 06:48:56 +0000345void TSocket::setNoDelay(bool noDelay) {
Mark Slee29050782006-09-29 00:12:30 +0000346 noDelay_ = noDelay;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000347 if (socket_ <= 0) {
348 return;
349 }
350
Mark Sleee8540632006-05-30 09:24:40 +0000351 // Set socket to NODELAY
Mark Slee29050782006-09-29 00:12:30 +0000352 int v = noDelay_ ? 1 : 0;
353 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
354 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000355 perror("TSocket::setNoDelay()");
Mark Sleee8540632006-05-30 09:24:40 +0000356 }
Mark Sleee8540632006-05-30 09:24:40 +0000357}
Mark Slee29050782006-09-29 00:12:30 +0000358
359void TSocket::setConnTimeout(int ms) {
360 connTimeout_ = ms;
361}
362
363void TSocket::setRecvTimeout(int ms) {
364 recvTimeout_ = ms;
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000365 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
366 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +0000367 if (socket_ <= 0) {
368 return;
369 }
370
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000371 // Copy because select may modify
372 struct timeval r = recvTimeval_;
Mark Slee29050782006-09-29 00:12:30 +0000373 int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
374 if (ret == -1) {
375 perror("TSocket::setRecvTimeout()");
376 }
377}
378
379void TSocket::setSendTimeout(int ms) {
380 sendTimeout_ = ms;
381 if (socket_ <= 0) {
382 return;
383 }
384
385 struct timeval s = {(int)(sendTimeout_/1000),
386 (int)((sendTimeout_%1000)*1000)};
387 int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
388 if (ret == -1) {
389 perror("TSocket::setSendTimeout()");
390 }
391}
392
Marc Slemko6f038a72006-08-03 18:58:09 +0000393}}} // facebook::thrift::transport