Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimone Bordet2015-03-24 19:56:26 +0000
committerSimone Bordet2015-03-24 19:56:26 +0000
commitd20c7707b7fc2e82c75732fb944f65764a0b9d1c (patch)
tree14333ba11a2bc38c233723da3e7a79b3cf561ea9 /jetty-client
parent161317a5efc2d641bff61552c634e75e763f40e3 (diff)
parent1dc66b72dd3788c1903e55cf1c8afcb2d4f7d343 (diff)
downloadorg.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.tar.gz
org.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.tar.xz
org.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.zip
Merged branch 'jetty-9.2.x' into 'master'.
Diffstat (limited to 'jetty-client')
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java106
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java220
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java151
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java209
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java36
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java6
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java82
-rw-r--r--jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java7
8 files changed, 521 insertions, 296 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
index 74d762e893..cf18ba24b7 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
@@ -18,74 +18,126 @@
package org.eclipse.jetty.client;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.SpinLock;
public abstract class HttpChannel
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);
- private final AtomicReference<HttpExchange> exchange = new AtomicReference<>();
- private final HttpDestination destination;
+ private final SpinLock _lock = new SpinLock();
+ private final HttpDestination _destination;
+ private HttpExchange _exchange;
protected HttpChannel(HttpDestination destination)
{
- this.destination = destination;
+ this._destination = destination;
}
public HttpDestination getHttpDestination()
{
- return destination;
+ return _destination;
}
- public void associate(HttpExchange exchange)
+ /**
+ * <p>Associates the given {@code exchange} to this channel in order to be sent over the network.</p>
+ * <p>If the association is successful, the exchange can be sent. Otherwise, the channel must be
+ * disposed because whoever terminated the exchange did not do it - it did not have the channel yet.</p>
+ *
+ * @param exchange the exchange to associate
+ * @return true if the association was successful, false otherwise
+ */
+ public boolean associate(HttpExchange exchange)
{
- if (this.exchange.compareAndSet(null, exchange))
+ boolean result = false;
+ boolean abort = true;
+ try (SpinLock.Lock lock = _lock.lock())
{
- exchange.associate(this);
- if (LOG.isDebugEnabled())
- LOG.debug("{} associated to {}", exchange, this);
+ if (_exchange == null)
+ {
+ abort = false;
+ result = exchange.associate(this);
+ if (result)
+ _exchange = exchange;
+ }
}
- else
- {
+
+ if (abort)
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
- }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("{} associated {} to {}", exchange, result, this);
+
+ return result;
}
- public HttpExchange disassociate()
+ public boolean disassociate(HttpExchange exchange)
{
- HttpExchange exchange = this.exchange.getAndSet(null);
- if (exchange != null)
- exchange.disassociate(this);
+ boolean result = false;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ HttpExchange existing = _exchange;
+ _exchange = null;
+ if (existing == exchange)
+ {
+ existing.disassociate(this);
+ result = true;
+ }
+ }
if (LOG.isDebugEnabled())
- LOG.debug("{} disassociated from {}", exchange, this);
- return exchange;
+ LOG.debug("{} disassociated {} from {}", exchange, result, this);
+ return result;
}
public HttpExchange getHttpExchange()
{
- return exchange.get();
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ return _exchange;
+ }
}
+ protected abstract HttpSender getHttpSender();
+
+ protected abstract HttpReceiver getHttpReceiver();
+
public abstract void send();
- public abstract void proceed(HttpExchange exchange, Throwable failure);
+ public abstract void release();
+
+ public void proceed(HttpExchange exchange, Throwable failure)
+ {
+ getHttpSender().proceed(exchange, failure);
+ }
- public abstract boolean abort(Throwable cause);
+ public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
+ {
+ boolean requestAborted = false;
+ if (requestFailure != null)
+ requestAborted = getHttpSender().abort(exchange, requestFailure);
- public abstract boolean abortResponse(Throwable cause);
+ boolean responseAborted = false;
+ if (responseFailure != null)
+ responseAborted = abortResponse(exchange, responseFailure);
+
+ return requestAborted || responseAborted;
+ }
+
+ public boolean abortResponse(HttpExchange exchange, Throwable failure)
+ {
+ return getHttpReceiver().abort(exchange, failure);
+ }
- public void exchangeTerminated(Result result)
+ public void exchangeTerminated(HttpExchange exchange, Result result)
{
- disassociate();
+ disassociate(exchange);
}
@Override
public String toString()
{
- return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange);
+ return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), getHttpExchange());
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
index 704884c1dc..708b26bce5 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java
@@ -19,7 +19,6 @@
package org.eclipse.jetty.client;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@@ -31,21 +30,14 @@ public class HttpExchange
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);
- private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpDestination destination;
private final HttpRequest request;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
-
- enum State
- {
- PENDING, COMPLETED, TERMINATED
- }
-
- ;
private final SpinLock _lock = new SpinLock();
private State requestState = State.PENDING;
private State responseState = State.PENDING;
+ private HttpChannel _channel;
private Throwable requestFailure;
private Throwable responseFailure;
@@ -96,120 +88,187 @@ public class HttpExchange
}
}
- public void associate(HttpChannel channel)
+ /**
+ * <p>Associates the given {@code channel} to this exchange.</p>
+ * <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
+ *
+ * @param channel the channel to associate to this exchange
+ * @return true if the channel could be associated, false otherwise
+ */
+ boolean associate(HttpChannel channel)
{
- if (!this.channel.compareAndSet(null, channel))
- request.abort(new IllegalStateException());
+ boolean result = false;
+ boolean abort = false;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ // Only associate if the exchange state is initial,
+ // as the exchange could be already failed.
+ if (requestState == State.PENDING && responseState == State.PENDING)
+ {
+ abort = _channel != null;
+ if (!abort)
+ {
+ _channel = channel;
+ result = true;
+ }
+ }
+ }
+
+ if (abort)
+ request.abort(new IllegalStateException(toString()));
+
+ return result;
}
- public void disassociate(HttpChannel channel)
+ void disassociate(HttpChannel channel)
{
- if (!this.channel.compareAndSet(channel, null))
- request.abort(new IllegalStateException());
+ boolean abort = false;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
+ abort = true;
+ _channel = null;
+ }
+
+ if (abort)
+ request.abort(new IllegalStateException(toString()));
}
- public boolean requestComplete()
+ private HttpChannel getHttpChannel()
{
try (SpinLock.Lock lock = _lock.lock())
{
- if (requestState != State.PENDING)
- return false;
- requestState = State.COMPLETED;
- return true;
+ return _channel;
}
}
- public boolean responseComplete()
+ public boolean requestComplete(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
{
- if (responseState != State.PENDING)
- return false;
- responseState = State.COMPLETED;
- return true;
+ return completeRequest(failure);
}
}
- public Result terminateRequest(Throwable failure)
+ private boolean completeRequest(Throwable failure)
{
- try (SpinLock.Lock lock = _lock.lock())
+ if (requestState == State.PENDING)
{
- requestState = State.TERMINATED;
+ requestState = State.COMPLETED;
requestFailure = failure;
- if (State.TERMINATED.equals(responseState))
- return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
+ return true;
}
- return null;
+ return false;
}
- public Result terminateResponse(Throwable failure)
+ public boolean responseComplete(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
{
- responseState = State.TERMINATED;
- responseFailure = failure;
- if (requestState == State.TERMINATED)
- return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
+ return completeResponse(failure);
}
- return null;
}
-
- public boolean abort(Throwable cause)
+ private boolean completeResponse(Throwable failure)
{
- if (destination.remove(this))
+ if (responseState == State.PENDING)
{
- if (LOG.isDebugEnabled())
- LOG.debug("Aborting while queued {}: {}", this, cause);
- return fail(cause);
+ responseState = State.COMPLETED;
+ responseFailure = failure;
+ return true;
}
- else
- {
- HttpChannel channel = this.channel.get();
- if (channel == null)
- return fail(cause);
+ return false;
+ }
- boolean aborted = channel.abort(cause);
- if (LOG.isDebugEnabled())
- LOG.debug("Aborted ({}) while active {}: {}", aborted, this, cause);
- return aborted;
+ public Result terminateRequest()
+ {
+ Result result = null;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ if (requestState == State.COMPLETED)
+ requestState = State.TERMINATED;
+ if (requestState == State.TERMINATED && responseState == State.TERMINATED)
+ result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Terminated request for {}, result: {}", this, result);
+
+ return result;
}
- private boolean fail(Throwable cause)
+ public Result terminateResponse()
{
- boolean notify = false;
+ Result result = null;
try (SpinLock.Lock lock = _lock.lock())
{
- if (requestState != State.TERMINATED)
- {
- requestState = State.TERMINATED;
- notify = true;
- requestFailure = cause;
- }
- if (responseState != State.TERMINATED)
- {
+ if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
- notify = true;
- responseFailure = cause;
- }
+ if (requestState == State.TERMINATED && responseState == State.TERMINATED)
+ result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Terminated response for {}, result: {}", this, result);
+
+ return result;
+ }
+
+ public boolean abort(Throwable failure)
+ {
+ // Atomically change the state of this exchange to be completed.
+ // This will avoid that this exchange can be associated to a channel.
+ boolean abortRequest;
+ boolean abortResponse;
+ try (SpinLock.Lock lock = _lock.lock())
+ {
+ abortRequest = completeRequest(failure);
+ abortResponse = completeResponse(failure);
}
- if (notify)
+ if (LOG.isDebugEnabled())
+ LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure);
+
+ if (!abortRequest && !abortResponse)
+ return false;
+
+ // We failed this exchange, deal with it.
+
+ // Case #1: exchange was in the destination queue.
+ if (destination.remove(this))
{
if (LOG.isDebugEnabled())
- LOG.debug("Failing {}: {}", this, cause);
- destination.getRequestNotifier().notifyFailure(request, cause);
- List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
- ResponseNotifier responseNotifier = destination.getResponseNotifier();
- responseNotifier.notifyFailure(listeners, response, cause);
- responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
+ LOG.debug("Aborting while queued {}: {}", this, failure);
+ notifyFailureComplete(failure);
return true;
}
- else
+
+ HttpChannel channel = getHttpChannel();
+ if (channel == null)
{
- return false;
+ // Case #2: exchange was not yet associated.
+ // Because this exchange is failed, when associate() is called
+ // it will return false, and the caller will dispose the channel.
+ if (LOG.isDebugEnabled())
+ LOG.debug("Aborted before association {}: {}", this, failure);
+ notifyFailureComplete(failure);
+ return true;
}
+
+ // Case #3: exchange was already associated.
+ boolean aborted = channel.abort(this, abortRequest ? failure : null, abortResponse ? failure : null);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Aborted ({}) while active {}: {}", aborted, this, failure);
+ return aborted;
+ }
+
+ private void notifyFailureComplete(Throwable failure)
+ {
+ destination.getRequestNotifier().notifyFailure(request, failure);
+ List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
+ ResponseNotifier responseNotifier = destination.getResponseNotifier();
+ responseNotifier.notifyFailure(listeners, response, failure);
+ responseNotifier.notifyComplete(listeners, new Result(request, failure, response, failure));
}
public void resetResponse()
@@ -223,7 +282,7 @@ public class HttpExchange
public void proceed(Throwable failure)
{
- HttpChannel channel = this.channel.get();
+ HttpChannel channel = getHttpChannel();
if (channel != null)
channel.proceed(this, failure);
}
@@ -233,11 +292,16 @@ public class HttpExchange
{
try (SpinLock.Lock lock = _lock.lock())
{
- return String.format("%s@%x req=%s/%s res=%s/%s",
+ return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
HttpExchange.class.getSimpleName(),
hashCode(),
- requestState, requestFailure,
- responseState, responseFailure);
+ requestState, requestFailure, requestFailure,
+ responseState, responseFailure, responseFailure);
}
}
+
+ private enum State
+ {
+ PENDING, COMPLETED, TERMINATED
+ }
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
index 10555f9a56..8b5b9fb68d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
@@ -129,10 +129,11 @@ public abstract class HttpReceiver
ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
- if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
- terminateResponse(exchange, failure);
+ if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
+ return true;
- return true;
+ terminateResponse(exchange);
+ return false;
}
/**
@@ -193,10 +194,11 @@ public abstract class HttpReceiver
}
}
- if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
- terminateResponse(exchange, failure);
+ if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
+ return true;
- return true;
+ terminateResponse(exchange);
+ return false;
}
protected void storeCookie(URI uri, HttpField field)
@@ -269,10 +271,11 @@ public abstract class HttpReceiver
}
}
- if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
- terminateResponse(exchange, failure);
+ if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
+ return true;
- return true;
+ terminateResponse(exchange);
+ return false;
}
/**
@@ -343,10 +346,11 @@ public abstract class HttpReceiver
}
}
- if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
- terminateResponse(exchange, failure);
+ if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
+ return true;
- return true;
+ terminateResponse(exchange);
+ return false;
}
/**
@@ -362,7 +366,7 @@ public abstract class HttpReceiver
{
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
- boolean completed = exchange.responseComplete();
+ boolean completed = exchange.responseComplete(null);
if (!completed)
return false;
@@ -371,12 +375,13 @@ public abstract class HttpReceiver
// Reset to be ready for another response.
reset();
- // Mark atomically the response as terminated and succeeded,
- // with respect to concurrency between request and response.
- Result result = exchange.terminateResponse(null);
+ // Mark atomically the response as terminated, with
+ // respect to concurrency between request and response.
+ Result result = exchange.terminateResponse();
- // It is important to notify *after* we reset and terminate
- // because the notification may trigger another request/response.
+ // Notify *after* resetting and terminating, because
+ // the notification may trigger another request/response,
+ // or the reset of the response in case of 100-Continue.
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response);
@@ -409,60 +414,15 @@ public abstract class HttpReceiver
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
- boolean completed = exchange.responseComplete();
- if (!completed)
- return false;
-
- this.failure = failure;
-
- // Update the state to avoid more response processing.
- boolean fail;
- while (true)
- {
- ResponseState current = responseState.get();
- if (updateResponseState(current, ResponseState.FAILURE))
- {
- fail = current != ResponseState.TRANSIENT;
- break;
- }
- }
-
- dispose();
-
- Result result = failResponse(exchange, failure);
-
- if (fail)
- {
- terminateResponse(exchange, result);
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
- }
-
- return true;
- }
+ if (exchange.responseComplete(failure))
+ return abort(exchange, failure);
- private Result failResponse(HttpExchange exchange, Throwable failure)
- {
- // Mark atomically the response as terminated and failed,
- // with respect to concurrency between request and response.
- Result result = exchange.terminateResponse(failure);
-
- HttpResponse response = exchange.getResponse();
- if (LOG.isDebugEnabled())
- LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
- List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
- ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
- notifier.notifyFailure(listeners, response, failure);
-
- return result;
+ return false;
}
- private void terminateResponse(HttpExchange exchange, Throwable failure)
+ private void terminateResponse(HttpExchange exchange)
{
- Result result = failResponse(exchange, failure);
+ Result result = exchange.terminateResponse();
terminateResponse(exchange, result);
}
@@ -477,14 +437,14 @@ public abstract class HttpReceiver
{
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
- channel.exchangeTerminated(result);
+ channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
- channel.exchangeTerminated(result);
+ channel.exchangeTerminated(exchange, result);
}
}
@@ -512,9 +472,56 @@ public abstract class HttpReceiver
decoder = null;
}
- public boolean abort(Throwable cause)
+ public boolean abort(HttpExchange exchange, Throwable failure)
{
- return responseFailure(cause);
+ // Update the state to avoid more response processing.
+ boolean terminate;
+ out: while (true)
+ {
+ ResponseState current = responseState.get();
+ switch (current)
+ {
+ case FAILURE:
+ {
+ return false;
+ }
+ default:
+ {
+ if (updateResponseState(current, ResponseState.FAILURE))
+ {
+ terminate = current != ResponseState.TRANSIENT;
+ break out;
+ }
+ break;
+ }
+ }
+ }
+
+ this.failure = failure;
+
+ dispose();
+
+ HttpResponse response = exchange.getResponse();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
+ List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
+ ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
+ notifier.notifyFailure(listeners, response, failure);
+
+ if (terminate)
+ {
+ // Mark atomically the response as terminated, with
+ // respect to concurrency between request and response.
+ Result result = exchange.terminateResponse();
+ terminateResponse(exchange, result);
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
+ }
+
+ return true;
}
private boolean updateResponseState(ResponseState from, ResponseState to)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
index 8b515344d6..a7b68e5840 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java
@@ -162,10 +162,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
public void send(HttpExchange exchange)
{
- Request request = exchange.getRequest();
- if (!queuedToBegin(request))
+ if (!queuedToBegin(exchange))
return;
+ Request request = exchange.getRequest();
ContentProvider contentProvider = request.getContent();
HttpContent content = this.content = new HttpContent(contentProvider);
@@ -198,7 +198,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentProvider instanceof AsyncContentProvider)
((AsyncContentProvider)contentProvider).setListener(this);
- if (!beginToHeaders(request))
+ if (!beginToHeaders(exchange))
return;
sendHeaders(exchange, content, commitCallback);
@@ -209,46 +209,61 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
}
- protected boolean queuedToBegin(Request request)
+ protected boolean queuedToBegin(HttpExchange exchange)
{
if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
return false;
+
+ Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request begin {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
- if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
- terminateRequest(getHttpExchange(), failure);
- return true;
+
+ if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
+ return true;
+
+ terminateRequest(exchange);
+ return false;
}
- protected boolean beginToHeaders(Request request)
+ protected boolean beginToHeaders(HttpExchange exchange)
{
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
return false;
+
+ Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request);
- if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
- terminateRequest(getHttpExchange(), failure);
- return true;
+
+ if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
+ return true;
+
+ terminateRequest(exchange);
+ return false;
}
- protected boolean headersToCommit(Request request)
+ protected boolean headersToCommit(HttpExchange exchange)
{
if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
return false;
+
+ Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request committed {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request);
- if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
- terminateRequest(getHttpExchange(), failure);
- return true;
+
+ if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
+ return true;
+
+ terminateRequest(exchange);
+ return false;
}
- protected boolean someToContent(Request request, ByteBuffer content)
+ protected boolean someToContent(HttpExchange exchange, ByteBuffer content)
{
RequestState current = requestState.get();
switch (current)
@@ -256,15 +271,20 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case COMMIT:
case CONTENT:
{
- if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
+ if (!updateRequestState(current, RequestState.TRANSIENT))
return false;
+
+ Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content);
- if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
- terminateRequest(getHttpExchange(), failure);
- return true;
+
+ if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
+ return true;
+
+ terminateRequest(exchange);
+ return false;
}
default:
{
@@ -283,7 +303,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
// Mark atomically the request as completed, with respect
// to concurrency between request success and request failure.
- boolean completed = exchange.requestComplete();
+ boolean completed = exchange.requestComplete(null);
if (!completed)
return false;
@@ -292,18 +312,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
// Reset to be ready for another request.
reset();
- // Mark atomically the request as terminated and succeeded,
- // with respect to concurrency between request and response.
- Result result = exchange.terminateRequest(null);
-
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request success {}", request);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
+ // Mark atomically the request as terminated, with
+ // respect to concurrency between request and response.
+ Result result = exchange.terminateRequest();
terminateRequest(exchange, null, result);
-
return true;
}
default:
@@ -321,64 +339,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
// Mark atomically the request as completed, with respect
// to concurrency between request success and request failure.
- boolean completed = exchange.requestComplete();
- if (!completed)
- return false;
+ if (exchange.requestComplete(failure))
+ return abort(exchange, failure);
- this.failure = failure;
-
- // Update the state to avoid more request processing.
- RequestState current;
- boolean fail;
- while (true)
- {
- current = requestState.get();
- if (updateRequestState(current, RequestState.FAILURE))
- {
- fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
- break;
- }
- }
-
- dispose();
-
- Result result = failRequest(exchange, failure);
-
- if (fail)
- {
- terminateRequest(exchange, failure, result);
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
- }
-
- return true;
+ return false;
}
- private Result failRequest(HttpExchange exchange, Throwable failure)
+ private void terminateRequest(HttpExchange exchange)
{
- // Mark atomically the request as terminated and failed,
- // with respect to concurrency between request and response.
- Result result = exchange.terminateRequest(failure);
-
- Request request = exchange.getRequest();
- if (LOG.isDebugEnabled())
- LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
- HttpDestination destination = getHttpChannel().getHttpDestination();
- destination.getRequestNotifier().notifyFailure(request, failure);
-
- return result;
- }
-
- private void terminateRequest(HttpExchange exchange, Throwable failure)
- {
- if (exchange != null)
- {
- Result result = failRequest(exchange, failure);
- terminateRequest(exchange, failure, result);
- }
+ // In abort(), the state is updated before the failure is recorded
+ // to avoid to overwrite it, so here we may read a null failure.
+ Throwable failure = this.failure;
+ if (failure == null)
+ failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
+ Result result = exchange.terminateRequest();
+ terminateRequest(exchange, failure, result);
}
private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
@@ -392,9 +367,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
if (failure != null)
{
- if (LOG.isDebugEnabled())
- LOG.debug("Response failure from request {} {}", request, exchange);
- getHttpChannel().abortResponse(failure);
+ if (exchange.responseComplete(failure))
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Response failure from request {} {}", request, exchange);
+ getHttpChannel().abortResponse(exchange, failure);
+ }
}
}
else
@@ -402,13 +380,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered)
- channel.exchangeTerminated(result);
+ channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)
- channel.exchangeTerminated(result);
+ channel.exchangeTerminated(exchange, result);
}
}
@@ -528,9 +506,55 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
}
}
- public boolean abort(Throwable failure)
+ public boolean abort(HttpExchange exchange, Throwable failure)
{
- return anyToFailure(failure);
+ // Update the state to avoid more request processing.
+ boolean terminate;
+ out: while (true)
+ {
+ RequestState current = requestState.get();
+ switch (current)
+ {
+ case FAILURE:
+ {
+ return false;
+ }
+ default:
+ {
+ if (updateRequestState(current, RequestState.FAILURE))
+ {
+ terminate = current != RequestState.TRANSIENT;
+ break out;
+ }
+ break;
+ }
+ }
+ }
+
+ this.failure = failure;
+
+ dispose();
+
+ Request request = exchange.getRequest();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
+ HttpDestination destination = getHttpChannel().getHttpDestination();
+ destination.getRequestNotifier().notifyFailure(request, failure);
+
+ if (terminate)
+ {
+ // Mark atomically the request as terminated, with
+ // respect to concurrency between request and response.
+ Result result = exchange.terminateRequest();
+ terminateRequest(exchange, failure, result);
+ }
+ else
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
+ }
+
+ return true;
}
private boolean updateRequestState(RequestState from, RequestState to)
@@ -575,10 +599,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
*/
TRANSIENT,
/**
- * The content transition method is being executed.
- */
- TRANSIENT_CONTENT,
- /**
* The request is queued, the initial state
*/
QUEUED,
@@ -686,8 +706,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (exchange == null)
return;
- Request request = exchange.getRequest();
- if (!headersToCommit(request))
+ if (!headersToCommit(exchange))
return;
HttpContent content = HttpSender.this.content;
@@ -705,7 +724,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
ByteBuffer contentBuffer = content.getContent();
if (contentBuffer != null)
{
- if (!someToContent(request, contentBuffer))
+ if (!someToContent(exchange, contentBuffer))
return;
}
@@ -840,7 +859,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return;
content.succeeded();
ByteBuffer buffer = content.getContent();
- someToContent(exchange.getRequest(), buffer);
+ someToContent(exchange, buffer);
super.succeeded();
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java
index 1b3367bb9e..6578c4438e 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java
@@ -20,6 +20,8 @@ package org.eclipse.jetty.client.http;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
+import org.eclipse.jetty.client.HttpReceiver;
+import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
@@ -51,37 +53,35 @@ public class HttpChannelOverHTTP extends HttpChannel
return new HttpReceiverOverHTTP(this);
}
- public HttpConnectionOverHTTP getHttpConnection()
+ @Override
+ protected HttpSender getHttpSender()
{
- return connection;
+ return sender;
}
@Override
- public void send()
+ protected HttpReceiver getHttpReceiver()
{
- HttpExchange exchange = getHttpExchange();
- if (exchange != null)
- sender.send(exchange);
+ return receiver;
}
- @Override
- public void proceed(HttpExchange exchange, Throwable failure)
+ public HttpConnectionOverHTTP getHttpConnection()
{
- sender.proceed(exchange, failure);
+ return connection;
}
@Override
- public boolean abort(Throwable cause)
+ public void send()
{
- boolean sendAborted = sender.abort(cause);
- boolean receiveAborted = abortResponse(cause);
- return sendAborted || receiveAborted;
+ HttpExchange exchange = getHttpExchange();
+ if (exchange != null)
+ sender.send(exchange);
}
@Override
- public boolean abortResponse(Throwable cause)
+ public void release()
{
- return receiver.abort(cause);
+ connection.release();
}
public void receive()
@@ -90,9 +90,9 @@ public class HttpChannelOverHTTP extends HttpChannel
}
@Override
- public void exchangeTerminated(Result result)
+ public void exchangeTerminated(HttpExchange exchange, Result result)
{
- super.exchangeTerminated(result);
+ super.exchangeTerminated(exchange, result);
Response response = result.getResponse();
HttpFields responseHeaders = response.getHeaders();
@@ -115,7 +115,7 @@ public class HttpChannelOverHTTP extends HttpChannel
if (close)
connection.close();
else
- connection.release();
+ release();
}
@Override
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
index c68bdeceea..1f21ab6478 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
@@ -204,8 +204,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
endPoint.setIdleTimeout(request.getIdleTimeout());
// One channel per connection, just delegate the send
- channel.associate(exchange);
- channel.send();
+ if (channel.associate(exchange))
+ channel.send();
+ else
+ channel.release();
}
@Override
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java
index 304d818e96..7a22d1253c 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java
@@ -23,7 +23,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -176,4 +176,84 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testAbortOnContentBeforeRequestTermination() throws Exception
+ {
+ start(new AbstractHandler()
+ {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ try
+ {
+ baseRequest.setHandled(true);
+ OutputStream output = response.getOutputStream();
+ output.write(1);
+ output.flush();
+ output.write(2);
+ output.flush();
+ }
+ catch (IOException ignored)
+ {
+ // The client may have already closed, and we'll get an exception here, but it's expected
+ }
+ }
+ });
+
+ final CountDownLatch abortLatch = new CountDownLatch(1);
+ final AtomicInteger completes = new AtomicInteger();
+ final CountDownLatch completeLatch = new CountDownLatch(1);
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scheme)
+ .onRequestSuccess(new org.eclipse.jetty.client.api.Request.SuccessListener()
+ {
+ @Override
+ public void onSuccess(org.eclipse.jetty.client.api.Request request)
+ {
+ try
+ {
+ abortLatch.await(5, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException x)
+ {
+ x.printStackTrace();
+ }
+ }
+ })
+ .onResponseContent(new Response.ContentListener()
+ {
+ @Override
+ public void onContent(Response response, ByteBuffer content)
+ {
+ try
+ {
+ response.abort(new Exception());
+ abortLatch.countDown();
+ // Delay to let the request side to finish its processing.
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException x)
+ {
+ x.printStackTrace();
+ }
+ }
+ })
+ .send(new Response.CompleteListener()
+ {
+ @Override
+ public void onComplete(Result result)
+ {
+ completes.incrementAndGet();
+ Assert.assertTrue(result.isFailed());
+ completeLatch.countDown();
+ }
+ });
+ Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
+
+ // Wait to be sure that the complete event is only notified once.
+ Thread.sleep(1000);
+
+ Assert.assertEquals(1, completes.get());
+ }
}
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java
index a15bf47ae5..55217609b5 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java
@@ -76,9 +76,10 @@ public class HttpReceiverOverHTTPTest
HttpRequest request = (HttpRequest)client.newRequest("http://localhost");
FutureResponseListener listener = new FutureResponseListener(request);
HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener));
- connection.getHttpChannel().associate(exchange);
- exchange.requestComplete();
- exchange.terminateRequest(null);
+ boolean associated = connection.getHttpChannel().associate(exchange);
+ Assert.assertTrue(associated);
+ exchange.requestComplete(null);
+ exchange.terminateRequest();
return exchange;
}

Back to the top