diff options
author | Simone Bordet | 2015-03-24 19:56:26 +0000 |
---|---|---|
committer | Simone Bordet | 2015-03-24 19:56:26 +0000 |
commit | d20c7707b7fc2e82c75732fb944f65764a0b9d1c (patch) | |
tree | 14333ba11a2bc38c233723da3e7a79b3cf561ea9 /jetty-client | |
parent | 161317a5efc2d641bff61552c634e75e763f40e3 (diff) | |
parent | 1dc66b72dd3788c1903e55cf1c8afcb2d4f7d343 (diff) | |
download | org.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.tar.gz org.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.tar.xz org.eclipse.jetty.project-d20c7707b7fc2e82c75732fb944f65764a0b9d1c.zip |
Merged branch 'jetty-9.2.x' into 'master'.
Diffstat (limited to 'jetty-client')
8 files changed, 521 insertions, 296 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java index 74d762e893..cf18ba24b7 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java @@ -18,74 +18,126 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicReference; - import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.SpinLock; public abstract class HttpChannel { protected static final Logger LOG = Log.getLogger(HttpChannel.class); - private final AtomicReference<HttpExchange> exchange = new AtomicReference<>(); - private final HttpDestination destination; + private final SpinLock _lock = new SpinLock(); + private final HttpDestination _destination; + private HttpExchange _exchange; protected HttpChannel(HttpDestination destination) { - this.destination = destination; + this._destination = destination; } public HttpDestination getHttpDestination() { - return destination; + return _destination; } - public void associate(HttpExchange exchange) + /** + * <p>Associates the given {@code exchange} to this channel in order to be sent over the network.</p> + * <p>If the association is successful, the exchange can be sent. Otherwise, the channel must be + * disposed because whoever terminated the exchange did not do it - it did not have the channel yet.</p> + * + * @param exchange the exchange to associate + * @return true if the association was successful, false otherwise + */ + public boolean associate(HttpExchange exchange) { - if (this.exchange.compareAndSet(null, exchange)) + boolean result = false; + boolean abort = true; + try (SpinLock.Lock lock = _lock.lock()) { - exchange.associate(this); - if (LOG.isDebugEnabled()) - LOG.debug("{} associated to {}", exchange, this); + if (_exchange == null) + { + abort = false; + result = exchange.associate(this); + if (result) + _exchange = exchange; + } } - else - { + + if (abort) exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported")); - } + + if (LOG.isDebugEnabled()) + LOG.debug("{} associated {} to {}", exchange, result, this); + + return result; } - public HttpExchange disassociate() + public boolean disassociate(HttpExchange exchange) { - HttpExchange exchange = this.exchange.getAndSet(null); - if (exchange != null) - exchange.disassociate(this); + boolean result = false; + try (SpinLock.Lock lock = _lock.lock()) + { + HttpExchange existing = _exchange; + _exchange = null; + if (existing == exchange) + { + existing.disassociate(this); + result = true; + } + } if (LOG.isDebugEnabled()) - LOG.debug("{} disassociated from {}", exchange, this); - return exchange; + LOG.debug("{} disassociated {} from {}", exchange, result, this); + return result; } public HttpExchange getHttpExchange() { - return exchange.get(); + try (SpinLock.Lock lock = _lock.lock()) + { + return _exchange; + } } + protected abstract HttpSender getHttpSender(); + + protected abstract HttpReceiver getHttpReceiver(); + public abstract void send(); - public abstract void proceed(HttpExchange exchange, Throwable failure); + public abstract void release(); + + public void proceed(HttpExchange exchange, Throwable failure) + { + getHttpSender().proceed(exchange, failure); + } - public abstract boolean abort(Throwable cause); + public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure) + { + boolean requestAborted = false; + if (requestFailure != null) + requestAborted = getHttpSender().abort(exchange, requestFailure); - public abstract boolean abortResponse(Throwable cause); + boolean responseAborted = false; + if (responseFailure != null) + responseAborted = abortResponse(exchange, responseFailure); + + return requestAborted || responseAborted; + } + + public boolean abortResponse(HttpExchange exchange, Throwable failure) + { + return getHttpReceiver().abort(exchange, failure); + } - public void exchangeTerminated(Result result) + public void exchangeTerminated(HttpExchange exchange, Result result) { - disassociate(); + disassociate(exchange); } @Override public String toString() { - return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange); + return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), getHttpExchange()); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 704884c1dc..708b26bce5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.client; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -31,21 +30,14 @@ public class HttpExchange { private static final Logger LOG = Log.getLogger(HttpExchange.class); - private final AtomicReference<HttpChannel> channel = new AtomicReference<>(); private final HttpDestination destination; private final HttpRequest request; private final List<Response.ResponseListener> listeners; private final HttpResponse response; - - enum State - { - PENDING, COMPLETED, TERMINATED - } - - ; private final SpinLock _lock = new SpinLock(); private State requestState = State.PENDING; private State responseState = State.PENDING; + private HttpChannel _channel; private Throwable requestFailure; private Throwable responseFailure; @@ -96,120 +88,187 @@ public class HttpExchange } } - public void associate(HttpChannel channel) + /** + * <p>Associates the given {@code channel} to this exchange.</p> + * <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p> + * + * @param channel the channel to associate to this exchange + * @return true if the channel could be associated, false otherwise + */ + boolean associate(HttpChannel channel) { - if (!this.channel.compareAndSet(null, channel)) - request.abort(new IllegalStateException()); + boolean result = false; + boolean abort = false; + try (SpinLock.Lock lock = _lock.lock()) + { + // Only associate if the exchange state is initial, + // as the exchange could be already failed. + if (requestState == State.PENDING && responseState == State.PENDING) + { + abort = _channel != null; + if (!abort) + { + _channel = channel; + result = true; + } + } + } + + if (abort) + request.abort(new IllegalStateException(toString())); + + return result; } - public void disassociate(HttpChannel channel) + void disassociate(HttpChannel channel) { - if (!this.channel.compareAndSet(channel, null)) - request.abort(new IllegalStateException()); + boolean abort = false; + try (SpinLock.Lock lock = _lock.lock()) + { + if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED) + abort = true; + _channel = null; + } + + if (abort) + request.abort(new IllegalStateException(toString())); } - public boolean requestComplete() + private HttpChannel getHttpChannel() { try (SpinLock.Lock lock = _lock.lock()) { - if (requestState != State.PENDING) - return false; - requestState = State.COMPLETED; - return true; + return _channel; } } - public boolean responseComplete() + public boolean requestComplete(Throwable failure) { try (SpinLock.Lock lock = _lock.lock()) { - if (responseState != State.PENDING) - return false; - responseState = State.COMPLETED; - return true; + return completeRequest(failure); } } - public Result terminateRequest(Throwable failure) + private boolean completeRequest(Throwable failure) { - try (SpinLock.Lock lock = _lock.lock()) + if (requestState == State.PENDING) { - requestState = State.TERMINATED; + requestState = State.COMPLETED; requestFailure = failure; - if (State.TERMINATED.equals(responseState)) - return new Result(getRequest(), requestFailure, getResponse(), responseFailure); + return true; } - return null; + return false; } - public Result terminateResponse(Throwable failure) + public boolean responseComplete(Throwable failure) { try (SpinLock.Lock lock = _lock.lock()) { - responseState = State.TERMINATED; - responseFailure = failure; - if (requestState == State.TERMINATED) - return new Result(getRequest(), requestFailure, getResponse(), responseFailure); + return completeResponse(failure); } - return null; } - - public boolean abort(Throwable cause) + private boolean completeResponse(Throwable failure) { - if (destination.remove(this)) + if (responseState == State.PENDING) { - if (LOG.isDebugEnabled()) - LOG.debug("Aborting while queued {}: {}", this, cause); - return fail(cause); + responseState = State.COMPLETED; + responseFailure = failure; + return true; } - else - { - HttpChannel channel = this.channel.get(); - if (channel == null) - return fail(cause); + return false; + } - boolean aborted = channel.abort(cause); - if (LOG.isDebugEnabled()) - LOG.debug("Aborted ({}) while active {}: {}", aborted, this, cause); - return aborted; + public Result terminateRequest() + { + Result result = null; + try (SpinLock.Lock lock = _lock.lock()) + { + if (requestState == State.COMPLETED) + requestState = State.TERMINATED; + if (requestState == State.TERMINATED && responseState == State.TERMINATED) + result = new Result(getRequest(), requestFailure, getResponse(), responseFailure); } + + if (LOG.isDebugEnabled()) + LOG.debug("Terminated request for {}, result: {}", this, result); + + return result; } - private boolean fail(Throwable cause) + public Result terminateResponse() { - boolean notify = false; + Result result = null; try (SpinLock.Lock lock = _lock.lock()) { - if (requestState != State.TERMINATED) - { - requestState = State.TERMINATED; - notify = true; - requestFailure = cause; - } - if (responseState != State.TERMINATED) - { + if (responseState == State.COMPLETED) responseState = State.TERMINATED; - notify = true; - responseFailure = cause; - } + if (requestState == State.TERMINATED && responseState == State.TERMINATED) + result = new Result(getRequest(), requestFailure, getResponse(), responseFailure); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Terminated response for {}, result: {}", this, result); + + return result; + } + + public boolean abort(Throwable failure) + { + // Atomically change the state of this exchange to be completed. + // This will avoid that this exchange can be associated to a channel. + boolean abortRequest; + boolean abortResponse; + try (SpinLock.Lock lock = _lock.lock()) + { + abortRequest = completeRequest(failure); + abortResponse = completeResponse(failure); } - if (notify) + if (LOG.isDebugEnabled()) + LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure); + + if (!abortRequest && !abortResponse) + return false; + + // We failed this exchange, deal with it. + + // Case #1: exchange was in the destination queue. + if (destination.remove(this)) { if (LOG.isDebugEnabled()) - LOG.debug("Failing {}: {}", this, cause); - destination.getRequestNotifier().notifyFailure(request, cause); - List<Response.ResponseListener> listeners = getConversation().getResponseListeners(); - ResponseNotifier responseNotifier = destination.getResponseNotifier(); - responseNotifier.notifyFailure(listeners, response, cause); - responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause)); + LOG.debug("Aborting while queued {}: {}", this, failure); + notifyFailureComplete(failure); return true; } - else + + HttpChannel channel = getHttpChannel(); + if (channel == null) { - return false; + // Case #2: exchange was not yet associated. + // Because this exchange is failed, when associate() is called + // it will return false, and the caller will dispose the channel. + if (LOG.isDebugEnabled()) + LOG.debug("Aborted before association {}: {}", this, failure); + notifyFailureComplete(failure); + return true; } + + // Case #3: exchange was already associated. + boolean aborted = channel.abort(this, abortRequest ? failure : null, abortResponse ? failure : null); + if (LOG.isDebugEnabled()) + LOG.debug("Aborted ({}) while active {}: {}", aborted, this, failure); + return aborted; + } + + private void notifyFailureComplete(Throwable failure) + { + destination.getRequestNotifier().notifyFailure(request, failure); + List<Response.ResponseListener> listeners = getConversation().getResponseListeners(); + ResponseNotifier responseNotifier = destination.getResponseNotifier(); + responseNotifier.notifyFailure(listeners, response, failure); + responseNotifier.notifyComplete(listeners, new Result(request, failure, response, failure)); } public void resetResponse() @@ -223,7 +282,7 @@ public class HttpExchange public void proceed(Throwable failure) { - HttpChannel channel = this.channel.get(); + HttpChannel channel = getHttpChannel(); if (channel != null) channel.proceed(this, failure); } @@ -233,11 +292,16 @@ public class HttpExchange { try (SpinLock.Lock lock = _lock.lock()) { - return String.format("%s@%x req=%s/%s res=%s/%s", + return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h", HttpExchange.class.getSimpleName(), hashCode(), - requestState, requestFailure, - responseState, responseFailure); + requestState, requestFailure, requestFailure, + responseState, responseFailure, responseFailure); } } + + private enum State + { + PENDING, COMPLETED, TERMINATED + } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 10555f9a56..8b5b9fb68d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -129,10 +129,11 @@ public abstract class HttpReceiver ResponseNotifier notifier = destination.getResponseNotifier(); notifier.notifyBegin(conversation.getResponseListeners(), response); - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -193,10 +194,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) + return true; - return true; + terminateResponse(exchange); + return false; } protected void storeCookie(URI uri, HttpField field) @@ -269,10 +271,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -343,10 +346,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -362,7 +366,7 @@ public abstract class HttpReceiver { // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. - boolean completed = exchange.responseComplete(); + boolean completed = exchange.responseComplete(null); if (!completed) return false; @@ -371,12 +375,13 @@ public abstract class HttpReceiver // Reset to be ready for another response. reset(); - // Mark atomically the response as terminated and succeeded, - // with respect to concurrency between request and response. - Result result = exchange.terminateResponse(null); + // Mark atomically the response as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateResponse(); - // It is important to notify *after* we reset and terminate - // because the notification may trigger another request/response. + // Notify *after* resetting and terminating, because + // the notification may trigger another request/response, + // or the reset of the response in case of 100-Continue. HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) LOG.debug("Response success {}", response); @@ -409,60 +414,15 @@ public abstract class HttpReceiver // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. - boolean completed = exchange.responseComplete(); - if (!completed) - return false; - - this.failure = failure; - - // Update the state to avoid more response processing. - boolean fail; - while (true) - { - ResponseState current = responseState.get(); - if (updateResponseState(current, ResponseState.FAILURE)) - { - fail = current != ResponseState.TRANSIENT; - break; - } - } - - dispose(); - - Result result = failResponse(exchange, failure); - - if (fail) - { - terminateResponse(exchange, result); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); - } - - return true; - } + if (exchange.responseComplete(failure)) + return abort(exchange, failure); - private Result failResponse(HttpExchange exchange, Throwable failure) - { - // Mark atomically the response as terminated and failed, - // with respect to concurrency between request and response. - Result result = exchange.terminateResponse(failure); - - HttpResponse response = exchange.getResponse(); - if (LOG.isDebugEnabled()) - LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure); - List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners(); - ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); - notifier.notifyFailure(listeners, response, failure); - - return result; + return false; } - private void terminateResponse(HttpExchange exchange, Throwable failure) + private void terminateResponse(HttpExchange exchange) { - Result result = failResponse(exchange, failure); + Result result = exchange.terminateResponse(); terminateResponse(exchange, result); } @@ -477,14 +437,14 @@ public abstract class HttpReceiver { boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); if (!ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); if (LOG.isDebugEnabled()) LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyComplete(listeners, result); if (ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); } } @@ -512,9 +472,56 @@ public abstract class HttpReceiver decoder = null; } - public boolean abort(Throwable cause) + public boolean abort(HttpExchange exchange, Throwable failure) { - return responseFailure(cause); + // Update the state to avoid more response processing. + boolean terminate; + out: while (true) + { + ResponseState current = responseState.get(); + switch (current) + { + case FAILURE: + { + return false; + } + default: + { + if (updateResponseState(current, ResponseState.FAILURE)) + { + terminate = current != ResponseState.TRANSIENT; + break out; + } + break; + } + } + } + + this.failure = failure; + + dispose(); + + HttpResponse response = exchange.getResponse(); + if (LOG.isDebugEnabled()) + LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure); + List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners(); + ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); + notifier.notifyFailure(listeners, response, failure); + + if (terminate) + { + // Mark atomically the response as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateResponse(); + terminateResponse(exchange, result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); + } + + return true; } private boolean updateResponseState(ResponseState from, ResponseState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 8b515344d6..a7b68e5840 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -162,10 +162,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener public void send(HttpExchange exchange) { - Request request = exchange.getRequest(); - if (!queuedToBegin(request)) + if (!queuedToBegin(exchange)) return; + Request request = exchange.getRequest(); ContentProvider contentProvider = request.getContent(); HttpContent content = this.content = new HttpContent(contentProvider); @@ -198,7 +198,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (contentProvider instanceof AsyncContentProvider) ((AsyncContentProvider)contentProvider).setListener(this); - if (!beginToHeaders(request)) + if (!beginToHeaders(exchange)) return; sendHeaders(exchange, content, commitCallback); @@ -209,46 +209,61 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()); } - protected boolean queuedToBegin(Request request) + protected boolean queuedToBegin(HttpExchange exchange) { if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request begin {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyBegin(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean beginToHeaders(Request request) + protected boolean beginToHeaders(HttpExchange exchange) { if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim()); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyHeaders(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean headersToCommit(Request request) + protected boolean headersToCommit(HttpExchange exchange) { if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request committed {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyCommit(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean someToContent(Request request, ByteBuffer content) + protected boolean someToContent(HttpExchange exchange, ByteBuffer content) { RequestState current = requestState.get(); switch (current) @@ -256,15 +271,20 @@ public abstract class HttpSender implements AsyncContentProvider.Listener case COMMIT: case CONTENT: { - if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT)) + if (!updateRequestState(current, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content)); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyContent(request, content); - if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT)) + return true; + + terminateRequest(exchange); + return false; } default: { @@ -283,7 +303,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { // Mark atomically the request as completed, with respect // to concurrency between request success and request failure. - boolean completed = exchange.requestComplete(); + boolean completed = exchange.requestComplete(null); if (!completed) return false; @@ -292,18 +312,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener // Reset to be ready for another request. reset(); - // Mark atomically the request as terminated and succeeded, - // with respect to concurrency between request and response. - Result result = exchange.terminateRequest(null); - Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request success {}", request); HttpDestination destination = getHttpChannel().getHttpDestination(); destination.getRequestNotifier().notifySuccess(exchange.getRequest()); + // Mark atomically the request as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateRequest(); terminateRequest(exchange, null, result); - return true; } default: @@ -321,64 +339,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener // Mark atomically the request as completed, with respect // to concurrency between request success and request failure. - boolean completed = exchange.requestComplete(); - if (!completed) - return false; + if (exchange.requestComplete(failure)) + return abort(exchange, failure); - this.failure = failure; - - // Update the state to avoid more request processing. - RequestState current; - boolean fail; - while (true) - { - current = requestState.get(); - if (updateRequestState(current, RequestState.FAILURE)) - { - fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT; - break; - } - } - - dispose(); - - Result result = failRequest(exchange, failure); - - if (fail) - { - terminateRequest(exchange, failure, result); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); - } - - return true; + return false; } - private Result failRequest(HttpExchange exchange, Throwable failure) + private void terminateRequest(HttpExchange exchange) { - // Mark atomically the request as terminated and failed, - // with respect to concurrency between request and response. - Result result = exchange.terminateRequest(failure); - - Request request = exchange.getRequest(); - if (LOG.isDebugEnabled()) - LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure); - HttpDestination destination = getHttpChannel().getHttpDestination(); - destination.getRequestNotifier().notifyFailure(request, failure); - - return result; - } - - private void terminateRequest(HttpExchange exchange, Throwable failure) - { - if (exchange != null) - { - Result result = failRequest(exchange, failure); - terminateRequest(exchange, failure, result); - } + // In abort(), the state is updated before the failure is recorded + // to avoid to overwrite it, so here we may read a null failure. + Throwable failure = this.failure; + if (failure == null) + failure = new HttpRequestException("Concurrent failure", exchange.getRequest()); + Result result = exchange.terminateRequest(); + terminateRequest(exchange, failure, result); } private void terminateRequest(HttpExchange exchange, Throwable failure, Result result) @@ -392,9 +367,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { if (failure != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Response failure from request {} {}", request, exchange); - getHttpChannel().abortResponse(failure); + if (exchange.responseComplete(failure)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Response failure from request {} {}", request, exchange); + getHttpChannel().abortResponse(exchange, failure); + } } } else @@ -402,13 +380,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener HttpDestination destination = getHttpChannel().getHttpDestination(); boolean ordered = destination.getHttpClient().isStrictEventOrdering(); if (!ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); if (LOG.isDebugEnabled()) LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); HttpConversation conversation = exchange.getConversation(); destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result); if (ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); } } @@ -528,9 +506,55 @@ public abstract class HttpSender implements AsyncContentProvider.Listener } } - public boolean abort(Throwable failure) + public boolean abort(HttpExchange exchange, Throwable failure) { - return anyToFailure(failure); + // Update the state to avoid more request processing. + boolean terminate; + out: while (true) + { + RequestState current = requestState.get(); + switch (current) + { + case FAILURE: + { + return false; + } + default: + { + if (updateRequestState(current, RequestState.FAILURE)) + { + terminate = current != RequestState.TRANSIENT; + break out; + } + break; + } + } + } + + this.failure = failure; + + dispose(); + + Request request = exchange.getRequest(); + if (LOG.isDebugEnabled()) + LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure); + HttpDestination destination = getHttpChannel().getHttpDestination(); + destination.getRequestNotifier().notifyFailure(request, failure); + + if (terminate) + { + // Mark atomically the request as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateRequest(); + terminateRequest(exchange, failure, result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); + } + + return true; } private boolean updateRequestState(RequestState from, RequestState to) @@ -575,10 +599,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener */ TRANSIENT, /** - * The content transition method is being executed. - */ - TRANSIENT_CONTENT, - /** * The request is queued, the initial state */ QUEUED, @@ -686,8 +706,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (exchange == null) return; - Request request = exchange.getRequest(); - if (!headersToCommit(request)) + if (!headersToCommit(exchange)) return; HttpContent content = HttpSender.this.content; @@ -705,7 +724,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener ByteBuffer contentBuffer = content.getContent(); if (contentBuffer != null) { - if (!someToContent(request, contentBuffer)) + if (!someToContent(exchange, contentBuffer)) return; } @@ -840,7 +859,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return; content.succeeded(); ByteBuffer buffer = content.getContent(); - someToContent(exchange.getRequest(), buffer); + someToContent(exchange, buffer); super.succeeded(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java index 1b3367bb9e..6578c4438e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.client.http; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpReceiver; +import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http.HttpFields; @@ -51,37 +53,35 @@ public class HttpChannelOverHTTP extends HttpChannel return new HttpReceiverOverHTTP(this); } - public HttpConnectionOverHTTP getHttpConnection() + @Override + protected HttpSender getHttpSender() { - return connection; + return sender; } @Override - public void send() + protected HttpReceiver getHttpReceiver() { - HttpExchange exchange = getHttpExchange(); - if (exchange != null) - sender.send(exchange); + return receiver; } - @Override - public void proceed(HttpExchange exchange, Throwable failure) + public HttpConnectionOverHTTP getHttpConnection() { - sender.proceed(exchange, failure); + return connection; } @Override - public boolean abort(Throwable cause) + public void send() { - boolean sendAborted = sender.abort(cause); - boolean receiveAborted = abortResponse(cause); - return sendAborted || receiveAborted; + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + sender.send(exchange); } @Override - public boolean abortResponse(Throwable cause) + public void release() { - return receiver.abort(cause); + connection.release(); } public void receive() @@ -90,9 +90,9 @@ public class HttpChannelOverHTTP extends HttpChannel } @Override - public void exchangeTerminated(Result result) + public void exchangeTerminated(HttpExchange exchange, Result result) { - super.exchangeTerminated(result); + super.exchangeTerminated(exchange, result); Response response = result.getResponse(); HttpFields responseHeaders = response.getHeaders(); @@ -115,7 +115,7 @@ public class HttpChannelOverHTTP extends HttpChannel if (close) connection.close(); else - connection.release(); + release(); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index c68bdeceea..1f21ab6478 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -204,8 +204,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec endPoint.setIdleTimeout(request.getIdleTimeout()); // One channel per connection, just delegate the send - channel.associate(exchange); - channel.send(); + if (channel.associate(exchange)) + channel.send(); + else + channel.release(); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java index 304d818e96..7a22d1253c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java @@ -23,7 +23,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -176,4 +176,84 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest }); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testAbortOnContentBeforeRequestTermination() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + OutputStream output = response.getOutputStream(); + output.write(1); + output.flush(); + output.write(2); + output.flush(); + } + catch (IOException ignored) + { + // The client may have already closed, and we'll get an exception here, but it's expected + } + } + }); + + final CountDownLatch abortLatch = new CountDownLatch(1); + final AtomicInteger completes = new AtomicInteger(); + final CountDownLatch completeLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onRequestSuccess(new org.eclipse.jetty.client.api.Request.SuccessListener() + { + @Override + public void onSuccess(org.eclipse.jetty.client.api.Request request) + { + try + { + abortLatch.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }) + .onResponseContent(new Response.ContentListener() + { + @Override + public void onContent(Response response, ByteBuffer content) + { + try + { + response.abort(new Exception()); + abortLatch.countDown(); + // Delay to let the request side to finish its processing. + Thread.sleep(1000); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + completes.incrementAndGet(); + Assert.assertTrue(result.isFailed()); + completeLatch.countDown(); + } + }); + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + + // Wait to be sure that the complete event is only notified once. + Thread.sleep(1000); + + Assert.assertEquals(1, completes.get()); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java index a15bf47ae5..55217609b5 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -76,9 +76,10 @@ public class HttpReceiverOverHTTPTest HttpRequest request = (HttpRequest)client.newRequest("http://localhost"); FutureResponseListener listener = new FutureResponseListener(request); HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener)); - connection.getHttpChannel().associate(exchange); - exchange.requestComplete(); - exchange.terminateRequest(null); + boolean associated = connection.getHttpChannel().associate(exchange); + Assert.assertTrue(associated); + exchange.requestComplete(null); + exchange.terminateRequest(); return exchange; } |