Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-12-18 09:56:04 -0500
committerSimone Bordet2015-12-18 09:56:31 -0500
commit988e596c71da79980b66985e0c2d6269424f3154 (patch)
treeb30df425f6dadd9f8dd8dbe0f99e3db478871043
parent0713c17cfa9eb09078fcf497faea282bbfb952a4 (diff)
downloadorg.eclipse.jetty.project-988e596c71da79980b66985e0c2d6269424f3154.tar.gz
org.eclipse.jetty.project-988e596c71da79980b66985e0c2d6269424f3154.tar.xz
org.eclipse.jetty.project-988e596c71da79980b66985e0c2d6269424f3154.zip
484585 - Avoid sending request using a connection that is idle timing out.
Added guard to avoid that the idle timeout expires just before sending the request. Reworked the way idle timeouts are handled, to support the case where the idle timeout just expired and the request can be tried on a different connection/channel.
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java64
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java28
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java34
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/SendFailure.java37
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java38
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java5
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java39
-rw-r--r--jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java35
-rw-r--r--jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpDestinationOverFCGI.java5
-rw-r--r--jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/MultiplexHttpDestinationOverFCGI.java5
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java3
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java37
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java2
-rw-r--r--jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java13
-rw-r--r--jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java6
-rw-r--r--jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java13
-rw-r--r--jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpDestinationOverHTTP2.java5
-rw-r--r--tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java12
-rw-r--r--tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java228
19 files changed, 506 insertions, 103 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
index e859d72eed..e382ddd6a8 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
@@ -23,6 +23,8 @@ import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Connection;
@@ -35,11 +37,15 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
public abstract class HttpConnection implements Connection
{
+ private static final Logger LOG = Log.getLogger(HttpConnection.class);
private static final HttpField CHUNKED_FIELD = new HttpField(HttpHeader.TRANSFER_ENCODING, HttpHeaderValue.CHUNKED);
+ private final AtomicInteger idleTimeoutState = new AtomicInteger();
private final HttpDestination destination;
protected HttpConnection(HttpDestination destination)
@@ -72,10 +78,12 @@ public abstract class HttpConnection implements Connection
HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners);
- send(exchange);
+ SendFailure result = send(exchange);
+ if (result != null)
+ request.abort(result.failure);
}
- protected abstract void send(HttpExchange exchange);
+ protected abstract SendFailure send(HttpExchange exchange);
protected void normalizeRequest(Request request)
{
@@ -167,6 +175,58 @@ public abstract class HttpConnection implements Connection
return builder;
}
+ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
+ {
+ // Forbid idle timeouts for the time window where
+ // the request is associated to the channel and sent.
+ // Use a counter to support multiplexed requests.
+ boolean send = false;
+ while (true)
+ {
+ int current = idleTimeoutState.get();
+ if (current < 0)
+ break;
+ if (idleTimeoutState.compareAndSet(current, current + 1))
+ {
+ send = true;
+ break;
+ }
+ }
+
+ if (send)
+ {
+ HttpRequest request = exchange.getRequest();
+ SendFailure result;
+ if (channel.associate(exchange))
+ {
+ channel.send();
+ result = null;
+ }
+ else
+ {
+ channel.release();
+ result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
+ }
+ idleTimeoutState.decrementAndGet();
+ return result;
+ }
+ else
+ {
+ return new SendFailure(new TimeoutException(), true);
+ }
+ }
+
+ public boolean onIdleTimeout()
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Idle timeout state {} - {}", idleTimeoutState, this);
+ if (idleTimeoutState.compareAndSet(0, -1))
+ close(new TimeoutException("idle_timeout"));
+ return false;
+ }
+
+ protected abstract void close(Throwable failure);
+
@Override
public String toString()
{
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
index fbf8bd9cb2..e2423276e7 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
@@ -69,7 +69,8 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
case CONNECTED:
{
- process(connection);
+ if (process(connection))
+ break;
return;
}
default:
@@ -88,7 +89,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
C connection = this.connection = (C)result;
if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
{
- process(connection);
+ send();
}
else
{
@@ -104,7 +105,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
abort(x);
}
- protected void process(final C connection)
+ protected boolean process(final C connection)
{
while (true)
{
@@ -112,7 +113,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
int count = requestsPerConnection.get();
int next = count + 1;
if (next > max)
- return;
+ return false;
if (requestsPerConnection.compareAndSet(count, next))
{
@@ -122,7 +123,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
if (exchange == null)
{
requestsPerConnection.decrementAndGet();
- return;
+ return false;
}
final Request request = exchange.getRequest();
@@ -139,8 +140,21 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
else
{
- send(connection, exchange);
+ SendFailure result = send(connection, exchange);
+ if (result != null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Send failed {} for {}", result, exchange);
+ if (result.retry)
+ {
+ if (enqueue(getHttpExchanges(), exchange))
+ return true;
+ }
+
+ request.abort(result.failure);
+ }
}
+ return getHttpExchanges().peek() != null;
}
}
}
@@ -177,7 +191,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
}
- protected abstract void send(C connection, HttpExchange exchange);
+ protected abstract SendFailure send(C connection, HttpExchange exchange);
private enum ConnectState
{
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java
index 3c4e73b1ae..8d9bf08b10 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java
@@ -105,9 +105,15 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
private void process()
{
- C connection = acquire();
- if (connection != null)
- process(connection);
+ while (true)
+ {
+ C connection = acquire();
+ if (connection == null)
+ break;
+ boolean proceed = process(connection);
+ if (!proceed)
+ break;
+ }
}
/**
@@ -118,8 +124,9 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
*
* @param connection the new connection
+ * @return whether to perform more processing
*/
- public void process(final C connection)
+ public boolean process(final C connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
@@ -129,13 +136,13 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
{
if (!connectionPool.release(connection))
connection.close();
-
if (!client.isRunning())
{
if (LOG.isDebugEnabled())
LOG.debug("{} is stopping", client);
connection.close();
}
+ return false;
}
else
{
@@ -152,12 +159,25 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
}
else
{
- send(connection, exchange);
+ SendFailure result = send(connection, exchange);
+ if (result != null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Send failed {} for {}", result, exchange);
+ if (result.retry)
+ {
+ if (enqueue(getHttpExchanges(), exchange))
+ return true;
+ }
+
+ request.abort(result.failure);
+ }
}
+ return getHttpExchanges().peek() != null;
}
}
- protected abstract void send(C connection, HttpExchange exchange);
+ protected abstract SendFailure send(C connection, HttpExchange exchange);
@Override
public void release(Connection c)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SendFailure.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SendFailure.java
new file mode 100644
index 0000000000..1aeaa69213
--- /dev/null
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SendFailure.java
@@ -0,0 +1,37 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.client;
+
+public class SendFailure
+{
+ public final Throwable failure;
+ public final boolean retry;
+
+ public SendFailure(Throwable failure, boolean retry)
+ {
+ this.failure = failure;
+ this.retry = retry;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s[failure=%s,retry=%b]", super.toString(), failure, retry);
+ }
+}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
index 8108e5491c..d9fcb24cd8 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
@@ -19,13 +19,13 @@
package org.eclipse.jetty.client.http;
import java.nio.channels.AsynchronousCloseException;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@@ -76,9 +76,9 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
delegate.send(request, listener);
}
- protected void send(HttpExchange exchange)
+ protected SendFailure send(HttpExchange exchange)
{
- delegate.send(exchange);
+ return delegate.send(exchange);
}
@Override
@@ -97,10 +97,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
@Override
protected boolean onReadTimeout()
{
- if (LOG.isDebugEnabled())
- LOG.debug("Idle timeout {}", this);
- close(new TimeoutException());
- return false;
+ boolean close = delegate.onIdleTimeout();
+ if (!close && !isClosed())
+ fillInterested();
+ return close;
}
@Override
@@ -134,7 +134,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
protected void close(Throwable failure)
{
- if (softClose())
+ if (closed.compareAndSet(false, true))
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
@@ -150,11 +150,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
}
- public boolean softClose()
- {
- return closed.compareAndSet(false, true);
- }
-
protected boolean abort(Throwable failure)
{
HttpExchange exchange = channel.getHttpExchange();
@@ -191,21 +186,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
@Override
- protected void send(HttpExchange exchange)
+ protected SendFailure send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
- // Save the old idle timeout to restore it
+ // Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
- // One channel per connection, just delegate the send
- if (channel.associate(exchange))
- channel.send();
- else
- channel.release();
+ // One channel per connection, just delegate the send.
+ return send(channel, exchange);
}
@Override
@@ -215,6 +207,12 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
}
@Override
+ protected void close(Throwable failure)
+ {
+ HttpConnectionOverHTTP.this.close(failure);
+ }
+
+ @Override
public String toString()
{
return HttpConnectionOverHTTP.this.toString();
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java
index 304ba96d35..ca9011ae54 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTP.java
@@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
+import org.eclipse.jetty.client.SendFailure;
public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnectionOverHTTP>
{
@@ -31,8 +32,8 @@ public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnecti
}
@Override
- protected void send(HttpConnectionOverHTTP connection, HttpExchange exchange)
+ protected SendFailure send(HttpConnectionOverHTTP connection, HttpExchange exchange)
{
- connection.send(exchange);
+ return connection.send(exchange);
}
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
index 6117b1be60..a4b471d491 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
@@ -467,37 +467,24 @@ public class HttpClientTest extends AbstractHttpClientServerTest
@Test
public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception
{
- start(new EmptyServerHandler());
+ start(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ if (target.endsWith("/one"))
+ baseRequest.getHttpChannel().getEndPoint().close();
+ else
+ baseRequest.setHandled(true);
+ }
+ });
client.setMaxConnectionsPerDestination(1);
- final long idleTimeout = 1000;
- client.setIdleTimeout(idleTimeout);
- final CountDownLatch latch = new CountDownLatch(3);
+ final CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
- .listener(new Request.Listener.Adapter()
- {
- @Override
- public void onBegin(Request request)
- {
- try
- {
- TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
- }
- catch (InterruptedException x)
- {
- x.printStackTrace();
- }
- }
-
- @Override
- public void onFailure(Request request, Throwable failure)
- {
- latch.countDown();
- }
- })
.onResponseFailure((response, failure) -> latch.countDown())
.send(null);
@@ -511,7 +498,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
})
.send(null);
- Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
index 18aaa22d3d..f9da69f3b4 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
@@ -24,13 +24,13 @@ import java.nio.channels.AsynchronousCloseException;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@@ -93,9 +93,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
delegate.send(request, listener);
}
- protected void send(HttpExchange exchange)
+ protected SendFailure send(HttpExchange exchange)
{
- delegate.send(exchange);
+ return delegate.send(exchange);
}
@Override
@@ -192,8 +192,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
@Override
protected boolean onReadTimeout()
{
- close(new TimeoutException());
- return false;
+ boolean close = delegate.onIdleTimeout();
+ if (!close && !isClosed())
+ fillInterested();
+ return close;
}
protected void release(HttpChannelOverFCGI channel)
@@ -202,6 +204,11 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
destination.release(this);
}
+ public boolean isClosed()
+ {
+ return closed.get();
+ }
+
@Override
public void close()
{
@@ -217,10 +224,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
getHttpDestination().close(this);
getEndPoint().shutdownOutput();
if (LOG.isDebugEnabled())
- LOG.debug("{} oshut", this);
+ LOG.debug("Shutdown {}", this);
getEndPoint().close();
if (LOG.isDebugEnabled())
- LOG.debug("{} closed", this);
+ LOG.debug("Closed {}", this);
abort(failure);
}
@@ -298,7 +305,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
- protected void send(HttpExchange exchange)
+ protected SendFailure send(HttpExchange exchange)
{
Request request = exchange.getRequest();
normalizeRequest(request);
@@ -307,10 +314,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
int id = acquireRequest();
HttpChannelOverFCGI channel = newHttpChannel(id, request);
channels.put(id, channel);
- if (channel.associate(exchange))
- channel.send();
- else
- channel.release();
+
+ return send(channel, exchange);
}
@Override
@@ -320,6 +325,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
+ protected void close(Throwable failure)
+ {
+ HttpConnectionOverFCGI.this.close(failure);
+ }
+
+ @Override
public String toString()
{
return HttpConnectionOverFCGI.this.toString();
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpDestinationOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpDestinationOverFCGI.java
index f6adf480c4..75c19a7efe 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpDestinationOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpDestinationOverFCGI.java
@@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
+import org.eclipse.jetty.client.SendFailure;
public class HttpDestinationOverFCGI extends PoolingHttpDestination<HttpConnectionOverFCGI>
{
@@ -31,8 +32,8 @@ public class HttpDestinationOverFCGI extends PoolingHttpDestination<HttpConnecti
}
@Override
- protected void send(HttpConnectionOverFCGI connection, HttpExchange exchange)
+ protected SendFailure send(HttpConnectionOverFCGI connection, HttpExchange exchange)
{
- connection.send(exchange);
+ return connection.send(exchange);
}
}
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/MultiplexHttpDestinationOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/MultiplexHttpDestinationOverFCGI.java
index 77f2259d80..97adce594d 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/MultiplexHttpDestinationOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/MultiplexHttpDestinationOverFCGI.java
@@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.SendFailure;
public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination<HttpConnectionOverFCGI>
{
@@ -31,8 +32,8 @@ public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination<H
}
@Override
- protected void send(HttpConnectionOverFCGI connection, HttpExchange exchange)
+ protected SendFailure send(HttpConnectionOverFCGI connection, HttpExchange exchange)
{
- connection.send(exchange);
+ return connection.send(exchange);
}
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
index 48d5d9ce3a..13629de0b2 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
@@ -117,7 +117,8 @@ public class HTTP2Connection extends AbstractConnection
{
if (LOG.isDebugEnabled())
LOG.debug("Idle timeout {}ms expired on {}", getEndPoint().getIdleTimeout(), this);
- session.onIdleTimeout();
+ if (!session.onIdleTimeout())
+ fillInterested();
return false;
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 64465ab4df..89480da632 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -535,7 +535,13 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
{
- byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
+ byte[] payload = null;
+ if (reason != null)
+ {
+ // Trim the reason to avoid attack vectors.
+ reason = reason.substring(0, Math.min(reason.length(), 32));
+ payload = reason.getBytes(StandardCharsets.UTF_8);
+ }
GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
control(null, callback, frame);
return true;
@@ -826,30 +832,34 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
* stuck because of TCP congestion), therefore we terminate.
* See {@link #onGoAway(GoAwayFrame)}.
*
+ * @return true if the session has been closed, false otherwise
* @see #onGoAway(GoAwayFrame)
* @see #close(int, String, Callback)
* @see #onShutdown()
*/
@Override
- public void onIdleTimeout()
+ public boolean onIdleTimeout()
{
switch (closed.get())
{
case NOT_CLOSED:
{
- // Real idle timeout, just close.
- close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
- break;
+ if (notifyIdleTimeout(this))
+ {
+ close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
+ return true;
+ }
+ return false;
}
case LOCALLY_CLOSED:
case REMOTELY_CLOSED:
{
abort(new TimeoutException());
- break;
+ return true;
}
default:
{
- break;
+ return true;
}
}
}
@@ -974,6 +984,19 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
+ protected boolean notifyIdleTimeout(Session session)
+ {
+ try
+ {
+ return listener.onIdleTimeout(session);
+ }
+ catch (Throwable x)
+ {
+ LOG.info("Failure while notifying listener " + listener, x);
+ return true;
+ }
+ }
+
protected void notifyFailure(Session session, Throwable failure)
{
try
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
index 4e12150976..4a40bfb29a 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java
@@ -118,7 +118,7 @@ public interface ISession extends Session
* @see #onShutdown()
* @see #close(int, String, Callback)
*/
- public void onIdleTimeout();
+ public boolean onIdleTimeout();
/**
* <p>Callback method invoked during an HTTP/1.1 to HTTP/2 upgrade requests
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
index 16bdc08f3b..89a7959887 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java
@@ -201,6 +201,13 @@ public interface Session
public void onClose(Session session, GoAwayFrame frame);
/**
+ * <p>Callback method invoked when the idle timeout expired.</p>
+ * @param session the session
+ * @return whether the session should be closed
+ */
+ public boolean onIdleTimeout(Session session);
+
+ /**
* <p>Callback method invoked when a failure has been detected for this session.</p>
*
* @param session the session
@@ -246,6 +253,12 @@ public interface Session
}
@Override
+ public boolean onIdleTimeout(Session session)
+ {
+ return true;
+ }
+
+ @Override
public void onFailure(Session session, Throwable failure)
{
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
index 6dd6c23051..3cb97035a4 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
@@ -175,6 +175,12 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
}
@Override
+ public boolean onIdleTimeout(Session session)
+ {
+ return connection.onIdleTimeout();
+ }
+
+ @Override
public void onFailure(Session session, Throwable failure)
{
connection.close(failure);
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
index eb31ac4b13..5c5372f62e 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
@@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
@@ -47,16 +48,15 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
}
@Override
- protected void send(HttpExchange exchange)
+ protected SendFailure send(HttpExchange exchange)
{
normalizeRequest(exchange.getRequest());
+
// One connection maps to N channels, so for each exchange we create a new channel.
HttpChannel channel = newHttpChannel();
channels.add(channel);
- if (channel.associate(exchange))
- channel.send();
- else
- channel.release();
+
+ return send(channel, exchange);
}
protected HttpChannelOverHTTP2 newHttpChannel()
@@ -76,12 +76,13 @@ public class HttpConnectionOverHTTP2 extends HttpConnection
close(new AsynchronousCloseException());
}
+ @Override
protected void close(Throwable failure)
{
// First close then abort, to be sure that the connection cannot be reused
// from an onFailure() handler or by blocking code waiting for completion.
getHttpDestination().close(this);
- session.close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);
+ session.close(ErrorCode.NO_ERROR.code, failure.getMessage(), Callback.NOOP);
abort(failure);
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpDestinationOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpDestinationOverHTTP2.java
index b83d52371a..d9ed502c6e 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpDestinationOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpDestinationOverHTTP2.java
@@ -22,6 +22,7 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.SendFailure;
public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination<HttpConnectionOverHTTP2>
{
@@ -31,8 +32,8 @@ public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination<HttpConne
}
@Override
- protected void send(HttpConnectionOverHTTP2 connection, HttpExchange exchange)
+ protected SendFailure send(HttpConnectionOverHTTP2 connection, HttpExchange exchange)
{
- connection.send(exchange);
+ return connection.send(exchange);
}
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java
index bebe9bb84c..bda662baf5 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java
@@ -73,6 +73,12 @@ public abstract class AbstractTest
public void start(Handler handler) throws Exception
{
+ startServer(handler);
+ startClient();
+ }
+
+ protected void startServer(Handler handler) throws Exception
+ {
sslContextFactory = new SslContextFactory();
sslContextFactory.setKeyStorePath("src/test/resources/keystore.jks");
sslContextFactory.setKeyStorePassword("storepwd");
@@ -80,12 +86,6 @@ public abstract class AbstractTest
sslContextFactory.setTrustStorePassword("storepwd");
sslContextFactory.setUseCipherSuitesOrder(true);
sslContextFactory.setCipherComparator(HTTP2Cipher.COMPARATOR);
- startServer(handler);
- startClient();
- }
-
- private void startServer(Handler handler) throws Exception
- {
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java
new file mode 100644
index 0000000000..8ce1f3f256
--- /dev/null
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpChannelAssociationTest.java
@@ -0,0 +1,228 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http.client;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.HttpDestination;
+import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.api.Connection;
+import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
+import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
+import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
+import org.eclipse.jetty.fcgi.client.http.HttpChannelOverFCGI;
+import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
+import org.eclipse.jetty.fcgi.client.http.HttpConnectionOverFCGI;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.client.http.HttpChannelOverHTTP2;
+import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2;
+import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.Promise;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class HttpChannelAssociationTest extends AbstractTest
+{
+ public HttpChannelAssociationTest(Transport transport)
+ {
+ super(transport);
+ }
+
+ @Test
+ public void testAssociationFailedAbortsRequest() throws Exception
+ {
+ startServer(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ }
+ });
+
+ client = new HttpClient(newHttpClientTransport(transport, exchange -> false), sslContextFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ client.setExecutor(clientThreads);
+ client.start();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.newRequest(newURI())
+ .send(result ->
+ {
+ if (result.isFailed())
+ latch.countDown();
+ });
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testIdleTimeoutJustBeforeAssociation() throws Exception
+ {
+ startServer(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ baseRequest.setHandled(true);
+ }
+ });
+
+ long idleTimeout = 1000;
+ client = new HttpClient(newHttpClientTransport(transport, exchange ->
+ {
+ // We idle timeout just before the association,
+ // we must be able to send the request successfully.
+ sleep(2 * idleTimeout);
+ return true;
+ }), sslContextFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ client.setExecutor(clientThreads);
+ client.setIdleTimeout(idleTimeout);
+ client.start();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ client.newRequest(newURI())
+ .send(result ->
+ {
+ if (result.isSucceeded())
+ latch.countDown();
+ });
+
+ Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
+ }
+
+ private HttpClientTransport newHttpClientTransport(Transport transport, Predicate<HttpExchange> code)
+ {
+ switch (transport)
+ {
+ case HTTP:
+ case HTTPS:
+ {
+ return new HttpClientTransportOverHTTP(1)
+ {
+ @Override
+ protected HttpConnectionOverHTTP newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
+ {
+ return new HttpConnectionOverHTTP(endPoint, destination, promise)
+ {
+ @Override
+ protected HttpChannelOverHTTP newHttpChannel()
+ {
+ return new HttpChannelOverHTTP(this)
+ {
+ @Override
+ public boolean associate(HttpExchange exchange)
+ {
+ return code.test(exchange) && super.associate(exchange);
+ }
+ };
+ }
+ };
+ }
+ };
+ }
+ case H2C:
+ case H2:
+ {
+ HTTP2Client http2Client = new HTTP2Client();
+ http2Client.setSelectors(1);
+ return new HttpClientTransportOverHTTP2(http2Client)
+ {
+ @Override
+ protected HttpConnectionOverHTTP2 newHttpConnection(HttpDestination destination, Session session)
+ {
+ return new HttpConnectionOverHTTP2(destination, session)
+ {
+ @Override
+ protected HttpChannelOverHTTP2 newHttpChannel()
+ {
+ return new HttpChannelOverHTTP2(getHttpDestination(), this, getSession())
+ {
+ @Override
+ public boolean associate(HttpExchange exchange)
+ {
+ return code.test(exchange) && super.associate(exchange);
+ }
+ };
+ }
+ };
+ }
+ };
+ }
+ case FCGI:
+ {
+ return new HttpClientTransportOverFCGI(1, false, "")
+ {
+ @Override
+ protected HttpConnectionOverFCGI newHttpConnection(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
+ {
+ return new HttpConnectionOverFCGI(endPoint, destination, promise, isMultiplexed())
+ {
+ @Override
+ protected HttpChannelOverFCGI newHttpChannel(int id, org.eclipse.jetty.client.api.Request request)
+ {
+ return new HttpChannelOverFCGI(this, getFlusher(), id, request.getIdleTimeout())
+ {
+ @Override
+ public boolean associate(HttpExchange exchange)
+ {
+ return code.test(exchange) && super.associate(exchange);
+ }
+ };
+ }
+ };
+ }
+ };
+ }
+ default:
+ {
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+
+ private void sleep(long time)
+ {
+ try
+ {
+ Thread.sleep(time);
+ }
+ catch (InterruptedException x)
+ {
+ throw new RuntimeException(x);
+ }
+ }
+}

Back to the top