diff options
Diffstat (limited to 'jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java')
-rw-r--r-- | jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java | 693 |
1 files changed, 485 insertions, 208 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 078a710676..8a2b1cb761 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.server; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.servlet.AsyncListener; import javax.servlet.RequestDispatcher; @@ -31,6 +32,7 @@ import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; /** @@ -42,17 +44,18 @@ public class HttpChannelState private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L); - /** The dispatched state of the HttpChannel, used to control the overall livecycle + /** + * The dispatched state of the HttpChannel, used to control the overall lifecycle */ public enum State { IDLE, // Idle request DISPATCHED, // Request dispatched to filter/servlet - ASYNC_WAIT, // Suspended and parked - ASYNC_WOKEN, // A thread has been dispatch to handle from ASYNCWAIT - ASYNC_IO, // Has been dispatched for async IO - COMPLETING, // Request is completable - COMPLETED, // Request is complete + ASYNC_WAIT, // Suspended and waiting + ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT + ASYNC_IO, // Dispatched for async IO + COMPLETING, // Response is completable + COMPLETED, // Response is completed UPGRADED // Request upgraded the connection } @@ -61,42 +64,48 @@ public class HttpChannelState */ public enum Action { - REQUEST_DISPATCH, // handle a normal request dispatch + DISPATCH, // handle a normal request dispatch ASYNC_DISPATCH, // handle an async request dispatch - ASYNC_EXPIRED, // handle an async timeout + ERROR_DISPATCH, // handle a normal error + ASYNC_ERROR, // handle an async error WRITE_CALLBACK, // handle an IO write callback READ_CALLBACK, // handle an IO read callback - WAIT, // Wait for further events - COMPLETE // Complete the channel + COMPLETE, // Complete the response + TERMINATED, // No further actions + WAIT, // Wait for further events } - + /** - * The state of the servlet async API. This can lead or follow the + * The state of the servlet async API. This can lead or follow the * channel dispatch state and also includes reasons such as expired, * dispatched or completed. */ public enum Async { - STARTED, - DISPATCH, - COMPLETE, - EXPIRING, - EXPIRED + STARTED, // AsyncContext.startAsync() has been called + DISPATCH, // + COMPLETE, // AsyncContext.complete() has been called + EXPIRING, // AsyncContext timeout just happened + EXPIRED, // AsyncContext timeout has been processed + ERRORING, // An error just happened + ERRORED // The error has been processed } private final boolean DEBUG=LOG.isDebugEnabled(); - private final HttpChannel<?> _channel; + private final Locker _locker=new Locker(); + private final HttpChannel _channel; private List<AsyncListener> _asyncListeners; private State _state; private Async _async; private boolean _initial; - private boolean _asyncRead; - private boolean _asyncWrite; + private boolean _asyncReadPossible; + private boolean _asyncReadUnready; + private boolean _asyncWrite; // TODO refactor same as read private long _timeoutMs=DEFAULT_TIMEOUT; private AsyncContextEvent _event; - protected HttpChannelState(HttpChannel<?> channel) + protected HttpChannelState(HttpChannel channel) { _channel=channel; _state=State.IDLE; @@ -106,7 +115,7 @@ public class HttpChannelState public State getState() { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { return _state; } @@ -114,7 +123,7 @@ public class HttpChannelState public void addListener(AsyncListener listener) { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { if (_asyncListeners==null) _asyncListeners=new ArrayList<>(); @@ -124,7 +133,7 @@ public class HttpChannelState public void setTimeout(long ms) { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { _timeoutMs=ms; } @@ -132,7 +141,7 @@ public class HttpChannelState public long getTimeout() { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { return _timeoutMs; } @@ -140,7 +149,7 @@ public class HttpChannelState public AsyncContextEvent getAsyncContextEvent() { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { return _event; } @@ -149,17 +158,24 @@ public class HttpChannelState @Override public String toString() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { - return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async); + return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial, + _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"), + _asyncWrite); } } + private String getStatusStringLocked() + { + return String.format("s=%s i=%b a=%s",_state,_initial,_async); + } + public String getStatusString() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { - return String.format("s=%s i=%b a=%s",_state,_initial,_async); + return getStatusStringLocked(); } } @@ -168,37 +184,38 @@ public class HttpChannelState */ protected Action handling() { - synchronized (this) + if(DEBUG) + LOG.debug("{} handling {}",this,_state); + try(Locker.Lock lock= _locker.lock()) { - if(DEBUG) - LOG.debug("{} handling {}",this,_state); switch(_state) { case IDLE: _initial=true; _state=State.DISPATCHED; - return Action.REQUEST_DISPATCH; + return Action.DISPATCH; case COMPLETING: return Action.COMPLETE; case COMPLETED: - return Action.WAIT; + return Action.TERMINATED; case ASYNC_WOKEN: - if (_asyncRead) + if (_asyncReadPossible) { _state=State.ASYNC_IO; - _asyncRead=false; + _asyncReadUnready=false; return Action.READ_CALLBACK; } + if (_asyncWrite) { _state=State.ASYNC_IO; _asyncWrite=false; return Action.WRITE_CALLBACK; } - + if (_async!=null) { Async async=_async; @@ -216,20 +233,27 @@ public class HttpChannelState case EXPIRED: _state=State.DISPATCHED; _async=null; - return Action.ASYNC_EXPIRED; + return Action.ERROR_DISPATCH; case STARTED: - // TODO - if (DEBUG) - LOG.debug("TODO Fix this double dispatch",new IllegalStateException(this - .getStatusString())); return Action.WAIT; + case ERRORING: + _state=State.DISPATCHED; + return Action.ASYNC_ERROR; + + default: + throw new IllegalStateException(getStatusStringLocked()); } } - + return Action.WAIT; + case ASYNC_IO: + case ASYNC_WAIT: + case DISPATCHED: + case UPGRADED: default: - throw new IllegalStateException(this.getStatusString()); + throw new IllegalStateException(getStatusStringLocked()); + } } } @@ -237,40 +261,56 @@ public class HttpChannelState public void startAsync(AsyncContextEvent event) { final List<AsyncListener> lastAsyncListeners; - - synchronized (this) + + try(Locker.Lock lock= _locker.lock()) { if (_state!=State.DISPATCHED || _async!=null) - throw new IllegalStateException(this.getStatusString()); - + throw new IllegalStateException(this.getStatusStringLocked()); + _async=Async.STARTED; _event=event; lastAsyncListeners=_asyncListeners; - _asyncListeners=null; + _asyncListeners=null; } if (lastAsyncListeners!=null) { - for (AsyncListener listener : lastAsyncListeners) + Runnable callback=new Runnable() { - try + @Override + public void run() { - listener.onStartAsync(event); + for (AsyncListener listener : lastAsyncListeners) + { + try + { + listener.onStartAsync(event); + } + catch(Exception e) + { + // TODO Async Dispatch Error + LOG.warn(e); + } + } } - catch(Exception e) + @Override + public String toString() { - LOG.warn(e); + return "startAsync"; } - } + }; + + runInContext(event,callback); } } protected void error(Throwable th) { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { if (_event!=null) - _event.setThrowable(th); + _event.addThrowable(th); + _async=Async.ERRORING; } } @@ -283,32 +323,27 @@ public class HttpChannelState */ protected Action unhandle() { - synchronized (this) + Action action; + AsyncContextEvent schedule_event=null; + boolean read_interested=false; + + if(DEBUG) + LOG.debug("{} unhandle {}",this,_state); + + try(Locker.Lock lock= _locker.lock()) { - if(DEBUG) - LOG.debug("{} unhandle {}",this,_state); - switch(_state) { + case COMPLETING: + case COMPLETED: + return Action.TERMINATED; + case DISPATCHED: case ASYNC_IO: break; - default: - throw new IllegalStateException(this.getStatusString()); - } - if (_asyncRead) - { - _state=State.ASYNC_IO; - _asyncRead=false; - return Action.READ_CALLBACK; - } - - if (_asyncWrite) - { - _asyncWrite=false; - _state=State.ASYNC_IO; - return Action.WRITE_CALLBACK; + default: + throw new IllegalStateException(this.getStatusStringLocked()); } if (_async!=null) @@ -319,146 +354,257 @@ public class HttpChannelState case COMPLETE: _state=State.COMPLETING; _async=null; - return Action.COMPLETE; + action=Action.COMPLETE; + break; + case DISPATCH: _state=State.DISPATCHED; _async=null; - return Action.ASYNC_DISPATCH; + action=Action.ASYNC_DISPATCH; + break; + case EXPIRED: _state=State.DISPATCHED; _async=null; - return Action.ASYNC_EXPIRED; - case EXPIRING: + action = Action.ERROR_DISPATCH; + break; + case STARTED: - scheduleTimeout(); + if (_asyncReadUnready && _asyncReadPossible) + { + _state=State.ASYNC_IO; + _asyncReadUnready=false; + action = Action.READ_CALLBACK; + } + else if (_asyncWrite) // TODO refactor same as read + { + _asyncWrite=false; + _state=State.ASYNC_IO; + action=Action.WRITE_CALLBACK; + } + else + { + schedule_event=_event; + read_interested=_asyncReadUnready; + _state=State.ASYNC_WAIT; + action=Action.WAIT; + } + break; + + case EXPIRING: + schedule_event=_event; _state=State.ASYNC_WAIT; - return Action.WAIT; + action=Action.WAIT; + break; + + case ERRORING: + _state=State.DISPATCHED; + action=Action.ASYNC_ERROR; + break; + + case ERRORED: + _state=State.DISPATCHED; + action=Action.ERROR_DISPATCH; + _async=null; + break; + + default: + _state=State.COMPLETING; + action=Action.COMPLETE; + break; } } - - _state=State.COMPLETING; - return Action.COMPLETE; + else + { + _state=State.COMPLETING; + action=Action.COMPLETE; + } } + + if (schedule_event!=null) + scheduleTimeout(schedule_event); + if (read_interested) + _channel.asyncReadFillInterested(); + return action; } public void dispatch(ServletContext context, String path) { - boolean dispatch; - synchronized (this) + boolean dispatch=false; + AsyncContextEvent event=null; + try(Locker.Lock lock= _locker.lock()) { - if (_async!=Async.STARTED && _async!=Async.EXPIRING) - throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString()); + boolean started=false; + event=_event; + switch(_async) + { + case STARTED: + started=true; + break; + case EXPIRING: + case ERRORED: + break; + default: + throw new IllegalStateException(this.getStatusStringLocked()); + } _async=Async.DISPATCH; - + if (context!=null) _event.setDispatchContext(context); if (path!=null) _event.setDispatchPath(path); - - switch(_state) + + if (started) { - case DISPATCHED: - case ASYNC_IO: - dispatch=false; - break; - case ASYNC_WAIT: - _state=State.ASYNC_WOKEN; - dispatch=true; - break; - case ASYNC_WOKEN: - dispatch=false; - break; - default: - LOG.warn("async dispatched when complete {}",this); - dispatch=false; - break; + switch(_state) + { + case DISPATCHED: + case ASYNC_IO: + case ASYNC_WOKEN: + break; + case ASYNC_WAIT: + _state=State.ASYNC_WOKEN; + dispatch=true; + break; + default: + LOG.warn("async dispatched when complete {}",this); + break; + } } } - cancelTimeout(); + cancelTimeout(event); if (dispatch) scheduleDispatch(); } - protected void expired() + protected void onTimeout() { - final List<AsyncListener> aListeners; + final List<AsyncListener> listeners; AsyncContextEvent event; - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { if (_async!=Async.STARTED) return; _async=Async.EXPIRING; event=_event; - aListeners=_asyncListeners; + listeners=_asyncListeners; + } - if (aListeners!=null) + if (LOG.isDebugEnabled()) + LOG.debug("Async timeout {}",this); + + if (listeners!=null) { - for (AsyncListener listener : aListeners) + Runnable callback=new Runnable() { - try + @Override + public void run() { - listener.onTimeout(event); + for (AsyncListener listener : listeners) + { + try + { + listener.onTimeout(event); + } + catch(Exception e) + { + LOG.debug(e); + event.addThrowable(e); + _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); + break; + } + } } - catch(Exception e) + @Override + public String toString() { - LOG.debug(e); - event.setThrowable(e); - _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,e); - break; + return "onTimeout"; } - } + }; + + runInContext(event,callback); } - + boolean dispatch=false; - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { - if (_async==Async.EXPIRING) + switch(_async) { - _async=Async.EXPIRED; - if (_state==State.ASYNC_WAIT) - { - _state=State.ASYNC_WOKEN; - dispatch=true; - } + case EXPIRING: + if (event.getThrowable()==null) + { + _async=Async.EXPIRED; + _event.addThrowable(new TimeoutException("Async API violation")); + } + else + { + _async=Async.ERRORING; + } + break; + + case COMPLETE: + case DISPATCH: + break; + + default: + throw new IllegalStateException(); + } + + if (_state==State.ASYNC_WAIT) + { + _state=State.ASYNC_WOKEN; + dispatch=true; } } if (dispatch) + { + if (LOG.isDebugEnabled()) + LOG.debug("Dispatch after async timeout {}",this); scheduleDispatch(); + } } public void complete() { // just like resume, except don't set _dispatched=true; boolean handle=false; - synchronized (this) + AsyncContextEvent event=null; + try(Locker.Lock lock= _locker.lock()) { - if (_async!=Async.STARTED && _async!=Async.EXPIRING) - throw new IllegalStateException(this.getStatusString()); + boolean started=false; + event=_event; + + switch(_async) + { + case STARTED: + started=true; + break; + case EXPIRING: + case ERRORED: + break; + default: + throw new IllegalStateException(this.getStatusStringLocked()); + } _async=Async.COMPLETE; - if (_state==State.ASYNC_WAIT) + + if (started && _state==State.ASYNC_WAIT) { handle=true; _state=State.ASYNC_WOKEN; } } - cancelTimeout(); + cancelTimeout(event); if (handle) - { - ContextHandler handler=getContextHandler(); - if (handler!=null) - handler.handle(_channel); - else - _channel.handle(); - } + runInContext(event,_channel); } public void errorComplete() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { _async=Async.COMPLETE; _event.setDispatchContext(null); @@ -468,22 +614,58 @@ public class HttpChannelState cancelTimeout(); } - protected void completed() + protected void onError() { final List<AsyncListener> aListeners; final AsyncContextEvent event; - synchronized (this) + + try(Locker.Lock lock= _locker.lock()) + { + if (_state!=State.DISPATCHED/* || _async!=Async.ERRORING*/) + throw new IllegalStateException(this.getStatusStringLocked()); + + aListeners=_asyncListeners; + event=_event; + _async=Async.ERRORED; + } + + if (event!=null && aListeners!=null) + { + event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); + event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage()); + for (AsyncListener listener : aListeners) + { + try + { + listener.onError(event); + } + catch(Exception x) + { + LOG.info("Exception while invoking listener " + listener, x); + } + } + } + } + + + protected void onComplete() + { + final List<AsyncListener> aListeners; + final AsyncContextEvent event; + + try(Locker.Lock lock= _locker.lock()) { switch(_state) { case COMPLETING: - _state=State.COMPLETED; aListeners=_asyncListeners; event=_event; + _state=State.COMPLETED; + _async=null; break; default: - throw new IllegalStateException(this.getStatusString()); + throw new IllegalStateException(this.getStatusStringLocked()); } } @@ -491,41 +673,46 @@ public class HttpChannelState { if (aListeners!=null) { - if (event.getThrowable()!=null) + Runnable callback = new Runnable() { - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage()); - } - - for (AsyncListener listener : aListeners) - { - try + @Override + public void run() { - if (event.getThrowable()!=null) - listener.onError(event); - else - listener.onComplete(event); - } - catch(Exception e) + for (AsyncListener listener : aListeners) + { + try + { + listener.onComplete(event); + } + catch(Exception e) + { + LOG.warn(e); + } + } + } + @Override + public String toString() { - LOG.warn(e); + return "onComplete"; } - } + }; + + runInContext(event,callback); } - event.completed(); } } protected void recycle() { - synchronized (this) + cancelTimeout(); + try(Locker.Lock lock= _locker.lock()) { switch(_state) { case DISPATCHED: case ASYNC_IO: - throw new IllegalStateException(getStatusString()); + throw new IllegalStateException(getStatusStringLocked()); case UPGRADED: return; default: @@ -535,17 +722,17 @@ public class HttpChannelState _state=State.IDLE; _async=null; _initial=true; - _asyncRead=false; + _asyncReadPossible=_asyncReadUnready=false; _asyncWrite=false; _timeoutMs=DEFAULT_TIMEOUT; - cancelTimeout(); _event=null; } } - + public void upgrade() { - synchronized (this) + cancelTimeout(); + try(Locker.Lock lock= _locker.lock()) { switch(_state) { @@ -553,47 +740,58 @@ public class HttpChannelState case COMPLETED: break; default: - throw new IllegalStateException(getStatusString()); + throw new IllegalStateException(getStatusStringLocked()); } _asyncListeners=null; _state=State.UPGRADED; _async=null; _initial=true; - _asyncRead=false; + _asyncReadPossible=_asyncReadUnready=false; _asyncWrite=false; _timeoutMs=DEFAULT_TIMEOUT; - cancelTimeout(); _event=null; } } - protected void scheduleDispatch() { _channel.execute(_channel); } - protected void scheduleTimeout() + protected void scheduleTimeout(AsyncContextEvent event) { Scheduler scheduler = _channel.getScheduler(); if (scheduler!=null && _timeoutMs>0) - _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); + event.setTimeoutTask(scheduler.schedule(event,_timeoutMs,TimeUnit.MILLISECONDS)); } protected void cancelTimeout() { final AsyncContextEvent event; - synchronized (this) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } + cancelTimeout(event); + } + + protected void cancelTimeout(AsyncContextEvent event) + { if (event!=null) event.cancelTimeoutTask(); } + + public boolean isIdle() + { + try(Locker.Lock lock= _locker.lock()) + { + return _state==State.IDLE; + } + } public boolean isExpired() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { return _async==Async.EXPIRED; } @@ -601,7 +799,7 @@ public class HttpChannelState public boolean isInitial() { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { return _initial; } @@ -609,7 +807,7 @@ public class HttpChannelState public boolean isSuspended() { - synchronized(this) + try(Locker.Lock lock= _locker.lock()) { return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED; } @@ -617,7 +815,7 @@ public class HttpChannelState boolean isCompleting() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { return _state==State.COMPLETING; } @@ -625,7 +823,7 @@ public class HttpChannelState boolean isCompleted() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { return _state == State.COMPLETED; } @@ -633,8 +831,8 @@ public class HttpChannelState public boolean isAsyncStarted() { - synchronized (this) - { + try(Locker.Lock lock= _locker.lock()) + { if (_state==State.DISPATCHED) return _async!=null; return _async==Async.STARTED || _async==Async.EXPIRING; @@ -643,7 +841,7 @@ public class HttpChannelState public boolean isAsync() { - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { return !_initial || _async!=null; } @@ -654,7 +852,7 @@ public class HttpChannelState return _channel.getRequest(); } - public HttpChannel<?> getHttpChannel() + public HttpChannel getHttpChannel() { return _channel; } @@ -662,11 +860,15 @@ public class HttpChannelState public ContextHandler getContextHandler() { final AsyncContextEvent event; - synchronized (this) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } - + return getContextHandler(event); + } + + ContextHandler getContextHandler(AsyncContextEvent event) + { if (event!=null) { Context context=((Context)event.getServletContext()); @@ -679,15 +881,29 @@ public class HttpChannelState public ServletResponse getServletResponse() { final AsyncContextEvent event; - synchronized (this) - { + try(Locker.Lock lock= _locker.lock()) + { event=_event; } + return getServletResponse(event); + } + + public ServletResponse getServletResponse(AsyncContextEvent event) + { if (event!=null && event.getSuppliedResponse()!=null) return event.getSuppliedResponse(); return _channel.getResponse(); } - + + void runInContext(AsyncContextEvent event,Runnable runnable) + { + ContextHandler contextHandler = getContextHandler(event); + if (contextHandler==null) + runnable.run(); + else + contextHandler.handle(_channel.getRequest(),runnable); + } + public Object getAttribute(String name) { return _channel.getRequest().getAttribute(name); @@ -703,29 +919,91 @@ public class HttpChannelState _channel.getRequest().setAttribute(name,attribute); } - public void onReadPossible() + + /* ------------------------------------------------------------ */ + /** Called to signal async read isReady() has returned false. + * This indicates that there is no content available to be consumed + * and that once the channel enteres the ASYNC_WAIT state it will + * register for read interest by calling {@link HttpChannel#asyncReadFillInterested()} + * either from this method or from a subsequent call to {@link #unhandle()}. + */ + public void onReadUnready() { - boolean handle=false; + boolean interested=false; + try(Locker.Lock lock= _locker.lock()) + { + // We were already unready, this is not a state change, so do nothing + if (!_asyncReadUnready) + { + _asyncReadUnready=true; + _asyncReadPossible=false; // Assumes this has been checked in isReady() with lock held + if (_state==State.ASYNC_WAIT) + interested=true; + } + } + + if (interested) + _channel.asyncReadFillInterested(); + } - synchronized (this) + /* ------------------------------------------------------------ */ + /** Called to signal that content is now available to read. + * If the channel is in ASYNC_WAIT state and unready (ie isReady() has + * returned false), then the state is changed to ASYNC_WOKEN and true + * is returned. + * @return True IFF the channel was unready and in ASYNC_WAIT state + */ + public boolean onReadPossible() + { + boolean woken=false; + try(Locker.Lock lock= _locker.lock()) { - _asyncRead=true; + _asyncReadPossible=true; + if (_state==State.ASYNC_WAIT && _asyncReadUnready) + { + woken=true; + _state=State.ASYNC_WOKEN; + } + } + return woken; + } + + /* ------------------------------------------------------------ */ + /** Called to signal that the channel is ready for a callback. + * This is similar to calling {@link #onReadUnready()} followed by + * {@link #onReadPossible()}, except that as content is already + * available, read interest is never set. + * @return true if woken + */ + public boolean onReadReady() + { + boolean woken=false; + try(Locker.Lock lock= _locker.lock()) + { + _asyncReadUnready=true; + _asyncReadPossible=true; if (_state==State.ASYNC_WAIT) { + woken=true; _state=State.ASYNC_WOKEN; - handle=true; } } + return woken; + } - if (handle) - _channel.execute(_channel); + public boolean isReadPossible() + { + try(Locker.Lock lock= _locker.lock()) + { + return _asyncReadPossible; + } } - - public void onWritePossible() + + public boolean onWritePossible() { boolean handle=false; - synchronized (this) + try(Locker.Lock lock= _locker.lock()) { _asyncWrite=true; if (_state==State.ASYNC_WAIT) @@ -735,8 +1013,7 @@ public class HttpChannelState } } - if (handle) - _channel.execute(_channel); + return handle; } - + } |