From bbe36c5d35bca5177c2a5472b58d784d93769da1 Mon Sep 17 00:00:00 2001 From: Bryan Duxbury Date: Fri, 5 Nov 2010 17:14:52 +0000 Subject: [PATCH] THRIFT-970. java: Under heavy load, THttpClient may fail with 'too many open files' This patch updates our THttpClient to have two different modes of operation: its current functionality and a new mode that uses Apache's HttpClient library to provide higher throughput and better pooling functionality. Patch: Mathias Herberts git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1031668 13f79535-47bb-0310-9956-ffa450edef68 --- lib/java/ivy.xml | 1 + .../apache/thrift/transport/THttpClient.java | 186 +++++++++++++++++- 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/lib/java/ivy.xml b/lib/java/ivy.xml index cff7136a..99ef9729 100644 --- a/lib/java/ivy.xml +++ b/lib/java/ivy.xml @@ -33,5 +33,6 @@ + diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java index 41923531..0d39ff04 100644 --- a/lib/java/src/org/apache/thrift/transport/THttpClient.java +++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.IOException; @@ -28,17 +29,46 @@ import java.net.HttpURLConnection; import java.util.HashMap; import java.util.Map; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.params.CoreConnectionPNames; + /** * HTTP implementation of the TTransport interface. Used for working with a - * Thrift web services implementation. + * Thrift web services implementation (using for example TServlet). 
+ * + * This class offers two implementations of the HTTP transport. + * One uses HttpURLConnection instances, the other HttpClient from Apache + * Http Components. + * The chosen implementation depends on the constructor used to + * create the THttpClient instance. + * Using the THttpClient(String url) constructor or passing null as the + * HttpClient to THttpClient(String url, HttpClient client) will create an + * instance which will use HttpURLConnection. + * + * When using HttpClient, the following configuration leads to 5-15% + * better performance than the HttpURLConnection implementation: + * + * http.protocol.version=HttpVersion.HTTP_1_1 + * http.protocol.content-charset=UTF-8 + * http.protocol.expect-continue=false + * http.connection.stalecheck=false * + * Also note that under high load, the HttpURLConnection implementation + * may exhaust the open file descriptor limit. + * + * @see THRIFT-970 */ + public class THttpClient extends TTransport { private URL url_ = null; - private final ByteArrayOutputStream requestBuffer_ = - new ByteArrayOutputStream(); + private final ByteArrayOutputStream requestBuffer_ = new ByteArrayOutputStream(); private InputStream inputStream_ = null; @@ -48,9 +78,54 @@ public class THttpClient extends TTransport { private Map customHeaders_ = null; + private final HttpHost host; + + private final HttpClient client; + + public static class Factory extends TTransportFactory { + + private final String url; + private final HttpClient client; + + public Factory(String url) { + this.url = url; + this.client = null; + } + + public Factory(String url, HttpClient client) { + this.url = url; + this.client = client; + } + + @Override + public TTransport getTransport(TTransport trans) { + try { + if (null != client) { + return new THttpClient(url, client); + } else { + return new THttpClient(url); + } + } catch (TTransportException tte) { + return null; + } + } + } + public THttpClient(String url) throws TTransportException { try { url_ = 
new URL(url); + this.client = null; + this.host = null; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public THttpClient(String url, HttpClient client) throws TTransportException { + try { + url_ = new URL(url); + this.client = client; + this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol()); } catch (IOException iox) { throw new TTransportException(iox); } @@ -58,10 +133,20 @@ public class THttpClient extends TTransport { public void setConnectTimeout(int timeout) { connectTimeout_ = timeout; + if (null != this.client) { + // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the + // same HttpClient is used for something else. + client.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, connectTimeout_); + } } public void setReadTimeout(int timeout) { readTimeout_ = timeout; + if (null != this.client) { + // WARNING, this modifies the HttpClient params, this might have an impact elsewhere if the + // same HttpClient is used for something else. + client.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT, readTimeout_); + } } public void setCustomHeaders(Map headers) { @@ -111,7 +196,102 @@ public class THttpClient extends TTransport { requestBuffer_.write(buf, off, len); } + private void flushUsingHttpClient() throws TTransportException { + + if (null == this.client) { + throw new TTransportException("Null HttpClient, aborting."); + } + + // Extract request and reset buffer + byte[] data = requestBuffer_.toByteArray(); + requestBuffer_.reset(); + + HttpPost post = null; + + InputStream is = null; + + try { + // Set request to path + query string + post = new HttpPost(this.url_.getFile()); + + // + // Headers are added to the HttpPost instance, not + // to HttpClient. 
+ // + + post.setHeader("Content-Type", "application/x-thrift"); + post.setHeader("Accept", "application/x-thrift"); + post.setHeader("User-Agent", "Java/THttpClient/HC"); + + if (null != customHeaders_) { + for (Map.Entry header : customHeaders_.entrySet()) { + post.setHeader(header.getKey(), header.getValue()); + } + } + + post.setEntity(new ByteArrayEntity(data)); + + HttpResponse response = this.client.execute(this.host, post); + int responseCode = response.getStatusLine().getStatusCode(); + + if (responseCode != HttpStatus.SC_OK) { + throw new TTransportException("HTTP Response code: " + responseCode); + } + + // Read the responses into a byte array so we can release the connection + // early. This implies that the whole content will have to be read in + // memory, and that momentarily we might use up twice the memory (while the + // thrift struct is being read up the chain). + // Proceeding differently might lead to exhaustion of connections and thus + // to app failure. + + is = response.getEntity().getContent(); + + byte[] buf = new byte[1024]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + int len = 0; + do { + len = is.read(buf); + if (len > 0) { + baos.write(buf, 0, len); + } + } while (-1 != len); + + try { + // Indicate we're done with the content. + response.getEntity().consumeContent(); + } catch (IOException ioe) { + // We ignore this exception, it might only mean the server has no + // keep-alive capability. 
+ } + + inputStream_ = new ByteArrayInputStream(baos.toByteArray()); + } catch (IOException ioe) { + // Abort method so the connection gets released back to the connection manager + if (null != post) { + post.abort(); + } + throw new TTransportException(ioe); + } finally { + if (null != is) { + // Close the entity's input stream, this will release the underlying connection + try { + is.close(); + } catch (IOException ioe) { + throw new TTransportException(ioe); + } + } + } + } + public void flush() throws TTransportException { + + if (null != this.client) { + flushUsingHttpClient(); + return; + } + // Extract request and reset buffer byte[] data = requestBuffer_.toByteArray(); requestBuffer_.reset(); -- 2.17.1