blob: 8b9048c8a4ef4691176358525f923b049ba002d7 [file] [log] [blame]
#include <config.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/select.h>

#include "concurrency/Monitor.h"
#include "TSocket.h"
#include "TTransportException.h"
Mark Sleee8540632006-05-30 09:24:40 +000015
Marc Slemko6f038a72006-08-03 18:58:09 +000016namespace facebook { namespace thrift { namespace transport {
17
Mark Sleee8540632006-05-30 09:24:40 +000018using namespace std;
Mark Slee29050782006-09-29 00:12:30 +000019using namespace facebook::thrift::concurrency;
Mark Sleee8540632006-05-30 09:24:40 +000020
Mark Slee29050782006-09-29 00:12:30 +000021// Global var to track total socket sys calls
Mark Slee8d7e1f62006-06-07 06:48:56 +000022uint32_t g_socket_syscalls = 0;
23
24/**
25 * TSocket implementation.
26 *
27 * @author Mark Slee <mcslee@facebook.com>
28 */
29
Mark Sleee8540632006-05-30 09:24:40 +000030// Mutex to protect syscalls to netdb
Mark Slee29050782006-09-29 00:12:30 +000031static Monitor s_netdb_monitor;
Mark Sleee8540632006-05-30 09:24:40 +000032
33// TODO(mcslee): Make this an option to the socket class
34#define MAX_RECV_RETRIES 20
Mark Slee29050782006-09-29 00:12:30 +000035
36TSocket::TSocket(string host, int port) :
37 host_(host),
38 port_(port),
39 socket_(0),
40 connTimeout_(0),
41 sendTimeout_(0),
42 recvTimeout_(0),
43 lingerOn_(1),
44 lingerVal_(0),
45 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000046 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
47 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Sleee8540632006-05-30 09:24:40 +000048}
49
Mark Slee29050782006-09-29 00:12:30 +000050TSocket::TSocket(int socket) :
51 host_(""),
52 port_(0),
53 socket_(socket),
54 connTimeout_(0),
55 sendTimeout_(0),
56 recvTimeout_(0),
57 lingerOn_(1),
58 lingerVal_(0),
59 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000060 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
61 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +000062}
63
Mark Sleee8540632006-05-30 09:24:40 +000064TSocket::~TSocket() {
65 close();
66}
67
Mark Slee8d7e1f62006-06-07 06:48:56 +000068bool TSocket::isOpen() {
69 return (socket_ > 0);
70}
71
Mark Sleeb9ff32a2006-11-16 01:00:24 +000072bool TSocket::peek() {
73 if (!isOpen()) {
74 return false;
75 }
76 uint8_t buf;
77 int r = recv(socket_, &buf, 1, MSG_PEEK);
78 if (r == -1) {
79 perror("TSocket::peek()");
80 close();
81 throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno);
82 }
83 return (r > 0);
84}
85
Mark Slee8d7e1f62006-06-07 06:48:56 +000086void TSocket::open() {
Mark Sleee8540632006-05-30 09:24:40 +000087 // Create socket
88 socket_ = socket(AF_INET, SOCK_STREAM, 0);
89 if (socket_ == -1) {
Mark Slee8d7e1f62006-06-07 06:48:56 +000090 perror("TSocket::open() socket");
91 close();
92 throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +000093 }
Mark Slee29050782006-09-29 00:12:30 +000094
95 // Send timeout
96 if (sendTimeout_ > 0) {
97 setSendTimeout(sendTimeout_);
98 }
99
100 // Recv timeout
101 if (recvTimeout_ > 0) {
102 setRecvTimeout(recvTimeout_);
103 }
104
105 // Linger
106 setLinger(lingerOn_, lingerVal_);
107
108 // No delay
109 setNoDelay(noDelay_);
110
Mark Slee8d7e1f62006-06-07 06:48:56 +0000111 // Lookup the hostname
Mark Sleee8540632006-05-30 09:24:40 +0000112 struct sockaddr_in addr;
113 addr.sin_family = AF_INET;
114 addr.sin_port = htons(port_);
115
Mark Sleee8540632006-05-30 09:24:40 +0000116 {
Mark Slee29050782006-09-29 00:12:30 +0000117 // Scope lock on host entry lookup
118 Synchronized s(s_netdb_monitor);
Mark Sleee8540632006-05-30 09:24:40 +0000119 struct hostent *host_entry = gethostbyname(host_.c_str());
120
121 if (host_entry == NULL) {
Mark Slee29050782006-09-29 00:12:30 +0000122 perror("TSocket: dns error: failed call to gethostbyname.");
Mark Sleee8540632006-05-30 09:24:40 +0000123 close();
Mark Slee8d7e1f62006-06-07 06:48:56 +0000124 throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
Mark Sleee8540632006-05-30 09:24:40 +0000125 }
126
127 addr.sin_port = htons(port_);
128 memcpy(&addr.sin_addr.s_addr,
129 host_entry->h_addr_list[0],
130 host_entry->h_length);
131 }
Mark Slee29050782006-09-29 00:12:30 +0000132
133 // Set the socket to be non blocking for connect if a timeout exists
134 int flags = fcntl(socket_, F_GETFL, 0);
135 if (connTimeout_ > 0) {
136 fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
137 } else {
138 fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
139 }
140
141 // Conn timeout
142 struct timeval c = {(int)(connTimeout_/1000),
143 (int)((connTimeout_%1000)*1000)};
Mark Sleee8540632006-05-30 09:24:40 +0000144
145 // Connect the socket
146 int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
147
Mark Slee29050782006-09-29 00:12:30 +0000148 if (ret == 0) {
149 goto done;
150 }
151
152 if (errno != EINPROGRESS) {
Mark Sleee8540632006-05-30 09:24:40 +0000153 close();
Mark Slee29050782006-09-29 00:12:30 +0000154 char buff[1024];
155 sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
156 perror(buff);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000157 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000158 }
159
Mark Slee29050782006-09-29 00:12:30 +0000160 fd_set fds;
161 FD_ZERO(&fds);
162 FD_SET(socket_, &fds);
163 ret = select(socket_+1, NULL, &fds, NULL, &c);
164
165 if (ret > 0) {
166 // Ensure connected
167 int val;
168 socklen_t lon;
169 lon = sizeof(int);
170 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
171 if (ret2 == -1) {
172 close();
173 perror("TSocket::open() getsockopt SO_ERROR");
174 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
175 }
176 if (val == 0) {
177 goto done;
178 }
179 close();
180 perror("TSocket::open() SO_ERROR was set");
181 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
182 } else if (ret == 0) {
183 close();
184 perror("TSocket::open() timeed out");
185 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
186 } else {
187 close();
188 perror("TSocket::open() select error");
189 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
190 }
191
192 done:
193 // Set socket back to normal mode (blocking)
194 fcntl(socket_, F_SETFL, flags);
Mark Sleee8540632006-05-30 09:24:40 +0000195}
196
197void TSocket::close() {
198 if (socket_ > 0) {
199 shutdown(socket_, SHUT_RDWR);
200 ::close(socket_);
201 }
202 socket_ = 0;
203}
204
Mark Slee8d7e1f62006-06-07 06:48:56 +0000205uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
206 if (socket_ <= 0) {
207 throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
208 }
Mark Sleee8540632006-05-30 09:24:40 +0000209
Mark Sleee8540632006-05-30 09:24:40 +0000210 uint32_t retries = 0;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000211
212 try_again:
213 // Read from the socket
214 int got = recv(socket_, buf, len, 0);
215 ++g_socket_syscalls;
216
217 // Check for error on read
218 if (got < 0) {
219 perror("TSocket::read()");
220
221 // If temporarily out of resources, sleep a bit and try again
222 if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
223 usleep(50);
224 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000225 }
226
Mark Slee8d7e1f62006-06-07 06:48:56 +0000227 // If interrupted, try again
228 if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
229 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000230 }
231
Mark Slee8d7e1f62006-06-07 06:48:56 +0000232 // If we disconnect with no linger time
233 if (errno == ECONNRESET) {
234 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
235 }
236
237 // This ish isn't open
238 if (errno == ENOTCONN) {
239 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
240 }
241
242 // Timed out!
243 if (errno == ETIMEDOUT) {
244 throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
245 }
246
247 // Some other error, whatevz
248 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
249 }
250
251 // The remote host has closed the socket
252 if (got == 0) {
253 close();
254 return 0;
Mark Sleee8540632006-05-30 09:24:40 +0000255 }
256
257 // Pack data into string
Mark Slee8d7e1f62006-06-07 06:48:56 +0000258 return got;
Mark Sleee8540632006-05-30 09:24:40 +0000259}
260
Mark Slee8d7e1f62006-06-07 06:48:56 +0000261void TSocket::write(const uint8_t* buf, uint32_t len) {
262 if (socket_ <= 0) {
263 throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
264 }
265
Mark Sleee8540632006-05-30 09:24:40 +0000266 uint32_t sent = 0;
267
Mark Slee8d7e1f62006-06-07 06:48:56 +0000268 while (sent < len) {
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000269
270 int flags = 0;
Mark Slee29050782006-09-29 00:12:30 +0000271 #ifdef MSG_NOSIGNAL
Mark Slee8d7e1f62006-06-07 06:48:56 +0000272 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
273 // check for the EPIPE return condition and close the socket in that case
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000274 flags |= MSG_NOSIGNAL;
Mark Slee29050782006-09-29 00:12:30 +0000275 #endif // ifdef MSG_NOSIGNAL
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000276
277 int b = send(socket_, buf + sent, len - sent, flags);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000278 ++g_socket_syscalls;
279
Mark Sleee8540632006-05-30 09:24:40 +0000280 // Fail on a send error
281 if (b < 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000282 if (errno == EPIPE) {
283 close();
284 throw TTransportException(TTX_NOT_OPEN, "EPIPE");
285 }
286
287 if (errno == ECONNRESET) {
288 close();
289 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
290 }
291
292 if (errno == ENOTCONN) {
293 close();
294 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
295 }
296
297 perror("TSocket::write() send < 0");
298 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000299 }
300
301 // Fail on blocked send
302 if (b == 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000303 throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
Mark Sleee8540632006-05-30 09:24:40 +0000304 }
Mark Sleee8540632006-05-30 09:24:40 +0000305 sent += b;
306 }
307}
308
Mark Slee8d7e1f62006-06-07 06:48:56 +0000309void TSocket::setLinger(bool on, int linger) {
Mark Slee29050782006-09-29 00:12:30 +0000310 lingerOn_ = on;
311 lingerVal_ = linger;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000312 if (socket_ <= 0) {
313 return;
314 }
315
Mark Slee29050782006-09-29 00:12:30 +0000316 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
317 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
318 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000319 perror("TSocket::setLinger()");
Mark Sleee8540632006-05-30 09:24:40 +0000320 }
Mark Sleee8540632006-05-30 09:24:40 +0000321}
322
Mark Slee8d7e1f62006-06-07 06:48:56 +0000323void TSocket::setNoDelay(bool noDelay) {
Mark Slee29050782006-09-29 00:12:30 +0000324 noDelay_ = noDelay;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000325 if (socket_ <= 0) {
326 return;
327 }
328
Mark Sleee8540632006-05-30 09:24:40 +0000329 // Set socket to NODELAY
Mark Slee29050782006-09-29 00:12:30 +0000330 int v = noDelay_ ? 1 : 0;
331 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
332 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000333 perror("TSocket::setNoDelay()");
Mark Sleee8540632006-05-30 09:24:40 +0000334 }
Mark Sleee8540632006-05-30 09:24:40 +0000335}
Mark Slee29050782006-09-29 00:12:30 +0000336
337void TSocket::setConnTimeout(int ms) {
338 connTimeout_ = ms;
339}
340
341void TSocket::setRecvTimeout(int ms) {
342 recvTimeout_ = ms;
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000343 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
344 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +0000345 if (socket_ <= 0) {
346 return;
347 }
348
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000349 // Copy because select may modify
350 struct timeval r = recvTimeval_;
Mark Slee29050782006-09-29 00:12:30 +0000351 int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
352 if (ret == -1) {
353 perror("TSocket::setRecvTimeout()");
354 }
355}
356
357void TSocket::setSendTimeout(int ms) {
358 sendTimeout_ = ms;
359 if (socket_ <= 0) {
360 return;
361 }
362
363 struct timeval s = {(int)(sendTimeout_/1000),
364 (int)((sendTimeout_%1000)*1000)};
365 int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
366 if (ret == -1) {
367 perror("TSocket::setSendTimeout()");
368 }
369}
370
Marc Slemko6f038a72006-08-03 18:58:09 +0000371}}} // facebook::thrift::transport