Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java')
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java496
1 files changed, 360 insertions, 136 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
index bf4af4b46e..2c16602ec4 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
@@ -18,16 +18,23 @@
package org.eclipse.jetty.server;
+import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION;
+import static javax.servlet.RequestDispatcher.ERROR_MESSAGE;
+import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext;
import javax.servlet.ServletResponse;
+import javax.servlet.UnavailableException;
+import org.eclipse.jetty.http.BadMessageException;
+import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.log.Log;
@@ -45,12 +52,13 @@ public class HttpChannelState
private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L);
/**
- * The dispatched state of the HttpChannel, used to control the overall lifecycle
+ * The state of the HttpChannel,used to control the overall lifecycle.
*/
public enum State
{
IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet
+ THROWN, // Exception thrown while DISPATCHED
ASYNC_WAIT, // Suspended and waiting
ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT
ASYNC_IO, // Dispatched for async IO
@@ -67,7 +75,6 @@ public class HttpChannelState
DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ERROR_DISPATCH, // handle a normal error
- ASYNC_ERROR, // handle an async error
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
COMPLETE, // Complete the response
@@ -76,14 +83,12 @@ public class HttpChannelState
}
/**
- * The state of the servlet async API. This can lead or follow the
- * channel dispatch state and also includes reasons such as expired,
- * dispatched or completed.
+ * The state of the servlet async API.
*/
public enum Async
{
STARTED, // AsyncContext.startAsync() has been called
- DISPATCH, //
+ DISPATCH, // AsyncContext.dispatch() has been called
COMPLETE, // AsyncContext.complete() has been called
EXPIRING, // AsyncContext timeout just happened
EXPIRED, // AsyncContext timeout has been processed
@@ -160,12 +165,18 @@ public class HttpChannelState
{
try(Locker.Lock lock= _locker.lock())
{
- return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
- _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
- _asyncWrite);
+ return toStringLocked();
}
}
+ public String toStringLocked()
+ {
+ return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
+ _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
+ _asyncWrite);
+ }
+
+
private String getStatusStringLocked()
{
return String.format("s=%s i=%b a=%s",_state,_initial,_async);
@@ -184,10 +195,11 @@ public class HttpChannelState
*/
protected Action handling()
{
- if(DEBUG)
- LOG.debug("{} handling {}",this,_state);
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("handling {}",toStringLocked());
+
switch(_state)
{
case IDLE:
@@ -228,17 +240,15 @@ public class HttpChannelState
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_DISPATCH;
- case EXPIRING:
- break;
case EXPIRED:
+ case ERRORED:
_state=State.DISPATCHED;
_async=null;
return Action.ERROR_DISPATCH;
case STARTED:
- return Action.WAIT;
+ case EXPIRING:
case ERRORING:
- _state=State.DISPATCHED;
- return Action.ASYNC_ERROR;
+ return Action.WAIT;
default:
throw new IllegalStateException(getStatusStringLocked());
@@ -264,45 +274,53 @@ public class HttpChannelState
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("startAsync {}",toStringLocked());
+
if (_state!=State.DISPATCHED || _async!=null)
throw new IllegalStateException(this.getStatusStringLocked());
_async=Async.STARTED;
_event=event;
lastAsyncListeners=_asyncListeners;
- _asyncListeners=null;
+ _asyncListeners=null;
}
if (lastAsyncListeners!=null)
{
- for (AsyncListener listener : lastAsyncListeners)
+ Runnable callback=new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onStartAsync(event);
+ for (AsyncListener listener : lastAsyncListeners)
+ {
+ try
+ {
+ listener.onStartAsync(event);
+ }
+ catch(Exception e)
+ {
+ // TODO Async Dispatch Error
+ LOG.warn(e);
+ }
+ }
}
- catch(Exception e)
+ @Override
+ public String toString()
{
- // TODO Async Dispatch Error
- LOG.warn(e);
+ return "startAsync";
}
- }
+ };
+
+ runInContext(event,callback);
}
}
- protected void error(Throwable th)
- {
- try(Locker.Lock lock= _locker.lock())
- {
- if (_event!=null)
- _event.addThrowable(th);
- _async=Async.ERRORING;
- }
- }
/**
* Signal that the HttpConnection has finished handling the request.
- * For blocking connectors, this call may block if the request has
+ * For blocking connectors,this call may block if the request has
* been suspended (startAsync called).
* @return next actions
* be handled again (eg because of a resume that happened before unhandle was called)
@@ -313,17 +331,21 @@ public class HttpChannelState
AsyncContextEvent schedule_event=null;
boolean read_interested=false;
- if(DEBUG)
- LOG.debug("{} unhandle {}",this,_state);
-
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("unhandle {}",toStringLocked());
+
switch(_state)
{
case COMPLETING:
case COMPLETED:
return Action.TERMINATED;
+ case THROWN:
+ _state=State.DISPATCHED;
+ return Action.ERROR_DISPATCH;
+
case DISPATCHED:
case ASYNC_IO:
break;
@@ -349,12 +371,6 @@ public class HttpChannelState
action=Action.ASYNC_DISPATCH;
break;
- case EXPIRED:
- _state=State.DISPATCHED;
- _async=null;
- action = Action.ERROR_DISPATCH;
- break;
-
case STARTED:
if (_asyncReadUnready && _asyncReadPossible)
{
@@ -378,26 +394,27 @@ public class HttpChannelState
break;
case EXPIRING:
- schedule_event=_event;
+ // onTimeout callbacks still being called, so just WAIT
_state=State.ASYNC_WAIT;
action=Action.WAIT;
break;
- case ERRORING:
+ case EXPIRED:
+ // onTimeout handling is complete, but did not dispatch as
+ // we were handling. So do the error dispatch here
_state=State.DISPATCHED;
- action=Action.ASYNC_ERROR;
+ _async=null;
+ action=Action.ERROR_DISPATCH;
break;
-
+
case ERRORED:
_state=State.DISPATCHED;
- action=Action.ERROR_DISPATCH;
_async=null;
+ action=Action.ERROR_DISPATCH;
break;
default:
- _state=State.COMPLETING;
- action=Action.COMPLETE;
- break;
+ throw new IllegalStateException(this.getStatusStringLocked());
}
}
else
@@ -416,13 +433,22 @@ public class HttpChannelState
public void dispatch(ServletContext context, String path)
{
- boolean dispatch;
+ boolean dispatch=false;
+ AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("dispatch {} -> {}",toStringLocked(),path);
+
+ boolean started=false;
+ event=_event;
switch(_async)
{
case STARTED:
+ started=true;
+ break;
case EXPIRING:
+ case ERRORING:
case ERRORED:
break;
default:
@@ -435,27 +461,26 @@ public class HttpChannelState
if (path!=null)
_event.setDispatchPath(path);
- switch(_state)
+ if (started)
{
- case DISPATCHED:
- case ASYNC_IO:
- dispatch=false;
- break;
- case ASYNC_WAIT:
- _state=State.ASYNC_WOKEN;
- dispatch=true;
- break;
- case ASYNC_WOKEN:
- dispatch=false;
- break;
- default:
- LOG.warn("async dispatched when complete {}",this);
- dispatch=false;
- break;
+ switch(_state)
+ {
+ case DISPATCHED:
+ case ASYNC_IO:
+ case ASYNC_WOKEN:
+ break;
+ case ASYNC_WAIT:
+ _state=State.ASYNC_WOKEN;
+ dispatch=true;
+ break;
+ default:
+ LOG.warn("async dispatched when complete {}",this);
+ break;
+ }
}
}
- cancelTimeout();
+ cancelTimeout(event);
if (dispatch)
scheduleDispatch();
}
@@ -466,58 +491,88 @@ public class HttpChannelState
AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onTimeout {}",toStringLocked());
+
if (_async!=Async.STARTED)
return;
_async=Async.EXPIRING;
event=_event;
listeners=_asyncListeners;
- }
- if (LOG.isDebugEnabled())
- LOG.debug("Async timeout {}",this);
+ }
+ final AtomicReference<Throwable> error=new AtomicReference<Throwable>();
if (listeners!=null)
{
- for (AsyncListener listener : listeners)
+ Runnable task=new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onTimeout(event);
+ for (AsyncListener listener : listeners)
+ {
+ try
+ {
+ listener.onTimeout(event);
+ }
+ catch(Throwable x)
+ {
+ LOG.debug("Exception while invoking listener " + listener,x);
+ if (error.get()==null)
+ error.set(x);
+ else
+ error.get().addSuppressed(x);
+ }
+ }
}
- catch(Exception e)
+ @Override
+ public String toString()
{
- LOG.debug(e);
- event.addThrowable(e);
- _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
- break;
+ return "onTimeout";
}
- }
+ };
+
+ runInContext(event,task);
}
+ Throwable th=error.get();
boolean dispatch=false;
try(Locker.Lock lock= _locker.lock())
{
- if (_async==Async.EXPIRING)
+ switch(_async)
{
- // If the listeners did not call dispatch() or complete(),
- // then the container must generate an error.
- if (event.getThrowable()==null)
- {
- _async=Async.EXPIRED;
- _event.addThrowable(new TimeoutException("Async API violation"));
- }
- else
- {
- _async=Async.ERRORING;
- }
- if (_state==State.ASYNC_WAIT)
- {
- _state=State.ASYNC_WOKEN;
- dispatch=true;
- }
+ case EXPIRING:
+ _async=th==null ? Async.EXPIRED : Async.ERRORING;
+ break;
+
+ case COMPLETE:
+ case DISPATCH:
+ if (th!=null)
+ {
+ LOG.ignore(th);
+ th=null;
+ }
+ break;
+
+ default:
+ throw new IllegalStateException();
+ }
+
+ if (_state==State.ASYNC_WAIT)
+ {
+ _state=State.ASYNC_WOKEN;
+ dispatch=true;
}
}
+ if (th!=null)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Error after async timeout {}",this,th);
+ onError(th);
+ }
+
if (dispatch)
{
if (LOG.isDebugEnabled())
@@ -528,42 +583,53 @@ public class HttpChannelState
public void complete()
{
+
// just like resume, except don't set _dispatched=true;
boolean handle=false;
+ AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("complete {}",toStringLocked());
+
+ boolean started=false;
+ event=_event;
+
switch(_async)
{
case STARTED:
+ started=true;
+ break;
case EXPIRING:
+ case ERRORING:
case ERRORED:
break;
+ case COMPLETE:
+ return;
default:
throw new IllegalStateException(this.getStatusStringLocked());
}
_async=Async.COMPLETE;
- if (_state==State.ASYNC_WAIT)
+
+ if (started && _state==State.ASYNC_WAIT)
{
handle=true;
_state=State.ASYNC_WOKEN;
}
}
- cancelTimeout();
+ cancelTimeout(event);
if (handle)
- {
- ContextHandler handler=getContextHandler();
- if (handler!=null)
- handler.handle(_channel.getRequest(),_channel);
- else
- _channel.handle();
- }
+ runInContext(event,_channel);
}
public void errorComplete()
{
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("error complete {}",toStringLocked());
+
_async=Async.COMPLETE;
_event.setDispatchContext(null);
_event.setDispatchPath(null);
@@ -571,40 +637,142 @@ public class HttpChannelState
cancelTimeout();
}
-
- protected void onError()
+
+ protected void onError(Throwable failure)
{
- final List<AsyncListener> aListeners;
+ final List<AsyncListener> listeners;
final AsyncContextEvent event;
-
+ final Request baseRequest = _channel.getRequest();
+
+ int code=HttpStatus.INTERNAL_SERVER_ERROR_500;
+ String reason=null;
+ if (failure instanceof BadMessageException)
+ {
+ BadMessageException bme = (BadMessageException)failure;
+ code = bme.getCode();
+ reason = bme.getReason();
+ }
+ else if (failure instanceof UnavailableException)
+ {
+ if (((UnavailableException)failure).isPermanent())
+ code = HttpStatus.NOT_FOUND_404;
+ else
+ code = HttpStatus.SERVICE_UNAVAILABLE_503;
+ }
+
try(Locker.Lock lock= _locker.lock())
{
- if (_state!=State.DISPATCHED/* || _async!=Async.ERRORING*/)
+ if(DEBUG)
+ LOG.debug("onError {} {}",toStringLocked(),failure);
+
+ // Set error on request.
+ if(_event!=null)
+ {
+ if (_event.getThrowable()!=null)
+ throw new IllegalStateException("Error already set",_event.getThrowable());
+ _event.addThrowable(failure);
+ _event.getSuppliedRequest().setAttribute(ERROR_STATUS_CODE,code);
+ _event.getSuppliedRequest().setAttribute(ERROR_EXCEPTION,failure);
+ _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,failure==null?null:failure.getClass());
+
+ _event.getSuppliedRequest().setAttribute(ERROR_MESSAGE,reason!=null?reason:null);
+ }
+ else
+ {
+ Throwable error = (Throwable)baseRequest.getAttribute(ERROR_EXCEPTION);
+ if (error!=null)
+ throw new IllegalStateException("Error already set",error);
+ baseRequest.setAttribute(ERROR_STATUS_CODE,code);
+ baseRequest.setAttribute(ERROR_EXCEPTION,failure);
+ baseRequest.setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE,failure==null?null:failure.getClass());
+ baseRequest.setAttribute(ERROR_MESSAGE,reason!=null?reason:null);
+ }
+
+ // Are we blocking?
+ if (_async==null)
+ {
+ // Only called from within HttpChannel Handling, so much be dispatched, let's stay dispatched!
+ if (_state==State.DISPATCHED)
+ {
+ _state=State.THROWN;
+ return;
+ }
throw new IllegalStateException(this.getStatusStringLocked());
-
- aListeners=_asyncListeners;
+ }
+
+ // We are Async
+ _async=Async.ERRORING;
+ listeners=_asyncListeners;
event=_event;
- _async=Async.ERRORED;
}
- if (event!=null && aListeners!=null)
+ if(listeners!=null)
{
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
- for (AsyncListener listener : aListeners)
+ Runnable task=new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onError(event);
+ for (AsyncListener listener : listeners)
+ {
+ try
+ {
+ listener.onError(event);
+ }
+ catch (Throwable x)
+ {
+ LOG.info("Exception while invoking listener " + listener,x);
+ }
+ }
}
- catch(Exception x)
+
+ @Override
+ public String toString()
+ {
+ return "onError";
+ }
+ };
+ runInContext(event,task);
+ }
+
+ boolean dispatch=false;
+ try(Locker.Lock lock= _locker.lock())
+ {
+ switch(_async)
+ {
+ case ERRORING:
+ {
+ // Still in this state ? The listeners did not invoke API methods
+ // and the container must provide a default error dispatch.
+ _async=Async.ERRORED;
+ break;
+ }
+ case DISPATCH:
+ case COMPLETE:
+ {
+ // The listeners called dispatch() or complete().
+ break;
+ }
+ default:
{
- LOG.info("Exception while invoking listener " + listener, x);
+ throw new IllegalStateException(toString());
}
}
+
+ if(_state==State.ASYNC_WAIT)
+ {
+ _state=State.ASYNC_WOKEN;
+ dispatch=true;
+ }
}
- }
+ if(dispatch)
+ {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Dispatch after error {}",this);
+ scheduleDispatch();
+ }
+ }
protected void onComplete()
{
@@ -613,6 +781,9 @@ public class HttpChannelState
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onComplete {}",toStringLocked());
+
switch(_state)
{
case COMPLETING:
@@ -631,17 +802,31 @@ public class HttpChannelState
{
if (aListeners!=null)
{
- for (AsyncListener listener : aListeners)
+ Runnable callback = new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onComplete(event);
- }
- catch(Exception e)
+ for (AsyncListener listener : aListeners)
+ {
+ try
+ {
+ listener.onComplete(event);
+ }
+ catch(Exception e)
+ {
+ LOG.warn("Exception while invoking listener " + listener,e);
+ }
+ }
+ }
+ @Override
+ public String toString()
{
- LOG.warn(e);
+ return "onComplete";
}
- }
+ };
+
+ runInContext(event,callback);
}
event.completed();
}
@@ -652,6 +837,9 @@ public class HttpChannelState
cancelTimeout();
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("recycle {}",toStringLocked());
+
switch(_state)
{
case DISPATCHED:
@@ -678,6 +866,9 @@ public class HttpChannelState
cancelTimeout();
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("upgrade {}",toStringLocked());
+
switch(_state)
{
case IDLE:
@@ -697,7 +888,6 @@ public class HttpChannelState
}
}
-
protected void scheduleDispatch()
{
_channel.execute(_channel);
@@ -717,10 +907,15 @@ public class HttpChannelState
{
event=_event;
}
+ cancelTimeout(event);
+ }
+
+ protected void cancelTimeout(AsyncContextEvent event)
+ {
if (event!=null)
event.cancelTimeoutTask();
}
-
+
public boolean isIdle()
{
try(Locker.Lock lock= _locker.lock())
@@ -779,7 +974,6 @@ public class HttpChannelState
}
}
-
public boolean isAsync()
{
try(Locker.Lock lock= _locker.lock())
@@ -805,7 +999,11 @@ public class HttpChannelState
{
event=_event;
}
+ return getContextHandler(event);
+ }
+ ContextHandler getContextHandler(AsyncContextEvent event)
+ {
if (event!=null)
{
Context context=((Context)event.getServletContext());
@@ -822,11 +1020,25 @@ public class HttpChannelState
{
event=_event;
}
+ return getServletResponse(event);
+ }
+
+ public ServletResponse getServletResponse(AsyncContextEvent event)
+ {
if (event!=null && event.getSuppliedResponse()!=null)
return event.getSuppliedResponse();
return _channel.getResponse();
}
-
+
+ void runInContext(AsyncContextEvent event,Runnable runnable)
+ {
+ ContextHandler contextHandler = getContextHandler(event);
+ if (contextHandler==null)
+ runnable.run();
+ else
+ contextHandler.handle(_channel.getRequest(),runnable);
+ }
+
public Object getAttribute(String name)
{
return _channel.getRequest().getAttribute(name);
@@ -855,6 +1067,9 @@ public class HttpChannelState
boolean interested=false;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onReadUnready {}",toStringLocked());
+
// We were already unready, this is not a state change, so do nothing
if (!_asyncReadUnready)
{
@@ -881,6 +1096,9 @@ public class HttpChannelState
boolean woken=false;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onReadPossible {}",toStringLocked());
+
_asyncReadPossible=true;
if (_state==State.ASYNC_WAIT && _asyncReadUnready)
{
@@ -903,6 +1121,9 @@ public class HttpChannelState
boolean woken=false;
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onReadReady {}",toStringLocked());
+
_asyncReadUnready=true;
_asyncReadPossible=true;
if (_state==State.ASYNC_WAIT)
@@ -928,6 +1149,9 @@ public class HttpChannelState
try(Locker.Lock lock= _locker.lock())
{
+ if(DEBUG)
+ LOG.debug("onWritePossible {}",toStringLocked());
+
_asyncWrite=true;
if (_state==State.ASYNC_WAIT)
{

Back to the top