Java libraries for Thrift

Summary: The basic Thrift stack implemented in Java, still in need of a lot of work but fully functional.

Reviewed By: aditya

Test Plan: Unit tests are the NEXT checkin, I swear

Notes: Perf on the Java stuff actually isn't that bad, and it's far from optimized at the moment. Barely any tweaking has been done. Testing shows that a Java server with the C++ client has RPC performance within 2x of the pure C++ implementations. This is pretty sweet, since this cost will be eclipsed by the cost of whatever processing is being done on an actual server doing real work.




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664715 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/transport/TIOStreamTransport.java b/lib/java/src/transport/TIOStreamTransport.java
new file mode 100644
index 0000000..aa4adf1
--- /dev/null
+++ b/lib/java/src/transport/TIOStreamTransport.java
@@ -0,0 +1,138 @@
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This is the most commonly used base transport. It takes an InputStream
+ * and an OutputStream and uses those to perform all transport operations.
+ * This allows for compatibility with all the nice constructs Java already
+ * has to provide a variety of types of streams.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TIOStreamTransport extends TTransport {
+
+  /** Underlying inputStream */
+  protected InputStream inputStream_ = null;
+
+  /** Underlying outputStream */
+  protected OutputStream outputStream_ = null;
+
+  /**
+   * Subclasses can invoke the default constructor and then assign the input
+   * streams in the open method.
+   */
+  protected TIOStreamTransport() {}
+
+  /**
+   * Input stream constructor.
+   *
+   * @param is Input stream to read from
+   */
+  public TIOStreamTransport(InputStream is) {
+    inputStream_ = is;
+  }
+
+  /**
+   * Output stream constructor.
+   *
+   * @param os Output stream to write to
+   */
+  public TIOStreamTransport(OutputStream os) {
+    outputStream_ = os;
+  }
+
+  /**
+   * Two-way stream constructor.
+   *
+   * @param is Input stream to read from
+   * @param os Output stream to write to
+   */ 
+  public TIOStreamTransport(InputStream is, OutputStream os) {
+    inputStream_ = is;
+    outputStream_ = os;
+  }
+
+  /**
+   * Reports whether a stream is still attached. The streams must already be
+   * open at construction time, and close() resets both of them to null.
+   *
+   * @return true if the input stream or the output stream is non-null
+   */
+  public boolean isOpen() {
+    return inputStream_ != null || outputStream_ != null;
+  }
+
+  /**
+   * The streams must already be open. This method does nothing.
+   */
+  public void open() throws TTransportException {}
+
+  /**
+   * Closes both the input and output streams.
+   */
+  public void close() {
+    if (inputStream_ != null) {
+      try {
+        inputStream_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing input stream: " +
+                           iox.getMessage());
+      }
+      inputStream_ = null;
+    }
+    if (outputStream_ != null) {
+      try {
+        outputStream_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Error closing output stream: " +
+                           iox.getMessage());
+      }
+      outputStream_ = null;
+    }
+  }
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if (inputStream_ == null) {
+      throw new TTransportException("Cannot read from null inputStream");
+    }
+    try {
+      return inputStream_.read(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException("Cannot write to null outputStream");
+    }
+    try {
+      outputStream_.write(buf, off, len);
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Flushes the underlying output stream if not null.
+   */
+  public void flush() throws TTransportException {
+    if (outputStream_ == null) {
+      throw new TTransportException("Cannot flush null outputStream");
+    }
+    try {
+      outputStream_.flush();
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+}
diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java
new file mode 100644
index 0000000..a885fa1
--- /dev/null
+++ b/lib/java/src/transport/TServerSocket.java
@@ -0,0 +1,46 @@
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * Wrapper around ServerSocket for Thrift.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TServerSocket extends TServerTransport {
+  
+  private ServerSocket serverSocket_;
+  
+  public TServerSocket(ServerSocket serverSocket) {
+    serverSocket_ = serverSocket;
+  }
+
+  public void listen() throws TTransportException {}
+  
+  protected TSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException("No underlying server socket.");
+    }
+    try {
+      Socket result = serverSocket_.accept();
+      return new TSocket(result);
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Could not close server socket: " +
+                           iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+}
diff --git a/lib/java/src/transport/TServerTransport.java b/lib/java/src/transport/TServerTransport.java
new file mode 100644
index 0000000..6faeaab
--- /dev/null
+++ b/lib/java/src/transport/TServerTransport.java
@@ -0,0 +1,23 @@
+package com.facebook.thrift.transport;
+
+/**
+ * Server transport. Object which provides client transports.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public abstract class TServerTransport {
+
+  public abstract void listen() throws TTransportException;
+
+  public final TTransport accept() throws TTransportException {
+    TTransport transport = acceptImpl();
+    if (transport == null) {
+      throw new TTransportException("accept() may not return NULL");
+    }
+    return transport;
+  }
+
+  public abstract void close();
+
+  protected abstract TTransport acceptImpl() throws TTransportException;
+}
diff --git a/lib/java/src/transport/TSocket.java b/lib/java/src/transport/TSocket.java
new file mode 100644
index 0000000..2092f11
--- /dev/null
+++ b/lib/java/src/transport/TSocket.java
@@ -0,0 +1,126 @@
+package com.facebook.thrift.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * Socket implementation of the TTransport interface. To be commented soon!
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TSocket extends TIOStreamTransport {
+
+  /** Wrapped Socket object */
+  private Socket socket_ = null;
+  
+  /** Remote host */
+  private String host_  = null;
+
+  /** Remote port */
+  private int port_ = 0;
+
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socket Already created socket object
+   * @throws TTransportException if there is an error setting up the streams
+   */
+  public TSocket(Socket socket) throws TTransportException {
+    socket_ = socket;
+    if (isOpen()) {
+      try {
+        inputStream_ = new BufferedInputStream(socket_.getInputStream());
+        outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
+      } catch (IOException iox) {
+        close();
+        throw new TTransportException(iox);
+      }
+    }
+  }
+
+  /**
+   * Creates a new unconnected socket that will connect to the given host
+   * on the given port.
+   *
+   * @param host Remote host
+   * @param port Remote port
+   */
+  public TSocket(String host, int port) {
+    socket_ = new Socket();
+    host_ = host;
+    port_ = port;
+  }
+
+  /**
+   * Returns a reference to the underlying socket. Can be used to set
+   * socket options, etc. If an underlying socket does not exist yet, this
+   * will create one.
+   */
+  public Socket getSocket() {
+    if (socket_ == null) {
+      socket_ = new Socket();
+    }
+    return socket_;
+  }
+
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    if (socket_ == null) {
+      return false;
+    }
+    return socket_.isConnected();
+  }
+
+  /**
+   * Connects to host_:port_, creating a new socket object if one is missing.
+   */
+  public void open() throws TTransportException {
+    if (socket_ == null) {
+      if (host_ == null || host_.length() == 0) {
+        throw new TTransportException("Cannot open null host.");
+      }
+      if (port_ <= 0) {
+        throw new TTransportException("Cannot open without port.");
+      }
+      socket_ = new Socket();
+    }
+    // A connected socket cannot be connected again, so refuse early.
+    if (isOpen()) {
+      throw new TTransportException("Socket already connected.");
+    }
+    // On any I/O failure, close() releases the socket and stream wrappers.
+    try {
+      socket_.connect(new InetSocketAddress(host_, port_));
+      inputStream_ = new BufferedInputStream(socket_.getInputStream());
+      outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
+    } catch (IOException iox) {
+      close();
+      throw new TTransportException(iox);
+    }
+  }
+
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    // Close the underlying streams
+    super.close();
+
+    // Close the socket
+    if (socket_ != null) {
+      try {
+        socket_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: exception closing socket: " +
+                           iox.getMessage());
+      }
+      socket_ = null;
+    }
+  }
+
+}
diff --git a/lib/java/src/transport/TTransport.java b/lib/java/src/transport/TTransport.java
new file mode 100644
index 0000000..00610df
--- /dev/null
+++ b/lib/java/src/transport/TTransport.java
@@ -0,0 +1,84 @@
+package com.facebook.thrift.transport;
+
+/**
+ * Generic class that encapsulates the I/O layer. This is basically a thin
+ * wrapper around the combined functionality of Java input/output streams.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public abstract class TTransport {
+
+  /**
+   * Queries whether the transport is open.
+   *
+   * @return True if the transport is open.
+   */
+  public abstract boolean isOpen();
+
+  /**
+   * Opens the transport for reading/writing.
+   *
+   * @throws TTransportException if the transport could not be opened
+   */
+  public abstract void open()
+    throws TTransportException;
+
+  /**
+   * Closes the transport.
+   */
+  public abstract void close();
+
+  /**
+   * Reads up to len bytes into buffer buf, starting at offset off.
+   *
+   * @param buf Array to read into
+   * @param off Index to start reading at
+   * @param len Maximum number of bytes to read
+   * @return The number of bytes actually read
+   * @throws TTransportException if there was an error reading data
+   */
+  public abstract int read(byte[] buf, int off, int len)
+    throws TTransportException;
+
+  /**
+   * Guarantees that all of len bytes are read, looping over read() as needed.
+   * A read() result of 0 or less aborts the loop so that it cannot spin forever.
+   * @param buf Array to read into
+   * @param off Index to start reading at
+   * @param len Exact number of bytes to read
+   * @return The number of bytes actually read, which must be equal to len
+   * @throws TTransportException if read() fails or stops before len bytes
+   */
+  public int readAll(byte[] buf, int off, int len)
+    throws TTransportException {
+    int got = 0;
+    int ret = 0;
+    while (got < len) {
+      ret = read(buf, off+got, len-got);
+      if (ret <= 0) {
+        throw new TTransportException("Cannot read. Remote side has closed.");
+      }
+      got += ret;
+    }
+    return got;
+  }
+
+  /**
+   * Writes up to len bytes from the buffer.
+   *
+   * @param buf The output data buffer
+   * @param off The offset to start writing from
+   * @param len The number of bytes to write
+   * @throws TTransportException if there was an error writing data
+   */
+  public abstract void write(byte[] buf, int off, int len)
+    throws TTransportException;
+
+  /**
+   * Flush any pending data out of a transport buffer.
+   *
+   * @throws TTransportException if there was an error writing out data.
+   */
+  public void flush() 
+    throws TTransportException {}
+}
diff --git a/lib/java/src/transport/TTransportException.java b/lib/java/src/transport/TTransportException.java
new file mode 100644
index 0000000..a67b6ed
--- /dev/null
+++ b/lib/java/src/transport/TTransportException.java
@@ -0,0 +1,26 @@
+package com.facebook.thrift.transport;
+
+import com.facebook.thrift.TException;
+
+/**
+ * Transport exceptions.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TTransportException extends TException {
+  public TTransportException() {
+    super();
+  }
+
+  public TTransportException(String message) {
+    super(message);
+  }
+
+  public TTransportException(Throwable cause) {
+    super(cause);
+  }
+
+  public TTransportException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}