diff options
Diffstat (limited to 'jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java')
-rw-r--r-- | jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java | 496 |
1 files changed, 360 insertions, 136 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index bf4af4b46e..2c16602ec4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -18,16 +18,23 @@ package org.eclipse.jetty.server; +import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION; +import static javax.servlet.RequestDispatcher.ERROR_MESSAGE; +import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncListener; import javax.servlet.RequestDispatcher; import javax.servlet.ServletContext; import javax.servlet.ServletResponse; +import javax.servlet.UnavailableException; +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.util.log.Log; @@ -45,12 +52,13 @@ public class HttpChannelState private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L); /** - * The dispatched state of the HttpChannel, used to control the overall lifecycle + * The state of the HttpChannel,used to control the overall lifecycle. */ public enum State { IDLE, // Idle request DISPATCHED, // Request dispatched to filter/servlet + THROWN, // Exception thrown while DISPATCHED ASYNC_WAIT, // Suspended and waiting ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT ASYNC_IO, // Dispatched for async IO @@ -67,7 +75,6 @@ public class HttpChannelState DISPATCH, // handle a normal request dispatch ASYNC_DISPATCH, // handle an async request dispatch ERROR_DISPATCH, // handle a normal error - ASYNC_ERROR, // handle an async error WRITE_CALLBACK, // handle an IO write callback READ_CALLBACK, // handle an IO read callback COMPLETE, // Complete the response @@ -76,14 +83,12 @@ public class HttpChannelState } /** - * The state of the servlet async API. This can lead or follow the - * channel dispatch state and also includes reasons such as expired, - * dispatched or completed. + * The state of the servlet async API. */ public enum Async { STARTED, // AsyncContext.startAsync() has been called - DISPATCH, // + DISPATCH, // AsyncContext.dispatch() has been called COMPLETE, // AsyncContext.complete() has been called EXPIRING, // AsyncContext timeout just happened EXPIRED, // AsyncContext timeout has been processed @@ -160,12 +165,18 @@ public class HttpChannelState { try(Locker.Lock lock= _locker.lock()) { - return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial, - _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"), - _asyncWrite); + return toStringLocked(); } } + public String toStringLocked() + { + return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial, + _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"), + _asyncWrite); + } + + private String getStatusStringLocked() { return String.format("s=%s i=%b a=%s",_state,_initial,_async); @@ -184,10 +195,11 @@ public class HttpChannelState */ protected Action handling() { - if(DEBUG) - LOG.debug("{} handling {}",this,_state); try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("handling {}",toStringLocked()); + switch(_state) { case IDLE: @@ -228,17 +240,15 @@ public class HttpChannelState _state=State.DISPATCHED; _async=null; return Action.ASYNC_DISPATCH; - case EXPIRING: - break; case EXPIRED: + case ERRORED: _state=State.DISPATCHED; _async=null; return Action.ERROR_DISPATCH; case STARTED: - return Action.WAIT; + case EXPIRING: case ERRORING: - _state=State.DISPATCHED; - return Action.ASYNC_ERROR; + return Action.WAIT; default: throw new IllegalStateException(getStatusStringLocked()); @@ -264,45 +274,53 @@ public class HttpChannelState try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("startAsync {}",toStringLocked()); + if (_state!=State.DISPATCHED || _async!=null) throw new IllegalStateException(this.getStatusStringLocked()); _async=Async.STARTED; _event=event; lastAsyncListeners=_asyncListeners; - _asyncListeners=null; + _asyncListeners=null; } if (lastAsyncListeners!=null) { - for (AsyncListener listener : lastAsyncListeners) + Runnable callback=new Runnable() { - try + @Override + public void run() { - listener.onStartAsync(event); + for (AsyncListener listener : lastAsyncListeners) + { + try + { + listener.onStartAsync(event); + } + catch(Exception e) + { + // TODO Async Dispatch Error + LOG.warn(e); + } + } } - catch(Exception e) + @Override + public String toString() { - // TODO Async Dispatch Error - LOG.warn(e); + return "startAsync"; } - } + }; + + runInContext(event,callback); } } - protected void error(Throwable th) - { - try(Locker.Lock lock= _locker.lock()) - { - if (_event!=null) - _event.addThrowable(th); - _async=Async.ERRORING; - } - } /** * Signal that the HttpConnection has finished handling the request. - * For blocking connectors, this call may block if the request has + * For blocking connectors,this call may block if the request has * been suspended (startAsync called). * @return next actions * be handled again (eg because of a resume that happened before unhandle was called) @@ -313,17 +331,21 @@ public class HttpChannelState AsyncContextEvent schedule_event=null; boolean read_interested=false; - if(DEBUG) - LOG.debug("{} unhandle {}",this,_state); - try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("unhandle {}",toStringLocked()); + switch(_state) { case COMPLETING: case COMPLETED: return Action.TERMINATED; + case THROWN: + _state=State.DISPATCHED; + return Action.ERROR_DISPATCH; + case DISPATCHED: case ASYNC_IO: break; @@ -349,12 +371,6 @@ public class HttpChannelState action=Action.ASYNC_DISPATCH; break; - case EXPIRED: - _state=State.DISPATCHED; - _async=null; - action = Action.ERROR_DISPATCH; - break; - case STARTED: if (_asyncReadUnready && _asyncReadPossible) { @@ -378,26 +394,27 @@ public class HttpChannelState break; case EXPIRING: - schedule_event=_event; + // onTimeout callbacks still being called, so just WAIT _state=State.ASYNC_WAIT; action=Action.WAIT; break; - case ERRORING: + case EXPIRED: + // onTimeout handling is complete, but did not dispatch as + // we were handling. So do the error dispatch here _state=State.DISPATCHED; - action=Action.ASYNC_ERROR; + _async=null; + action=Action.ERROR_DISPATCH; break; - + case ERRORED: _state=State.DISPATCHED; - action=Action.ERROR_DISPATCH; _async=null; + action=Action.ERROR_DISPATCH; break; default: - _state=State.COMPLETING; - action=Action.COMPLETE; - break; + throw new IllegalStateException(this.getStatusStringLocked()); } } else @@ -416,13 +433,22 @@ public class HttpChannelState public void dispatch(ServletContext context, String path) { - boolean dispatch; + boolean dispatch=false; + AsyncContextEvent event; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("dispatch {} -> {}",toStringLocked(),path); + + boolean started=false; + event=_event; switch(_async) { case STARTED: + started=true; + break; case EXPIRING: + case ERRORING: case ERRORED: break; default: @@ -435,27 +461,26 @@ public class HttpChannelState if (path!=null) _event.setDispatchPath(path); - switch(_state) + if (started) { - case DISPATCHED: - case ASYNC_IO: - dispatch=false; - break; - case ASYNC_WAIT: - _state=State.ASYNC_WOKEN; - dispatch=true; - break; - case ASYNC_WOKEN: - dispatch=false; - break; - default: - LOG.warn("async dispatched when complete {}",this); - dispatch=false; - break; + switch(_state) + { + case DISPATCHED: + case ASYNC_IO: + case ASYNC_WOKEN: + break; + case ASYNC_WAIT: + _state=State.ASYNC_WOKEN; + dispatch=true; + break; + default: + LOG.warn("async dispatched when complete {}",this); + break; + } } } - cancelTimeout(); + cancelTimeout(event); if (dispatch) scheduleDispatch(); } @@ -466,58 +491,88 @@ public class HttpChannelState AsyncContextEvent event; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onTimeout {}",toStringLocked()); + if (_async!=Async.STARTED) return; _async=Async.EXPIRING; event=_event; listeners=_asyncListeners; - } - if (LOG.isDebugEnabled()) - LOG.debug("Async timeout {}",this); + } + final AtomicReference<Throwable> error=new AtomicReference<Throwable>(); if (listeners!=null) { - for (AsyncListener listener : listeners) + Runnable task=new Runnable() { - try + @Override + public void run() { - listener.onTimeout(event); + for (AsyncListener listener : listeners) + { + try + { + listener.onTimeout(event); + } + catch(Throwable x) + { + LOG.debug("Exception while invoking listener " + listener,x); + if (error.get()==null) + error.set(x); + else + error.get().addSuppressed(x); + } + } } - catch(Exception e) + @Override + public String toString() { - LOG.debug(e); - event.addThrowable(e); - _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); - break; + return "onTimeout"; } - } + }; + + runInContext(event,task); } + Throwable th=error.get(); boolean dispatch=false; try(Locker.Lock lock= _locker.lock()) { - if (_async==Async.EXPIRING) + switch(_async) { - // If the listeners did not call dispatch() or complete(), - // then the container must generate an error. - if (event.getThrowable()==null) - { - _async=Async.EXPIRED; - _event.addThrowable(new TimeoutException("Async API violation")); - } - else - { - _async=Async.ERRORING; - } - if (_state==State.ASYNC_WAIT) - { - _state=State.ASYNC_WOKEN; - dispatch=true; - } + case EXPIRING: + _async=th==null ? Async.EXPIRED : Async.ERRORING; + break; + + case COMPLETE: + case DISPATCH: + if (th!=null) + { + LOG.ignore(th); + th=null; + } + break; + + default: + throw new IllegalStateException(); + } + + if (_state==State.ASYNC_WAIT) + { + _state=State.ASYNC_WOKEN; + dispatch=true; } } + if (th!=null) + { + if (LOG.isDebugEnabled()) + LOG.debug("Error after async timeout {}",this,th); + onError(th); + } + if (dispatch) { if (LOG.isDebugEnabled()) @@ -528,42 +583,53 @@ public class HttpChannelState public void complete() { + // just like resume, except don't set _dispatched=true; boolean handle=false; + AsyncContextEvent event; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("complete {}",toStringLocked()); + + boolean started=false; + event=_event; + switch(_async) { case STARTED: + started=true; + break; case EXPIRING: + case ERRORING: case ERRORED: break; + case COMPLETE: + return; default: throw new IllegalStateException(this.getStatusStringLocked()); } _async=Async.COMPLETE; - if (_state==State.ASYNC_WAIT) + + if (started && _state==State.ASYNC_WAIT) { handle=true; _state=State.ASYNC_WOKEN; } } - cancelTimeout(); + cancelTimeout(event); if (handle) - { - ContextHandler handler=getContextHandler(); - if (handler!=null) - handler.handle(_channel.getRequest(),_channel); - else - _channel.handle(); - } + runInContext(event,_channel); } public void errorComplete() { try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("error complete {}",toStringLocked()); + _async=Async.COMPLETE; _event.setDispatchContext(null); _event.setDispatchPath(null); @@ -571,40 +637,142 @@ public class HttpChannelState cancelTimeout(); } - - protected void onError() + + protected void onError(Throwable failure) { - final List<AsyncListener> aListeners; + final List<AsyncListener> listeners; final AsyncContextEvent event; - + final Request baseRequest = _channel.getRequest(); + + int code=HttpStatus.INTERNAL_SERVER_ERROR_500; + String reason=null; + if (failure instanceof BadMessageException) + { + BadMessageException bme = (BadMessageException)failure; + code = bme.getCode(); + reason = bme.getReason(); + } + else if (failure instanceof UnavailableException) + { + if (((UnavailableException)failure).isPermanent()) + code = HttpStatus.NOT_FOUND_404; + else + code = HttpStatus.SERVICE_UNAVAILABLE_503; + } + try(Locker.Lock lock= _locker.lock()) { - if (_state!=State.DISPATCHED/* || _async!=Async.ERRORING*/) + if(DEBUG) + LOG.debug("onError {} {}",toStringLocked(),failure); + + // Set error on request. + if(_event!=null) + { + if (_event.getThrowable()!=null) + throw new IllegalStateException("Error already set",_event.getThrowable()); + _event.addThrowable(failure); + _event.getSuppliedRequest().setAttribute(ERROR_STATUS_CODE,code); + _event.getSuppliedRequest().setAttribute(ERROR_EXCEPTION,failure); + _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,failure==null?null:failure.getClass()); + + _event.getSuppliedRequest().setAttribute(ERROR_MESSAGE,reason!=null?reason:null); + } + else + { + Throwable error = (Throwable)baseRequest.getAttribute(ERROR_EXCEPTION); + if (error!=null) + throw new IllegalStateException("Error already set",error); + baseRequest.setAttribute(ERROR_STATUS_CODE,code); + baseRequest.setAttribute(ERROR_EXCEPTION,failure); + baseRequest.setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,failure==null?null:failure.getClass()); + baseRequest.setAttribute(ERROR_MESSAGE,reason!=null?reason:null); + } + + // Are we blocking? + if (_async==null) + { + // Only called from within HttpChannel Handling, so much be dispatched, let's stay dispatched! + if (_state==State.DISPATCHED) + { + _state=State.THROWN; + return; + } throw new IllegalStateException(this.getStatusStringLocked()); - - aListeners=_asyncListeners; + } + + // We are Async + _async=Async.ERRORING; + listeners=_asyncListeners; event=_event; - _async=Async.ERRORED; } - if (event!=null && aListeners!=null) + if(listeners!=null) { - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage()); - for (AsyncListener listener : aListeners) + Runnable task=new Runnable() { - try + @Override + public void run() { - listener.onError(event); + for (AsyncListener listener : listeners) + { + try + { + listener.onError(event); + } + catch (Throwable x) + { + LOG.info("Exception while invoking listener " + listener,x); + } + } } - catch(Exception x) + + @Override + public String toString() + { + return "onError"; + } + }; + runInContext(event,task); + } + + boolean dispatch=false; + try(Locker.Lock lock= _locker.lock()) + { + switch(_async) + { + case ERRORING: + { + // Still in this state ? The listeners did not invoke API methods + // and the container must provide a default error dispatch. + _async=Async.ERRORED; + break; + } + case DISPATCH: + case COMPLETE: + { + // The listeners called dispatch() or complete(). + break; + } + default: { - LOG.info("Exception while invoking listener " + listener, x); + throw new IllegalStateException(toString()); } } + + if(_state==State.ASYNC_WAIT) + { + _state=State.ASYNC_WOKEN; + dispatch=true; + } } - } + if(dispatch) + { + if(LOG.isDebugEnabled()) + LOG.debug("Dispatch after error {}",this); + scheduleDispatch(); + } + } protected void onComplete() { @@ -613,6 +781,9 @@ public class HttpChannelState try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onComplete {}",toStringLocked()); + switch(_state) { case COMPLETING: @@ -631,17 +802,31 @@ public class HttpChannelState { if (aListeners!=null) { - for (AsyncListener listener : aListeners) + Runnable callback = new Runnable() { - try + @Override + public void run() { - listener.onComplete(event); - } - catch(Exception e) + for (AsyncListener listener : aListeners) + { + try + { + listener.onComplete(event); + } + catch(Exception e) + { + LOG.warn("Exception while invoking listener " + listener,e); + } + } + } + @Override + public String toString() { - LOG.warn(e); + return "onComplete"; } - } + }; + + runInContext(event,callback); } event.completed(); } @@ -652,6 +837,9 @@ public class HttpChannelState cancelTimeout(); try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("recycle {}",toStringLocked()); + switch(_state) { case DISPATCHED: @@ -678,6 +866,9 @@ public class HttpChannelState cancelTimeout(); try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("upgrade {}",toStringLocked()); + switch(_state) { case IDLE: @@ -697,7 +888,6 @@ public class HttpChannelState } } - protected void scheduleDispatch() { _channel.execute(_channel); @@ -717,10 +907,15 @@ public class HttpChannelState { event=_event; } + cancelTimeout(event); + } + + protected void cancelTimeout(AsyncContextEvent event) + { if (event!=null) event.cancelTimeoutTask(); } - + public boolean isIdle() { try(Locker.Lock lock= _locker.lock()) @@ -779,7 +974,6 @@ public class HttpChannelState } } - public boolean isAsync() { try(Locker.Lock lock= _locker.lock()) @@ -805,7 +999,11 @@ public class HttpChannelState { event=_event; } + return getContextHandler(event); + } + ContextHandler getContextHandler(AsyncContextEvent event) + { if (event!=null) { Context context=((Context)event.getServletContext()); @@ -822,11 +1020,25 @@ public class HttpChannelState { event=_event; } + return getServletResponse(event); + } + + public ServletResponse getServletResponse(AsyncContextEvent event) + { if (event!=null && event.getSuppliedResponse()!=null) return event.getSuppliedResponse(); return _channel.getResponse(); } - + + void runInContext(AsyncContextEvent event,Runnable runnable) + { + ContextHandler contextHandler = getContextHandler(event); + if (contextHandler==null) + runnable.run(); + else + contextHandler.handle(_channel.getRequest(),runnable); + } + public Object getAttribute(String name) { return _channel.getRequest().getAttribute(name); @@ -855,6 +1067,9 @@ public class HttpChannelState boolean interested=false; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onReadUnready {}",toStringLocked()); + // We were already unready, this is not a state change, so do nothing if (!_asyncReadUnready) { @@ -881,6 +1096,9 @@ public class HttpChannelState boolean woken=false; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onReadPossible {}",toStringLocked()); + _asyncReadPossible=true; if (_state==State.ASYNC_WAIT && _asyncReadUnready) { @@ -903,6 +1121,9 @@ public class HttpChannelState boolean woken=false; try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onReadReady {}",toStringLocked()); + _asyncReadUnready=true; _asyncReadPossible=true; if (_state==State.ASYNC_WAIT) @@ -928,6 +1149,9 @@ public class HttpChannelState try(Locker.Lock lock= _locker.lock()) { + if(DEBUG) + LOG.debug("onWritePossible {}",toStringLocked()); + _asyncWrite=true; if (_state==State.ASYNC_WAIT) { |