THRIFT-900. cpp: Unix domain socket
authorBryan Duxbury <bryanduxbury@apache.org>
Tue, 28 Sep 2010 14:36:07 +0000 (14:36 +0000)
committerBryan Duxbury <bryanduxbury@apache.org>
Tue, 28 Sep 2010 14:36:07 +0000 (14:36 +0000)
This patch adds a new Unix Socket transport.

Patch: Roger Meier

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1002179 13f79535-47bb-0310-9956-ffa450edef68

lib/cpp/src/transport/TServerSocket.cpp
lib/cpp/src/transport/TServerSocket.h
lib/cpp/src/transport/TSocket.cpp
lib/cpp/src/transport/TSocket.h

index 2f14fd5..836f6ba 100644 (file)
@@ -20,6 +20,7 @@
 #include <cstring>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <sys/poll.h>
 #include <sys/types.h>
 #include <netinet/in.h>
@@ -68,6 +69,20 @@ TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
   intSock1_(-1),
   intSock2_(-1) {}
 
+TServerSocket::TServerSocket(string path) :
+  port_(0),
+  path_(path),
+  serverSocket_(-1),
+  acceptBacklog_(1024),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  retryLimit_(0),
+  retryDelay_(0),
+  tcpSendBuffer_(0),
+  tcpRecvBuffer_(0),
+  intSock1_(-1),
+  intSock2_(-1) {}
+
 TServerSocket::~TServerSocket() {
   close();
 }
@@ -131,7 +146,12 @@ void TServerSocket::listen() {
       break;
   }
 
-  serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (! path_.empty()) {
+    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+  } else {
+    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  }
+
   if (serverSocket_ == -1) {
     int errno_copy = errno;
     GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
@@ -201,13 +221,16 @@ void TServerSocket::listen() {
     throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
   }
 
-  // TCP Nodelay, speed over bandwidth
-  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
-                       &one, sizeof(one))) {
-    int errno_copy = errno;
-    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
-    close();
-    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+  // Unix Sockets do not need that
+  if (path_.empty()) {
+    // TCP Nodelay, speed over bandwidth
+    if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+                         &one, sizeof(one))) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+      close();
+      throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+    }
   }
 
   // Set NONBLOCK on the accept socket
@@ -228,21 +251,49 @@ void TServerSocket::listen() {
   // we may want to try to bind more than once, since SO_REUSEADDR doesn't
   // always seem to work. The client can configure the retry variables.
   int retries = 0;
-  do {
-    if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
-      break;
+
+  if (! path_.empty()) {
+    // Unix Domain Socket
+    struct sockaddr_un address;
+    socklen_t len;
+
+    if (path_.length() > sizeof(address.sun_path)) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
+      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
     }
 
-    // use short circuit evaluation here to only sleep if we need to
-  } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+    address.sun_family = AF_UNIX;
+    sprintf(address.sun_path, path_.c_str());
+    len = sizeof(address);
+
+    do {
+      if (0 == bind(serverSocket_, (struct sockaddr *) &address, len)) {
+        break;
+      }
+      // use short circuit evaluation here to only sleep if we need to
+    } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+  } else {
+    do {
+      if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
+        break;
+      }
+      // use short circuit evaluation here to only sleep if we need to
+    } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
 
-  // free addrinfo
-  freeaddrinfo(res0);
+    // free addrinfo
+    freeaddrinfo(res0);
+  }
 
   // throw an error if we failed to bind properly
   if (retries > retryLimit_) {
     char errbuf[1024];
-    sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    if (! path_.empty()) {
+      sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str());
+    }
+    else {
+      sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    }
     GlobalOutput(errbuf);
     close();
     throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
