Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java')
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java693
1 files changed, 485 insertions, 208 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
index 078a710676..8a2b1cb761 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
@@ -21,6 +21,7 @@ package org.eclipse.jetty.server;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher;
@@ -31,6 +32,7 @@ import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
+import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
/**
@@ -42,17 +44,18 @@ public class HttpChannelState
private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L);
- /** The dispatched state of the HttpChannel, used to control the overall livecycle
+ /**
+ * The dispatched state of the HttpChannel, used to control the overall lifecycle
*/
public enum State
{
IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet
- ASYNC_WAIT, // Suspended and parked
- ASYNC_WOKEN, // A thread has been dispatch to handle from ASYNCWAIT
- ASYNC_IO, // Has been dispatched for async IO
- COMPLETING, // Request is completable
- COMPLETED, // Request is complete
+ ASYNC_WAIT, // Suspended and waiting
+ ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT
+ ASYNC_IO, // Dispatched for async IO
+ COMPLETING, // Response is completable
+ COMPLETED, // Response is completed
UPGRADED // Request upgraded the connection
}
@@ -61,42 +64,48 @@ public class HttpChannelState
*/
public enum Action
{
- REQUEST_DISPATCH, // handle a normal request dispatch
+ DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
- ASYNC_EXPIRED, // handle an async timeout
+ ERROR_DISPATCH, // handle a normal error
+ ASYNC_ERROR, // handle an async error
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
- WAIT, // Wait for further events
- COMPLETE // Complete the channel
+ COMPLETE, // Complete the response
+ TERMINATED, // No further actions
+ WAIT, // Wait for further events
}
-
+
/**
- * The state of the servlet async API. This can lead or follow the
+ * The state of the servlet async API. This can lead or follow the
* channel dispatch state and also includes reasons such as expired,
* dispatched or completed.
*/
public enum Async
{
- STARTED,
- DISPATCH,
- COMPLETE,
- EXPIRING,
- EXPIRED
+ STARTED, // AsyncContext.startAsync() has been called
+ DISPATCH, //
+ COMPLETE, // AsyncContext.complete() has been called
+ EXPIRING, // AsyncContext timeout just happened
+ EXPIRED, // AsyncContext timeout has been processed
+ ERRORING, // An error just happened
+ ERRORED // The error has been processed
}
private final boolean DEBUG=LOG.isDebugEnabled();
- private final HttpChannel<?> _channel;
+ private final Locker _locker=new Locker();
+ private final HttpChannel _channel;
private List<AsyncListener> _asyncListeners;
private State _state;
private Async _async;
private boolean _initial;
- private boolean _asyncRead;
- private boolean _asyncWrite;
+ private boolean _asyncReadPossible;
+ private boolean _asyncReadUnready;
+ private boolean _asyncWrite; // TODO refactor same as read
private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
- protected HttpChannelState(HttpChannel<?> channel)
+ protected HttpChannelState(HttpChannel channel)
{
_channel=channel;
_state=State.IDLE;
@@ -106,7 +115,7 @@ public class HttpChannelState
public State getState()
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
return _state;
}
@@ -114,7 +123,7 @@ public class HttpChannelState
public void addListener(AsyncListener listener)
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
if (_asyncListeners==null)
_asyncListeners=new ArrayList<>();
@@ -124,7 +133,7 @@ public class HttpChannelState
public void setTimeout(long ms)
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
_timeoutMs=ms;
}
@@ -132,7 +141,7 @@ public class HttpChannelState
public long getTimeout()
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
return _timeoutMs;
}
@@ -140,7 +149,7 @@ public class HttpChannelState
public AsyncContextEvent getAsyncContextEvent()
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
return _event;
}
@@ -149,17 +158,24 @@ public class HttpChannelState
@Override
public String toString()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
- return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async);
+ return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
+ _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
+ _asyncWrite);
}
}
+ private String getStatusStringLocked()
+ {
+ return String.format("s=%s i=%b a=%s",_state,_initial,_async);
+ }
+
public String getStatusString()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
- return String.format("s=%s i=%b a=%s",_state,_initial,_async);
+ return getStatusStringLocked();
}
}
@@ -168,37 +184,38 @@ public class HttpChannelState
*/
protected Action handling()
{
- synchronized (this)
+ if(DEBUG)
+ LOG.debug("{} handling {}",this,_state);
+ try(Locker.Lock lock= _locker.lock())
{
- if(DEBUG)
- LOG.debug("{} handling {}",this,_state);
switch(_state)
{
case IDLE:
_initial=true;
_state=State.DISPATCHED;
- return Action.REQUEST_DISPATCH;
+ return Action.DISPATCH;
case COMPLETING:
return Action.COMPLETE;
case COMPLETED:
- return Action.WAIT;
+ return Action.TERMINATED;
case ASYNC_WOKEN:
- if (_asyncRead)
+ if (_asyncReadPossible)
{
_state=State.ASYNC_IO;
- _asyncRead=false;
+ _asyncReadUnready=false;
return Action.READ_CALLBACK;
}
+
if (_asyncWrite)
{
_state=State.ASYNC_IO;
_asyncWrite=false;
return Action.WRITE_CALLBACK;
}
-
+
if (_async!=null)
{
Async async=_async;
@@ -216,20 +233,27 @@ public class HttpChannelState
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
- return Action.ASYNC_EXPIRED;
+ return Action.ERROR_DISPATCH;
case STARTED:
- // TODO
- if (DEBUG)
- LOG.debug("TODO Fix this double dispatch",new IllegalStateException(this
- .getStatusString()));
return Action.WAIT;
+ case ERRORING:
+ _state=State.DISPATCHED;
+ return Action.ASYNC_ERROR;
+
+ default:
+ throw new IllegalStateException(getStatusStringLocked());
}
}
-
+
return Action.WAIT;
+ case ASYNC_IO:
+ case ASYNC_WAIT:
+ case DISPATCHED:
+ case UPGRADED:
default:
- throw new IllegalStateException(this.getStatusString());
+ throw new IllegalStateException(getStatusStringLocked());
+
}
}
}
@@ -237,40 +261,56 @@ public class HttpChannelState
public void startAsync(AsyncContextEvent event)
{
final List<AsyncListener> lastAsyncListeners;
-
- synchronized (this)
+
+ try(Locker.Lock lock= _locker.lock())
{
if (_state!=State.DISPATCHED || _async!=null)
- throw new IllegalStateException(this.getStatusString());
-
+ throw new IllegalStateException(this.getStatusStringLocked());
+
_async=Async.STARTED;
_event=event;
lastAsyncListeners=_asyncListeners;
- _asyncListeners=null;
+ _asyncListeners=null;
}
if (lastAsyncListeners!=null)
{
- for (AsyncListener listener : lastAsyncListeners)
+ Runnable callback=new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onStartAsync(event);
+ for (AsyncListener listener : lastAsyncListeners)
+ {
+ try
+ {
+ listener.onStartAsync(event);
+ }
+ catch(Exception e)
+ {
+ // TODO Async Dispatch Error
+ LOG.warn(e);
+ }
+ }
}
- catch(Exception e)
+ @Override
+ public String toString()
{
- LOG.warn(e);
+ return "startAsync";
}
- }
+ };
+
+ runInContext(event,callback);
}
}
protected void error(Throwable th)
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
if (_event!=null)
- _event.setThrowable(th);
+ _event.addThrowable(th);
+ _async=Async.ERRORING;
}
}
@@ -283,32 +323,27 @@ public class HttpChannelState
*/
protected Action unhandle()
{
- synchronized (this)
+ Action action;
+ AsyncContextEvent schedule_event=null;
+ boolean read_interested=false;
+
+ if(DEBUG)
+ LOG.debug("{} unhandle {}",this,_state);
+
+ try(Locker.Lock lock= _locker.lock())
{
- if(DEBUG)
- LOG.debug("{} unhandle {}",this,_state);
-
switch(_state)
{
+ case COMPLETING:
+ case COMPLETED:
+ return Action.TERMINATED;
+
case DISPATCHED:
case ASYNC_IO:
break;
- default:
- throw new IllegalStateException(this.getStatusString());
- }
- if (_asyncRead)
- {
- _state=State.ASYNC_IO;
- _asyncRead=false;
- return Action.READ_CALLBACK;
- }
-
- if (_asyncWrite)
- {
- _asyncWrite=false;
- _state=State.ASYNC_IO;
- return Action.WRITE_CALLBACK;
+ default:
+ throw new IllegalStateException(this.getStatusStringLocked());
}
if (_async!=null)
@@ -319,146 +354,257 @@ public class HttpChannelState
case COMPLETE:
_state=State.COMPLETING;
_async=null;
- return Action.COMPLETE;
+ action=Action.COMPLETE;
+ break;
+
case DISPATCH:
_state=State.DISPATCHED;
_async=null;
- return Action.ASYNC_DISPATCH;
+ action=Action.ASYNC_DISPATCH;
+ break;
+
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
- return Action.ASYNC_EXPIRED;
- case EXPIRING:
+ action = Action.ERROR_DISPATCH;
+ break;
+
case STARTED:
- scheduleTimeout();
+ if (_asyncReadUnready && _asyncReadPossible)
+ {
+ _state=State.ASYNC_IO;
+ _asyncReadUnready=false;
+ action = Action.READ_CALLBACK;
+ }
+ else if (_asyncWrite) // TODO refactor same as read
+ {
+ _asyncWrite=false;
+ _state=State.ASYNC_IO;
+ action=Action.WRITE_CALLBACK;
+ }
+ else
+ {
+ schedule_event=_event;
+ read_interested=_asyncReadUnready;
+ _state=State.ASYNC_WAIT;
+ action=Action.WAIT;
+ }
+ break;
+
+ case EXPIRING:
+ schedule_event=_event;
_state=State.ASYNC_WAIT;
- return Action.WAIT;
+ action=Action.WAIT;
+ break;
+
+ case ERRORING:
+ _state=State.DISPATCHED;
+ action=Action.ASYNC_ERROR;
+ break;
+
+ case ERRORED:
+ _state=State.DISPATCHED;
+ action=Action.ERROR_DISPATCH;
+ _async=null;
+ break;
+
+ default:
+ _state=State.COMPLETING;
+ action=Action.COMPLETE;
+ break;
}
}
-
- _state=State.COMPLETING;
- return Action.COMPLETE;
+ else
+ {
+ _state=State.COMPLETING;
+ action=Action.COMPLETE;
+ }
}
+
+ if (schedule_event!=null)
+ scheduleTimeout(schedule_event);
+ if (read_interested)
+ _channel.asyncReadFillInterested();
+ return action;
}
public void dispatch(ServletContext context, String path)
{
- boolean dispatch;
- synchronized (this)
+ boolean dispatch=false;
+ AsyncContextEvent event=null;
+ try(Locker.Lock lock= _locker.lock())
{
- if (_async!=Async.STARTED && _async!=Async.EXPIRING)
- throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString());
+ boolean started=false;
+ event=_event;
+ switch(_async)
+ {
+ case STARTED:
+ started=true;
+ break;
+ case EXPIRING:
+ case ERRORED:
+ break;
+ default:
+ throw new IllegalStateException(this.getStatusStringLocked());
+ }
_async=Async.DISPATCH;
-
+
if (context!=null)
_event.setDispatchContext(context);
if (path!=null)
_event.setDispatchPath(path);
-
- switch(_state)
+
+ if (started)
{
- case DISPATCHED:
- case ASYNC_IO:
- dispatch=false;
- break;
- case ASYNC_WAIT:
- _state=State.ASYNC_WOKEN;
- dispatch=true;
- break;
- case ASYNC_WOKEN:
- dispatch=false;
- break;
- default:
- LOG.warn("async dispatched when complete {}",this);
- dispatch=false;
- break;
+ switch(_state)
+ {
+ case DISPATCHED:
+ case ASYNC_IO:
+ case ASYNC_WOKEN:
+ break;
+ case ASYNC_WAIT:
+ _state=State.ASYNC_WOKEN;
+ dispatch=true;
+ break;
+ default:
+ LOG.warn("async dispatched when complete {}",this);
+ break;
+ }
}
}
- cancelTimeout();
+ cancelTimeout(event);
if (dispatch)
scheduleDispatch();
}
- protected void expired()
+ protected void onTimeout()
{
- final List<AsyncListener> aListeners;
+ final List<AsyncListener> listeners;
AsyncContextEvent event;
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED)
return;
_async=Async.EXPIRING;
event=_event;
- aListeners=_asyncListeners;
+ listeners=_asyncListeners;
+
}
- if (aListeners!=null)
+ if (LOG.isDebugEnabled())
+ LOG.debug("Async timeout {}",this);
+
+ if (listeners!=null)
{
- for (AsyncListener listener : aListeners)
+ Runnable callback=new Runnable()
{
- try
+ @Override
+ public void run()
{
- listener.onTimeout(event);
+ for (AsyncListener listener : listeners)
+ {
+ try
+ {
+ listener.onTimeout(event);
+ }
+ catch(Exception e)
+ {
+ LOG.debug(e);
+ event.addThrowable(e);
+ _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
+ break;
+ }
+ }
}
- catch(Exception e)
+ @Override
+ public String toString()
{
- LOG.debug(e);
- event.setThrowable(e);
- _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,e);
- break;
+ return "onTimeout";
}
- }
+ };
+
+ runInContext(event,callback);
}
-
+
boolean dispatch=false;
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
- if (_async==Async.EXPIRING)
+ switch(_async)
{
- _async=Async.EXPIRED;
- if (_state==State.ASYNC_WAIT)
- {
- _state=State.ASYNC_WOKEN;
- dispatch=true;
- }
+ case EXPIRING:
+ if (event.getThrowable()==null)
+ {
+ _async=Async.EXPIRED;
+ _event.addThrowable(new TimeoutException("Async API violation"));
+ }
+ else
+ {
+ _async=Async.ERRORING;
+ }
+ break;
+
+ case COMPLETE:
+ case DISPATCH:
+ break;
+
+ default:
+ throw new IllegalStateException();
+ }
+
+ if (_state==State.ASYNC_WAIT)
+ {
+ _state=State.ASYNC_WOKEN;
+ dispatch=true;
}
}
if (dispatch)
+ {
+ if (LOG.isDebugEnabled())
+ LOG.debug("Dispatch after async timeout {}",this);
scheduleDispatch();
+ }
}
public void complete()
{
// just like resume, except don't set _dispatched=true;
boolean handle=false;
- synchronized (this)
+ AsyncContextEvent event=null;
+ try(Locker.Lock lock= _locker.lock())
{
- if (_async!=Async.STARTED && _async!=Async.EXPIRING)
- throw new IllegalStateException(this.getStatusString());
+ boolean started=false;
+ event=_event;
+
+ switch(_async)
+ {
+ case STARTED:
+ started=true;
+ break;
+ case EXPIRING:
+ case ERRORED:
+ break;
+ default:
+ throw new IllegalStateException(this.getStatusStringLocked());
+ }
_async=Async.COMPLETE;
- if (_state==State.ASYNC_WAIT)
+
+ if (started && _state==State.ASYNC_WAIT)
{
handle=true;
_state=State.ASYNC_WOKEN;
}
}
- cancelTimeout();
+ cancelTimeout(event);
if (handle)
- {
- ContextHandler handler=getContextHandler();
- if (handler!=null)
- handler.handle(_channel);
- else
- _channel.handle();
- }
+ runInContext(event,_channel);
}
public void errorComplete()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
_async=Async.COMPLETE;
_event.setDispatchContext(null);
@@ -468,22 +614,58 @@ public class HttpChannelState
cancelTimeout();
}
- protected void completed()
+ protected void onError()
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
- synchronized (this)
+
+ try(Locker.Lock lock= _locker.lock())
+ {
+ if (_state!=State.DISPATCHED/* || _async!=Async.ERRORING*/)
+ throw new IllegalStateException(this.getStatusStringLocked());
+
+ aListeners=_asyncListeners;
+ event=_event;
+ _async=Async.ERRORED;
+ }
+
+ if (event!=null && aListeners!=null)
+ {
+ event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
+ event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
+ for (AsyncListener listener : aListeners)
+ {
+ try
+ {
+ listener.onError(event);
+ }
+ catch(Exception x)
+ {
+ LOG.info("Exception while invoking listener " + listener, x);
+ }
+ }
+ }
+ }
+
+
+ protected void onComplete()
+ {
+ final List<AsyncListener> aListeners;
+ final AsyncContextEvent event;
+
+ try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
case COMPLETING:
- _state=State.COMPLETED;
aListeners=_asyncListeners;
event=_event;
+ _state=State.COMPLETED;
+ _async=null;
break;
default:
- throw new IllegalStateException(this.getStatusString());
+ throw new IllegalStateException(this.getStatusStringLocked());
}
}
@@ -491,41 +673,46 @@ public class HttpChannelState
{
if (aListeners!=null)
{
- if (event.getThrowable()!=null)
+ Runnable callback = new Runnable()
{
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
- }
-
- for (AsyncListener listener : aListeners)
- {
- try
+ @Override
+ public void run()
{
- if (event.getThrowable()!=null)
- listener.onError(event);
- else
- listener.onComplete(event);
- }
- catch(Exception e)
+ for (AsyncListener listener : aListeners)
+ {
+ try
+ {
+ listener.onComplete(event);
+ }
+ catch(Exception e)
+ {
+ LOG.warn(e);
+ }
+ }
+ }
+ @Override
+ public String toString()
{
- LOG.warn(e);
+ return "onComplete";
}
- }
+ };
+
+ runInContext(event,callback);
}
-
event.completed();
}
}
protected void recycle()
{
- synchronized (this)
+ cancelTimeout();
+ try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
case DISPATCHED:
case ASYNC_IO:
- throw new IllegalStateException(getStatusString());
+ throw new IllegalStateException(getStatusStringLocked());
case UPGRADED:
return;
default:
@@ -535,17 +722,17 @@ public class HttpChannelState
_state=State.IDLE;
_async=null;
_initial=true;
- _asyncRead=false;
+ _asyncReadPossible=_asyncReadUnready=false;
_asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
- cancelTimeout();
_event=null;
}
}
-
+
public void upgrade()
{
- synchronized (this)
+ cancelTimeout();
+ try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
@@ -553,47 +740,58 @@ public class HttpChannelState
case COMPLETED:
break;
default:
- throw new IllegalStateException(getStatusString());
+ throw new IllegalStateException(getStatusStringLocked());
}
_asyncListeners=null;
_state=State.UPGRADED;
_async=null;
_initial=true;
- _asyncRead=false;
+ _asyncReadPossible=_asyncReadUnready=false;
_asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
- cancelTimeout();
_event=null;
}
}
-
protected void scheduleDispatch()
{
_channel.execute(_channel);
}
- protected void scheduleTimeout()
+ protected void scheduleTimeout(AsyncContextEvent event)
{
Scheduler scheduler = _channel.getScheduler();
if (scheduler!=null && _timeoutMs>0)
- _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
+ event.setTimeoutTask(scheduler.schedule(event,_timeoutMs,TimeUnit.MILLISECONDS));
}
protected void cancelTimeout()
{
final AsyncContextEvent event;
- synchronized (this)
- {
+ try(Locker.Lock lock= _locker.lock())
+ {
event=_event;
}
+ cancelTimeout(event);
+ }
+
+ protected void cancelTimeout(AsyncContextEvent event)
+ {
if (event!=null)
event.cancelTimeoutTask();
}
+
+ public boolean isIdle()
+ {
+ try(Locker.Lock lock= _locker.lock())
+ {
+ return _state==State.IDLE;
+ }
+ }
public boolean isExpired()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
return _async==Async.EXPIRED;
}
@@ -601,7 +799,7 @@ public class HttpChannelState
public boolean isInitial()
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
return _initial;
}
@@ -609,7 +807,7 @@ public class HttpChannelState
public boolean isSuspended()
{
- synchronized(this)
+ try(Locker.Lock lock= _locker.lock())
{
return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED;
}
@@ -617,7 +815,7 @@ public class HttpChannelState
boolean isCompleting()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
return _state==State.COMPLETING;
}
@@ -625,7 +823,7 @@ public class HttpChannelState
boolean isCompleted()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
return _state == State.COMPLETED;
}
@@ -633,8 +831,8 @@ public class HttpChannelState
public boolean isAsyncStarted()
{
- synchronized (this)
- {
+ try(Locker.Lock lock= _locker.lock())
+ {
if (_state==State.DISPATCHED)
return _async!=null;
return _async==Async.STARTED || _async==Async.EXPIRING;
@@ -643,7 +841,7 @@ public class HttpChannelState
public boolean isAsync()
{
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
return !_initial || _async!=null;
}
@@ -654,7 +852,7 @@ public class HttpChannelState
return _channel.getRequest();
}
- public HttpChannel<?> getHttpChannel()
+ public HttpChannel getHttpChannel()
{
return _channel;
}
@@ -662,11 +860,15 @@ public class HttpChannelState
public ContextHandler getContextHandler()
{
final AsyncContextEvent event;
- synchronized (this)
- {
+ try(Locker.Lock lock= _locker.lock())
+ {
event=_event;
}
-
+ return getContextHandler(event);
+ }
+
+ ContextHandler getContextHandler(AsyncContextEvent event)
+ {
if (event!=null)
{
Context context=((Context)event.getServletContext());
@@ -679,15 +881,29 @@ public class HttpChannelState
public ServletResponse getServletResponse()
{
final AsyncContextEvent event;
- synchronized (this)
- {
+ try(Locker.Lock lock= _locker.lock())
+ {
event=_event;
}
+ return getServletResponse(event);
+ }
+
+ public ServletResponse getServletResponse(AsyncContextEvent event)
+ {
if (event!=null && event.getSuppliedResponse()!=null)
return event.getSuppliedResponse();
return _channel.getResponse();
}
-
+
+ void runInContext(AsyncContextEvent event,Runnable runnable)
+ {
+ ContextHandler contextHandler = getContextHandler(event);
+ if (contextHandler==null)
+ runnable.run();
+ else
+ contextHandler.handle(_channel.getRequest(),runnable);
+ }
+
public Object getAttribute(String name)
{
return _channel.getRequest().getAttribute(name);
@@ -703,29 +919,91 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute);
}
- public void onReadPossible()
+
+ /* ------------------------------------------------------------ */
+ /** Called to signal async read isReady() has returned false.
+ * This indicates that there is no content available to be consumed
+ * and that once the channel enteres the ASYNC_WAIT state it will
+ * register for read interest by calling {@link HttpChannel#asyncReadFillInterested()}
+ * either from this method or from a subsequent call to {@link #unhandle()}.
+ */
+ public void onReadUnready()
{
- boolean handle=false;
+ boolean interested=false;
+ try(Locker.Lock lock= _locker.lock())
+ {
+ // We were already unready, this is not a state change, so do nothing
+ if (!_asyncReadUnready)
+ {
+ _asyncReadUnready=true;
+ _asyncReadPossible=false; // Assumes this has been checked in isReady() with lock held
+ if (_state==State.ASYNC_WAIT)
+ interested=true;
+ }
+ }
+
+ if (interested)
+ _channel.asyncReadFillInterested();
+ }
- synchronized (this)
+ /* ------------------------------------------------------------ */
+ /** Called to signal that content is now available to read.
+ * If the channel is in ASYNC_WAIT state and unready (ie isReady() has
+ * returned false), then the state is changed to ASYNC_WOKEN and true
+ * is returned.
+ * @return True IFF the channel was unready and in ASYNC_WAIT state
+ */
+ public boolean onReadPossible()
+ {
+ boolean woken=false;
+ try(Locker.Lock lock= _locker.lock())
{
- _asyncRead=true;
+ _asyncReadPossible=true;
+ if (_state==State.ASYNC_WAIT && _asyncReadUnready)
+ {
+ woken=true;
+ _state=State.ASYNC_WOKEN;
+ }
+ }
+ return woken;
+ }
+
+ /* ------------------------------------------------------------ */
+ /** Called to signal that the channel is ready for a callback.
+ * This is similar to calling {@link #onReadUnready()} followed by
+ * {@link #onReadPossible()}, except that as content is already
+ * available, read interest is never set.
+ * @return true if woken
+ */
+ public boolean onReadReady()
+ {
+ boolean woken=false;
+ try(Locker.Lock lock= _locker.lock())
+ {
+ _asyncReadUnready=true;
+ _asyncReadPossible=true;
if (_state==State.ASYNC_WAIT)
{
+ woken=true;
_state=State.ASYNC_WOKEN;
- handle=true;
}
}
+ return woken;
+ }
- if (handle)
- _channel.execute(_channel);
+ public boolean isReadPossible()
+ {
+ try(Locker.Lock lock= _locker.lock())
+ {
+ return _asyncReadPossible;
+ }
}
-
- public void onWritePossible()
+
+ public boolean onWritePossible()
{
boolean handle=false;
- synchronized (this)
+ try(Locker.Lock lock= _locker.lock())
{
_asyncWrite=true;
if (_state==State.ASYNC_WAIT)
@@ -735,8 +1013,7 @@ public class HttpChannelState
}
}
- if (handle)
- _channel.execute(_channel);
+ return handle;
}
-
+
}

Back to the top