index a6be017..8cd521f 100644 (file)
@@ -36,6 +36,7 @@ class TServerSocket : public TServerTransport {
  public:
   TServerSocket(int port);
   TServerSocket(int port, int sendTimeout, int recvTimeout);
+  TServerSocket(std::string path);
 
   ~TServerSocket();
 
@@ -58,6 +59,7 @@ class TServerSocket : public TServerTransport {
 
  private:
   int port_;
+  std::string path_;
   int serverSocket_;
   int acceptBacklog_;
   int sendTimeout_;
index 5da33bb..951ddcf 100644 (file)
@@ -21,6 +21,7 @@
 #include <cstring>
 #include <sstream>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <sys/poll.h>
 #include <sys/types.h>
 #include <arpa/inet.h>
@@ -50,6 +51,23 @@ uint32_t g_socket_syscalls = 0;
 TSocket::TSocket(string host, int port) :
   host_(host),
   port_(port),
+  path_(""),
+  socket_(-1),
+  connTimeout_(0),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+TSocket::TSocket(string path) :
+  host_(""),
+  port_(0),
+  path_(path),
   socket_(-1),
   connTimeout_(0),
   sendTimeout_(0),
@@ -65,6 +83,7 @@ TSocket::TSocket(string host, int port) :
 TSocket::TSocket() :
   host_(""),
   port_(0),
+  path_(""),
   socket_(-1),
   connTimeout_(0),
   sendTimeout_(0),
@@ -80,6 +99,7 @@ TSocket::TSocket() :
 TSocket::TSocket(int socket) :
   host_(""),
   port_(0),
+  path_(""),
   socket_(socket),
   connTimeout_(0),
   sendTimeout_(0),
@@ -130,7 +150,12 @@ void TSocket::openConnection(struct addrinfo *res) {
     return;
   }
 
-  socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (! path_.empty()) {
+    socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+  } else {
+    socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  }
+
   if (socket_ == -1) {
     int errno_copy = errno;
     GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
@@ -179,7 +204,24 @@ void TSocket::openConnection(struct addrinfo *res) {
   }
 
   // Connect the socket
-  int ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+  int ret;
+  if (! path_.empty()) {
+    struct sockaddr_un address;
+    socklen_t len;
+
+    if (path_.length() > sizeof(address.sun_path)) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
+      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
+    }
+
+    address.sun_family = AF_UNIX;
+    sprintf(address.sun_path, path_.c_str());
+    len = sizeof(address);
+    ret = connect(socket_, (struct sockaddr *) &address, len);
+  } else {
+    ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+  }
 
   // success case
   if (ret == 0) {
@@ -237,6 +279,24 @@ void TSocket::open() {
   if (isOpen()) {
     return;
   }
+  if (! path_.empty()) {
+    unix_open();
+  } else {
+    local_open();
+  }
+}
+
+void TSocket::unix_open(){
+  if (! path_.empty()) {
+    // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
+    openConnection(NULL);
+  }
+}
+
+void TSocket::local_open(){
+  if (isOpen()) {
+    return;
+  }
 
   // Validate port number
   if (port_ < 0 || port_ > 0xFFFF) {
index 0184362..f69a9a1 100644 (file)
@@ -59,6 +59,14 @@ class TSocket : public TTransport {
    */
   TSocket(std::string host, int port);
 
+  /**
+   * Constructs a new Unix domain socket.
+   * Note that this does NOT actually connect the socket.
+   *
+   * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
+   */
+  TSocket(std::string path);
+
   /**
    * Destroyes the socket object, closing it if necessary.
    */
@@ -217,6 +225,9 @@ class TSocket : public TTransport {
   /** Port number to connect on */
   int port_;
 
+  /** UNIX domain socket path */
+  std::string path_;
+
   /** Underlying UNIX socket handle */
   int socket_;
 
@@ -246,6 +257,10 @@ class TSocket : public TTransport {
 
   /** Whether to use low minimum TCP retransmission timeout */
   static bool useLowMinRto_;
+
+ private:
+  void unix_open();
+  void local_open();
 };
 
 }}} // apache::thrift::transport