diff options
Diffstat (limited to 'jetty-server/src/main/java')
32 files changed, 1875 insertions, 977 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 28fea907e5..0747b01c52 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -268,10 +268,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co protected void interruptAcceptors() { - for (Thread thread : _acceptors) + synchronized (this) { - if (thread != null) - thread.interrupt(); + for (Thread thread : _acceptors) + { + if (thread != null) + thread.interrupt(); + } } } @@ -306,9 +309,12 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co public void join(long timeout) throws InterruptedException { - for (Thread thread : _acceptors) - if (thread != null) - thread.join(timeout); + synchronized (this) + { + for (Thread thread : _acceptors) + if (thread != null) + thread.join(timeout); + } } protected abstract void accept(int acceptorID) throws IOException, InterruptedException; @@ -464,7 +470,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co if (isAccepting()) LOG.warn(e); else - LOG.debug(e); + LOG.ignore(e); } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java index 876e7aeae3..2538f41bd3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java @@ -28,6 +28,8 @@ import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import org.eclipse.jetty.server.handler.ContextHandler; + public class AsyncContextState implements AsyncContext { @@ -92,17 +94,20 @@ public class AsyncContextState implements AsyncContext @Override public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException - { + { + ContextHandler contextHandler = state().getContextHandler(); + if (contextHandler != null) + return contextHandler.getServletContext().createListener(clazz); try { return clazz.newInstance(); } - catch(Exception e) + catch (Exception e) { throw new ServletException(e); } } - + @Override public void dispatch() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferQueuedHttpInput.java index 90d81d90e2..db5e841927 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferHttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferQueuedHttpInput.java @@ -20,10 +20,12 @@ package org.eclipse.jetty.server; import java.nio.ByteBuffer; +import javax.servlet.ReadListener; + /** * <p>An implementation of HttpInput using {@link ByteBuffer} as items.</p> */ -public class ByteBufferHttpInput extends HttpInput<ByteBuffer> +public class ByteBufferQueuedHttpInput extends QueuedHttpInput<ByteBuffer> { @Override protected int remaining(ByteBuffer item) @@ -38,9 +40,16 @@ public class ByteBufferHttpInput extends HttpInput<ByteBuffer> item.get(buffer, offset, l); return l; } + + @Override + protected void consume(ByteBuffer item, int length) + { + item.position(item.position()+length); + } @Override protected void onContentConsumed(ByteBuffer item) { } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java index 7725a342eb..fd8c1c5eea 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java @@ -49,10 +49,10 @@ public class EncodingHttpWriter extends HttpWriter public void write (char[] s,int offset, int length) throws IOException { HttpOutput out = _out; - if (length==0) + if (length==0 && out.isAllContentWritten()) { - if (_out.isAllContentWritten()) - close(); + out.close(); + return; } while (length > 0) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java index 608a861d7f..845cbc8110 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.server.HttpConfiguration.Customizer; * the request came</p> * <p>Headers can also be defined so that forwarded SSL Session IDs and Cipher * suites may be customised</p> - * @see http://en.wikipedia.org/wiki/X-Forwarded-For + * @see <a href="http://en.wikipedia.org/wiki/X-Forwarded-For">Wikipedia: X-Forwarded-For</a> */ public class ForwardedRequestCustomizer implements Customizer { @@ -66,7 +66,6 @@ public class ForwardedRequestCustomizer implements Customizer /* ------------------------------------------------------------ */ /** * Set a forced valued for the host header to control what is returned by {@link ServletRequest#getServerName()} and {@link ServletRequest#getServerPort()}. - * This value is only used if {@link #isForwarded()} is true. * * @param hostHeader * The value of the host header to force. @@ -90,7 +89,6 @@ public class ForwardedRequestCustomizer implements Customizer /** * @param forwardedHostHeader * The header name for forwarded hosts (default x-forwarded-host) - * @see #setForwarded(boolean) */ public void setForwardedHostHeader(String forwardedHostHeader) { @@ -100,7 +98,6 @@ public class ForwardedRequestCustomizer implements Customizer /* ------------------------------------------------------------ */ /** * @return the header name for forwarded server. - * @see #setForwarded(boolean) */ public String getForwardedServerHeader() { @@ -111,7 +108,6 @@ public class ForwardedRequestCustomizer implements Customizer /** * @param forwardedServerHeader * The header name for forwarded server (default x-forwarded-server) - * @see #setForwarded(boolean) */ public void setForwardedServerHeader(String forwardedServerHeader) { @@ -121,7 +117,6 @@ public class ForwardedRequestCustomizer implements Customizer /* ------------------------------------------------------------ */ /** * @return the forwarded for header - * @see #setForwarded(boolean) */ public String getForwardedForHeader() { @@ -132,7 +127,6 @@ public class ForwardedRequestCustomizer implements Customizer /** * @param forwardedRemoteAddressHeader * The header name for forwarded for (default x-forwarded-for) - * @see #setForwarded(boolean) */ public void setForwardedForHeader(String forwardedRemoteAddressHeader) { @@ -144,7 +138,6 @@ public class ForwardedRequestCustomizer implements Customizer * Get the forwardedProtoHeader. * * @return the forwardedProtoHeader (default X-Forwarded-For) - * @see #setForwarded(boolean) */ public String getForwardedProtoHeader() { @@ -157,7 +150,6 @@ public class ForwardedRequestCustomizer implements Customizer * * @param forwardedProtoHeader * the forwardedProtoHeader to set (default X-Forwarded-For) - * @see #setForwarded(boolean) */ public void setForwardedProtoHeader(String forwardedProtoHeader) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java index 7cedf160ad..bb96ef689c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java @@ -56,10 +56,10 @@ public interface Handler extends LifeCycle, Destroyable * @param target The target of the request - either a URI or a name. * @param baseRequest The original unwrapped request object. * @param request The request either as the {@link Request} - * object or a wrapper of that request. The {@link AbstractHttpConnection#getCurrentHttpChannel()} + * object or a wrapper of that request. The {@link HttpChannel#getCurrentHttpChannel()} * method can be used access the Request object if required. * @param response The response as the {@link Response} - * object or a wrapper of that request. The {@link AbstractHttpConnection#getCurrentHttpChannel()} + * object or a wrapper of that request. The {@link HttpChannel#getCurrentHttpChannel()} * method can be used access the Response object if required. * @throws IOException * @throws ServletException diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index de338c13db..932135d2a2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.DispatcherType; import javax.servlet.RequestDispatcher; +import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; @@ -43,7 +44,8 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.server.HttpChannelState.Next; +import org.eclipse.jetty.server.HttpChannelState.Action; +import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ErrorHandler; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.Callback; @@ -105,6 +107,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable _uri = new HttpURI(URIUtil.__CHARSET); _state = new HttpChannelState(this); + input.init(_state); _request = new Request(this, input); _response = new Response(this, new HttpOutput(this)); } @@ -250,36 +253,69 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable // The loop is controlled by the call to async.unhandle in the // finally block below. Unhandle will return false only if an async dispatch has // already happened when unhandle is called. - HttpChannelState.Next next = _state.handling(); - while (next==Next.CONTINUE && getServer().isRunning()) + HttpChannelState.Action action = _state.handling(); + loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning()) { try { - _request.setHandled(false); - _response.getHttpOutput().reopen(); - - if (_state.isInitial()) + LOG.debug("{} action {}",this,action); + + switch(action) { - _request.setTimeStamp(System.currentTimeMillis()); - _request.setDispatcherType(DispatcherType.REQUEST); - - for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) - customizer.customize(getConnector(),_configuration,_request); - getServer().handle(this); - } - else - { - if (_request.getHttpChannelState().isExpired()) - { + case REQUEST_DISPATCH: + _request.setHandled(false); + _response.getHttpOutput().reopen(); + _request.setTimeStamp(System.currentTimeMillis()); + _request.setDispatcherType(DispatcherType.REQUEST); + + for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) + customizer.customize(getConnector(),_configuration,_request); + getServer().handle(this); + break; + + case ASYNC_DISPATCH: + _request.setHandled(false); + _response.getHttpOutput().reopen(); + _request.setDispatcherType(DispatcherType.ASYNC); + getServer().handleAsync(this); + break; + + case ASYNC_EXPIRED: + _request.setHandled(false); + _response.getHttpOutput().reopen(); _request.setDispatcherType(DispatcherType.ERROR); _request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500)); _request.setAttribute(RequestDispatcher.ERROR_MESSAGE,"Async Timeout"); _request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI()); _response.setStatusWithReason(500,"Async Timeout"); + + getServer().handleAsync(this); + break; + + case READ_CALLBACK: + { + ContextHandler handler=_state.getContextHandler(); + if (handler!=null) + handler.handle(_request.getHttpInput()); + else + _request.getHttpInput().run(); + break; } - else - _request.setDispatcherType(DispatcherType.ASYNC); - getServer().handleAsync(this); + + case WRITE_CALLBACK: + { + ContextHandler handler=_state.getContextHandler(); + + if (handler!=null) + handler.handle(_response.getHttpOutput()); + else + _response.getHttpOutput().run(); + break; + } + + default: + break loop; + } } catch (Error e) @@ -301,7 +337,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable } finally { - next = _state.unhandle(); + action = _state.unhandle(); } } @@ -309,7 +345,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable Thread.currentThread().setName(threadName); setCurrentHttpChannel(null); - if (next==Next.COMPLETE) + if (action==Action.COMPLETE) { try { @@ -327,7 +363,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable } catch(Exception e) { - LOG.warn(e); + LOG.warn("handle complete",e); } finally { @@ -336,9 +372,9 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable } } - LOG.debug("{} handle exit, result {}", this, next); + LOG.debug("{} handle exit, result {}", this, action); - return next!=Next.WAIT; + return action!=Action.WAIT; } /** @@ -576,7 +612,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable @Override public boolean messageComplete() { - _request.getHttpInput().shutdown(); + LOG.debug("{} messageComplete", this); + _request.getHttpInput().messageComplete(); return true; } @@ -594,17 +631,19 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable try { - if (_state.handling()==Next.CONTINUE) + if (_state.handling()==Action.REQUEST_DISPATCH) sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true); } catch (IOException e) { - LOG.warn(e); + LOG.debug(e); } finally { - if (_state.unhandle()==Next.COMPLETE) + if (_state.unhandle()==Action.COMPLETE) _state.completed(); + else + throw new IllegalStateException(); } } @@ -727,7 +766,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable } else { - LOG.warn(x); + LOG.warn("Commit failed",x); _transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback() { @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 127bafa3c1..470acc0a02 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -21,7 +21,6 @@ package org.eclipse.jetty.server; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; - import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.RequestDispatcher; @@ -62,32 +61,41 @@ public class HttpChannelState { IDLE, // Idle request DISPATCHED, // Request dispatched to filter/servlet - ASYNCSTARTED, // Suspend called, but not yet returned to container - REDISPATCHING, // resumed while dispatched ASYNCWAIT, // Suspended and parked - REDISPATCH, // Has been scheduled - REDISPATCHED, // Request redispatched to filter/servlet - COMPLETECALLED,// complete called + ASYNCIO, // Has been dispatched for async IO COMPLETING, // Request is completable COMPLETED // Request is complete } - public enum Next + public enum Action + { + REQUEST_DISPATCH, // handle a normal request dispatch + ASYNC_DISPATCH, // handle an async request dispatch + ASYNC_EXPIRED, // handle an async timeout + WRITE_CALLBACK, // handle an IO write callback + READ_CALLBACK, // handle an IO read callback + WAIT, // Wait for further events + COMPLETE // Complete the channel + } + + public enum Async { - CONTINUE, // Continue handling the channel - WAIT, // Wait for further events - COMPLETE // Complete the channel + STARTED, + DISPATCH, + COMPLETE, + EXPIRING, + EXPIRED } + private final boolean DEBUG=LOG.isDebugEnabled(); private final HttpChannel<?> _channel; - private List<AsyncListener> _lastAsyncListeners; - private List<AsyncListener> _asyncListeners; + private List<AsyncListener> _asyncListeners; private State _state; + private Async _async; private boolean _initial; - private boolean _dispatched; - private boolean _expired; - private volatile boolean _responseWrapped; + private boolean _asyncRead; + private boolean _asyncWrite; private long _timeoutMs=DEFAULT_TIMEOUT; private AsyncContextEvent _event; @@ -95,6 +103,7 @@ public class HttpChannelState { _channel=channel; _state=State.IDLE; + _async=null; _initial=true; } @@ -145,7 +154,7 @@ public class HttpChannelState { synchronized (this) { - return super.toString()+"@"+getStatusString(); + return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async); } } @@ -153,97 +162,102 @@ public class HttpChannelState { synchronized (this) { - return _state+ - (_initial?",initial":"")+ - (_dispatched?",resumed":"")+ - (_expired?",expired":""); + return String.format("s=%s i=%b a=%s",_state,_initial,_async); } } /** * @return Next handling of the request should proceed */ - protected Next handling() + protected Action handling() { synchronized (this) { + if(DEBUG) + LOG.debug("{} handling {}",this,_state); switch(_state) { case IDLE: _initial=true; _state=State.DISPATCHED; - if (_lastAsyncListeners!=null) - _lastAsyncListeners.clear(); - if (_asyncListeners!=null) - _asyncListeners.clear(); - else - { - _asyncListeners=_lastAsyncListeners; - _lastAsyncListeners=null; - } - break; - - case COMPLETECALLED: - _state=State.COMPLETING; - return Next.COMPLETE; + return Action.REQUEST_DISPATCH; case COMPLETING: - return Next.COMPLETE; - - case ASYNCWAIT: - return Next.WAIT; + return Action.COMPLETE; case COMPLETED: - return Next.WAIT; + return Action.WAIT; - case REDISPATCH: - _state=State.REDISPATCHED; - break; + case ASYNCWAIT: + if (_asyncRead) + { + _state=State.ASYNCIO; + _asyncRead=false; + return Action.READ_CALLBACK; + } + if (_asyncWrite) + { + _state=State.ASYNCIO; + _asyncWrite=false; + return Action.WRITE_CALLBACK; + } + + if (_async!=null) + { + Async async=_async; + switch(async) + { + case COMPLETE: + _state=State.COMPLETING; + return Action.COMPLETE; + case DISPATCH: + _state=State.DISPATCHED; + _async=null; + return Action.ASYNC_DISPATCH; + case EXPIRING: + break; + case EXPIRED: + _state=State.DISPATCHED; + _async=null; + return Action.ASYNC_EXPIRED; + case STARTED: + if (DEBUG) + LOG.debug("TODO Fix this double dispatch",new IllegalStateException(this + .getStatusString())); + return Action.WAIT; + } + } + + return Action.WAIT; default: throw new IllegalStateException(this.getStatusString()); } - - _responseWrapped=false; - return Next.CONTINUE; - } } - public void startAsync(AsyncContextEvent event) { + final List<AsyncListener> lastAsyncListeners; + synchronized (this) { - switch(_state) - { - case DISPATCHED: - case REDISPATCHED: - _dispatched=false; - _expired=false; - _responseWrapped=event.getSuppliedResponse()!=_channel.getResponse(); - _responseWrapped=false; - _event=event; - _state=State.ASYNCSTARTED; - List<AsyncListener> listeners=_lastAsyncListeners; - _lastAsyncListeners=_asyncListeners; - if (listeners!=null) - listeners.clear(); - _asyncListeners=listeners; - break; - - default: - throw new IllegalStateException(this.getStatusString()); - } + if (_state!=State.DISPATCHED || _async!=null) + throw new IllegalStateException(this.getStatusString()); + + _async=Async.STARTED; + _event=event; + lastAsyncListeners=_asyncListeners; + _asyncListeners=null; } - if (_lastAsyncListeners!=null) + if (lastAsyncListeners!=null) { - for (AsyncListener listener : _lastAsyncListeners) + for (AsyncListener listener : lastAsyncListeners) { try { - listener.onStartAsync(_event); + listener.onStartAsync(event); } catch(Exception e) { @@ -269,39 +283,63 @@ public class HttpChannelState * @return next actions * be handled again (eg because of a resume that happened before unhandle was called) */ - protected Next unhandle() + protected Action unhandle() { synchronized (this) { + if(DEBUG) + LOG.debug("{} unhandle {}",this,_state); + switch(_state) { - case REDISPATCHED: case DISPATCHED: - _state=State.COMPLETING; - return Next.COMPLETE; - - case IDLE: + case ASYNCIO: + break; + default: throw new IllegalStateException(this.getStatusString()); + } - case ASYNCSTARTED: - _initial=false; - _state=State.ASYNCWAIT; - scheduleTimeout(); - return Next.WAIT; - - case REDISPATCHING: - _initial=false; - _state=State.REDISPATCHED; - return Next.CONTINUE; - - case COMPLETECALLED: - _initial=false; - _state=State.COMPLETING; - return Next.COMPLETE; + if (_asyncRead) + { + _state=State.ASYNCIO; + _asyncRead=false; + return Action.READ_CALLBACK; + } + + if (_asyncWrite) + { + _asyncWrite=false; + _state=State.ASYNCIO; + return Action.WRITE_CALLBACK; + } - default: - throw new IllegalStateException(this.getStatusString()); + if (_async!=null) + { + _initial=false; + switch(_async) + { + case COMPLETE: + _state=State.COMPLETING; + _async=null; + return Action.COMPLETE; + case DISPATCH: + _state=State.DISPATCHED; + _async=null; + return Action.ASYNC_DISPATCH; + case EXPIRED: + _state=State.DISPATCHED; + _async=null; + return Action.ASYNC_EXPIRED; + case EXPIRING: + case STARTED: + scheduleTimeout(); + _state=State.ASYNCWAIT; + return Action.WAIT; + } } + + _state=State.COMPLETING; + return Action.COMPLETE; } } @@ -310,39 +348,26 @@ public class HttpChannelState boolean dispatch; synchronized (this) { + if (_async!=Async.STARTED && _async!=Async.EXPIRING) + throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString()); + _async=Async.DISPATCH; + _event.setDispatchTarget(context,path); + switch(_state) { - case ASYNCSTARTED: - _state=State.REDISPATCHING; - _event.setDispatchTarget(context,path); - _dispatched=true; - return; - - case ASYNCWAIT: - dispatch=!_expired; - _state=State.REDISPATCH; - _event.setDispatchTarget(context,path); - _dispatched=true; + case DISPATCHED: + case ASYNCIO: + dispatch=false; break; - default: - throw new IllegalStateException(this.getStatusString()); + dispatch=true; + break; } } + cancelTimeout(); if (dispatch) - { - cancelTimeout(); scheduleDispatch(); - } - } - - public boolean isDispatched() - { - synchronized (this) - { - return _dispatched; - } } protected void expired() @@ -351,17 +376,11 @@ public class HttpChannelState AsyncEvent event; synchronized (this) { - switch(_state) - { - case ASYNCSTARTED: - case ASYNCWAIT: - _expired=true; - event=_event; - aListeners=_asyncListeners; - break; - default: - return; - } + if (_async!=Async.STARTED) + return; + _async=Async.EXPIRING; + event=_event; + aListeners=_asyncListeners; } if (aListeners!=null) @@ -378,22 +397,20 @@ public class HttpChannelState } } } - + + boolean dispatch=false; synchronized (this) { - switch(_state) + if (_async==Async.EXPIRING) { - case ASYNCSTARTED: - case ASYNCWAIT: - _state=State.REDISPATCH; - break; - default: - _expired=false; - break; + _async=Async.EXPIRED; + if (_state==State.ASYNCWAIT) + dispatch=true; } } - scheduleDispatch(); + if (dispatch) + scheduleDispatch(); } public void complete() @@ -402,30 +419,15 @@ public class HttpChannelState boolean handle; synchronized (this) { - switch(_state) - { - case DISPATCHED: - case REDISPATCHED: - throw new IllegalStateException(this.getStatusString()); - - case IDLE: - case ASYNCSTARTED: - _state=State.COMPLETECALLED; - return; - - case ASYNCWAIT: - _state=State.COMPLETECALLED; - handle=!_expired; - break; - - default: - throw new IllegalStateException(this.getStatusString()); - } + if (_async!=Async.STARTED && _async!=Async.EXPIRING) + throw new IllegalStateException(this.getStatusString()); + _async=Async.COMPLETE; + handle=_state==State.ASYNCWAIT; } + cancelTimeout(); if (handle) { - cancelTimeout(); ContextHandler handler=getContextHandler(); if (handler!=null) handler.handle(_channel); @@ -453,30 +455,34 @@ public class HttpChannelState } } - if (aListeners!=null) + if (event!=null) { - for (AsyncListener listener : aListeners) + if (aListeners!=null) { - try + if (event.getThrowable()!=null) { - if (event!=null && event.getThrowable()!=null) - { - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); - event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage()); - listener.onError(event); - } - else - listener.onComplete(event); + event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable()); + event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage()); } - catch(Exception e) + + for (AsyncListener listener : aListeners) { - LOG.warn(e); + try + { + if (event.getThrowable()!=null) + listener.onError(event); + else + listener.onComplete(event); + } + catch(Exception e) + { + LOG.warn(e); + } } } - } - if (event!=null) event.completed(); + } } protected void recycle() @@ -486,17 +492,19 @@ public class HttpChannelState switch(_state) { case DISPATCHED: - case REDISPATCHED: + case ASYNCIO: throw new IllegalStateException(getStatusString()); default: - _state=State.IDLE; + break; } - _initial = true; - _dispatched=false; - _expired=false; - _responseWrapped=false; - cancelTimeout(); + _asyncListeners=null; + _state=State.IDLE; + _async=null; + _initial=true; + _asyncRead=false; + _asyncWrite=false; _timeoutMs=DEFAULT_TIMEOUT; + cancelTimeout(); _event=null; } } @@ -515,7 +523,11 @@ public class HttpChannelState protected void cancelTimeout() { - AsyncContextEvent event=_event; + final AsyncContextEvent event; + synchronized (this) + { + event=_event; + } if (event!=null) event.cancelTimeoutTask(); } @@ -524,7 +536,7 @@ public class HttpChannelState { synchronized (this) { - return _expired; + return _async==Async.EXPIRED; } } @@ -540,17 +552,7 @@ public class HttpChannelState { synchronized(this) { - switch(_state) - { - case ASYNCSTARTED: - case REDISPATCHING: - case COMPLETECALLED: - case ASYNCWAIT: - return true; - - default: - return false; - } + return _state==State.ASYNCWAIT || _state==State.DISPATCHED && _async==Async.STARTED; } } @@ -573,18 +575,10 @@ public class HttpChannelState public boolean isAsyncStarted() { synchronized (this) - { - switch(_state) - { - case ASYNCSTARTED: // Suspend called, but not yet returned to container - case REDISPATCHING: // resumed while dispatched - case COMPLETECALLED: // complete called - case ASYNCWAIT: - return true; - - default: - return false; - } + { + if (_state==State.DISPATCHED) + return _async!=null; + return _async==Async.STARTED || _async==Async.EXPIRING; } } @@ -592,19 +586,7 @@ public class HttpChannelState { synchronized (this) { - switch(_state) - { - case ASYNCSTARTED: - case REDISPATCHING: - case ASYNCWAIT: - case REDISPATCHED: - case REDISPATCH: - case COMPLETECALLED: - return true; - - default: - return false; - } + return !_initial || _async!=null; } } @@ -620,7 +602,12 @@ public class HttpChannelState public ContextHandler getContextHandler() { - final AsyncContextEvent event=_event; + final AsyncContextEvent event; + synchronized (this) + { + event=_event; + } + if (event!=null) { Context context=((Context)event.getServletContext()); @@ -632,8 +619,13 @@ public class HttpChannelState public ServletResponse getServletResponse() { - if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) - return _event.getSuppliedResponse(); + final AsyncContextEvent event; + synchronized (this) + { + event=_event; + } + if (event!=null && event.getSuppliedResponse()!=null) + return event.getSuppliedResponse(); return _channel.getResponse(); } @@ -652,6 +644,34 @@ public class HttpChannelState _channel.getRequest().setAttribute(name,attribute); } + public void onReadPossible() + { + boolean handle; + + synchronized (this) + { + _asyncRead=true; + handle=_state==State.ASYNCWAIT; + } + + if (handle) + _channel.execute(_channel); + } + + public void onWritePossible() + { + boolean handle; + + synchronized (this) + { + _asyncWrite=true; + handle=_state==State.ASYNCWAIT; + } + + if (handle) + _channel.execute(_channel); + } + public class AsyncTimeout implements Runnable { @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index d56fdd3ddc..1518b75894 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -18,9 +18,7 @@ package org.eclipse.jetty.server; -import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; import java.util.concurrent.RejectedExecutionException; import org.eclipse.jetty.http.HttpGenerator; @@ -39,7 +37,7 @@ import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -63,7 +61,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private final HttpParser _parser; private volatile ByteBuffer _requestBuffer = null; private volatile ByteBuffer _chunk = null; - private BlockingCallback _readBlocker = new BlockingCallback(); private BlockingCallback _writeBlocker = new BlockingCallback(); @@ -92,9 +89,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _connector = connector; _bufferPool = _connector.getByteBufferPool(); _generator = new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy()); - _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input()); + _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new HttpInputOverHTTP(this)); _parser = newHttpParser(); - LOG.debug("New HTTP Connection {}", this); } @@ -123,40 +119,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http return _channel; } - public void reset() + public HttpParser getParser() { - // If we are still expecting - if (_channel.isExpecting100Continue()) - { - // reset to avoid seeking remaining content - _parser.reset(); - // close to seek EOF - _parser.close(); - if (getEndPoint().isOpen()) - fillInterested(); - } - // else if we are persistent - else if (_generator.isPersistent()) - // reset to seek next request - _parser.reset(); - else - { - // else seek EOF - _parser.close(); - if (getEndPoint().isOpen()) - fillInterested(); - } - - _generator.reset(); - _channel.reset(); - - releaseRequestBuffer(); - if (_chunk!=null) - { - _bufferPool.release(_chunk); - _chunk=null; - } + return _parser; } + @Override @@ -171,16 +138,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http return getHttpChannel().getRequests(); } - @Override - public String toString() - { - return String.format("%s,g=%s,p=%s", - super.toString(), - _generator, - _parser); - } - - private void releaseRequestBuffer() + void releaseRequestBuffer() { if (_requestBuffer != null && !_requestBuffer.hasRemaining()) { @@ -189,6 +147,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _bufferPool.release(buffer); } } + + public ByteBuffer getRequestBuffer() + { + if (_requestBuffer == null) + _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT); + return _requestBuffer; + } /** * <p>Parses and handles HTTP messages.</p> @@ -204,77 +169,53 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http LOG.debug("{} onFillable {}", this, _channel.getState()); setCurrentConnection(this); + int filled=Integer.MAX_VALUE; + boolean suspended=false; try { - while (true) + // while not suspended and not upgraded + while (!suspended && getEndPoint().getConnection()==this) { - // Can the parser progress (even with an empty buffer) - boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer); - - // Parse the buffer - if (call_channel) + // Do we need some data to parse + if (BufferUtil.isEmpty(_requestBuffer)) { - // Parse as much content as there is available before calling the channel - // this is both efficient (may queue many chunks), will correctly set available for 100 continues - // and will drive the parser to completion if all content is available. - while (_parser.inContentState()) + // If the previous iteration filled 0 bytes or saw a close, then break here + if (filled<=0) + break; + + // Can we fill? + if(getEndPoint().isInputShutdown()) { - if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer)) - break; + // No pretend we read -1 + filled=-1; + _parser.atEOF(); } + else + { + // Get a buffer + if (_requestBuffer == null) + _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT); - // The parser returned true, which indicates the channel is ready to handle a request. - // Call the channel and this will either handle the request/response to completion OR, - // if the request suspends, the request/response will be incomplete so the outer loop will exit. - boolean handle=_channel.handle(); - - // Return if suspended or upgraded - if (!handle || getEndPoint().getConnection()!=this) - return; - } - else if (BufferUtil.isEmpty(_requestBuffer)) - { - if (_requestBuffer == null) - _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT); - - int filled = getEndPoint().fill(_requestBuffer); - if (filled==0) // Do a retry on fill 0 (optimisation for SSL connections) + // fill filled = getEndPoint().fill(_requestBuffer); - - LOG.debug("{} filled {}", this, filled); - - // If we failed to fill - if (filled == 0) - { - // Somebody wanted to read, we didn't so schedule another attempt - releaseRequestBuffer(); - fillInterested(); - return; - } - else if (filled < 0) - { - _parser.shutdownInput(); - // We were only filling if fully consumed, so if we have - // read -1 then we have nothing to parse and thus nothing that - // will generate a response. If we had a suspended request pending - // a response or a request waiting in the buffer, we would not be here. - if (getEndPoint().isOutputShutdown()) - getEndPoint().close(); - else - getEndPoint().shutdownOutput(); - // buffer must be empty and the channel must be idle, so we can release. - releaseRequestBuffer(); - return; + if (filled==0) // Do a retry on fill 0 (optimization for SSL connections) + filled = getEndPoint().fill(_requestBuffer); + + // tell parser + if (filled < 0) + _parser.atEOF(); } } - else + + // Parse the buffer + if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer)) { - // TODO work out how we can get here and a better way to handle it - LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest()); - if (!_channel.getState().isSuspended()) - getEndPoint().close(); - return; + // The parser returned true, which indicates the channel is ready to handle a request. + // Call the channel and this will either handle the request/response to completion OR, + // if the request suspends, the request/response will be incomplete so the outer loop will exit. + suspended = !_channel.handle(); } + } } catch (EofException e) @@ -290,10 +231,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http close(); } finally - { + { setCurrentConnection(null); + if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this) + { + fillInterested(); + } } } + + + @Override + protected void onFillInterestedFailed(Throwable cause) + { + _parser.close(); + super.onFillInterestedFailed(cause); + } @Override public void onOpen() @@ -308,32 +261,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http onFillable(); } - @Override - public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException - { - try - { - if (info==null) - new ContentCallback(content,lastContent,_writeBlocker).iterate(); - else - { - // If we are still expecting a 100 continues - if (_channel.isExpecting100Continue()) - // then we can't be persistent - _generator.setPersistent(false); - new CommitCallback(info,content,lastContent,_writeBlocker).iterate(); - } - _writeBlocker.block(); - } - catch (ClosedChannelException e) - { - throw new EofException(e); - } - catch (IOException e) - { - throw e; - } - } @Override public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) @@ -359,11 +286,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public void completed() { - // Finish consuming the request - if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue()) - // Complete reading the request - _channel.getRequest().getHttpInput().consumeAll(); - // Handle connection upgrades if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101) { @@ -374,34 +296,58 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http onClose(); getEndPoint().setConnection(connection); connection.onOpen(); - reset(); + _channel.reset(); + _parser.reset(); + _generator.reset(); + releaseRequestBuffer(); return; } } + + // Finish consuming the request + // If we are still expecting + if (_channel.isExpecting100Continue()) + // close to seek EOF + _parser.close(); + else if (_parser.inContentState() && _generator.isPersistent()) + // Complete reading the request + _channel.getRequest().getHttpInput().consumeAll(); - reset(); + // Reset the channel, parsers and generator + _channel.reset(); + if (_generator.isPersistent() && !_parser.isClosed()) + _parser.reset(); + else + _parser.close(); + releaseRequestBuffer(); + if (_chunk!=null) + _bufferPool.release(_chunk); + _chunk=null; + _generator.reset(); // if we are not called from the onfillable thread, schedule completion if (getCurrentConnection()!=this) { + // If we are looking for the next request if (_parser.isStart()) { - // it wants to eat more + // if the buffer is empty if (_requestBuffer == null) { + // look for more data fillInterested(); } - else if (getConnector().isStarted()) + // else if we are still running + else if (getConnector().isRunning()) { - LOG.debug("{} pipelined", this); - + // Dispatched to handle a pipelined request try { getExecutor().execute(this); } catch (RejectedExecutionException e) { - if (getConnector().isStarted()) + if (getConnector().isRunning()) LOG.warn(e); else LOG.ignore(e); @@ -413,139 +359,34 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http getEndPoint().close(); } } + // else the parser must be closed, so seek the EOF if we are still open + else if (getEndPoint().isOpen()) + fillInterested(); } } - public ByteBuffer getRequestBuffer() - { - return _requestBuffer; - } - - private class Input extends ByteBufferHttpInput + private class HttpChannelOverHttp extends HttpChannel<ByteBuffer> { - @Override - protected void blockForContent() throws IOException - { - /* We extend the blockForContent method to replace the - default implementation of a blocking queue with an implementation - that uses the calling thread to block on a readable callback and - then to do the parsing before before attempting the read. - */ - while (!_parser.isComplete()) - { - // Can the parser progress (even with an empty buffer) - boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer); - - // If there is more content to parse, loop so we can queue all content from this buffer now without the - // need to call blockForContent again - while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState()) - event=_parser.parseNext(_requestBuffer); - - // If we have content, return - if (_parser.isComplete() || available()>0) - return; - - // Do we have content ready to parse? - if (BufferUtil.isEmpty(_requestBuffer)) - { - // If no more input - if (getEndPoint().isInputShutdown()) - { - _parser.shutdownInput(); - shutdown(); - return; - } - - // Wait until we can read - block(_readBlocker); - LOG.debug("{} block readable on {}",this,_readBlocker); - _readBlocker.block(); - - // We will need a buffer to read into - if (_requestBuffer==null) - { - long content_length=_channel.getRequest().getContentLength(); - int size=getInputBufferSize(); - if (size<content_length) - size=size*4; // TODO tune this - _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT); - } - - // read some data - int filled=getEndPoint().fill(_requestBuffer); - LOG.debug("{} block filled {}",this,filled); - if (filled<0) - { - _parser.shutdownInput(); - return; - } - } - } - } - - @Override - protected void onContentQueued(ByteBuffer ref) - { - /* This callback could be used to tell the connection - * that the request did contain content and thus the request - * buffer needs to be held until a call to #onAllContentConsumed - * - * However it turns out that nothing is needed here because either a - * request will have content, in which case the request buffer will be - * released by a call to onAllContentConsumed; or it will not have content. - * If it does not have content, either it will complete quickly and the - * buffers will be released in completed() or it will be suspended and - * onReadable() contains explicit handling to release if it is suspended. - * - * We extend this method anyway, to turn off the notify done by the - * default implementation as this is not needed by our implementation - * of blockForContent - */ - } - - @Override - public void earlyEOF() - { - synchronized (lock()) - { - _inputEOF=true; - _earlyEOF = true; - LOG.debug("{} early EOF", this); - } - } - - @Override - public void shutdown() + public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input) { - synchronized (lock()) - { - _inputEOF=true; - LOG.debug("{} shutdown", this); - } + super(connector,config,endPoint,transport,input); } @Override - protected void onAllContentConsumed() + public void earlyEOF() { - /* This callback tells the connection that all content that has - * been parsed has been consumed. Thus the request buffer may be - * released if it is empty. - */ - releaseRequestBuffer(); + // If we have no request yet, just close + if (getRequest().getMethod()==null) + close(); + else + super.earlyEOF(); } @Override - public String toString() + public boolean content(ByteBuffer item) { - return super.toString()+"{"+_channel+","+HttpConnection.this+"}"; - } - } - - private class HttpChannelOverHttp extends HttpChannel<ByteBuffer> - { - public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input) - { - super(connector,config,endPoint,transport,input); + super.content(item); + return true; } @Override @@ -612,7 +453,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } - private class CommitCallback extends IteratingCallback + private class CommitCallback extends IteratingNestedCallback { final ByteBuffer _content; final boolean _lastContent; @@ -733,7 +574,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } - private class ContentCallback extends IteratingCallback + private class ContentCallback extends IteratingNestedCallback { final ByteBuffer _content; final boolean _lastContent; @@ -814,4 +655,5 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index bbed8bd2fa..a9dba922ad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -19,13 +19,11 @@ package org.eclipse.jetty.server; import java.io.IOException; -import java.io.InterruptedIOException; - +import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; -import org.eclipse.jetty.util.ArrayQueue; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -39,35 +37,82 @@ import org.eclipse.jetty.util.log.Logger; * {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the * caller will know when buffers are queued and consumed.</p> */ -public abstract class HttpInput<T> extends ServletInputStream +/** + * @author gregw + * + * @param <T> + */ +/** + * @author gregw + * + * @param <T> + */ +public abstract class HttpInput<T> extends ServletInputStream implements Runnable { private final static Logger LOG = Log.getLogger(HttpInput.class); - private final ArrayQueue<T> _inputQ = new ArrayQueue<>(); - protected boolean _earlyEOF; - protected boolean _inputEOF; - public Object lock() + private HttpChannelState _channelState; + private Throwable _onError; + private ReadListener _listener; + private boolean _notReady; + + protected State _state = BLOCKING; + private State _eof=null; + private final Object _lock; + + protected HttpInput() { - return _inputQ.lock(); + this(null); + } + + protected HttpInput(Object lock) + { + _lock=lock==null?this:lock; + } + + public final Object lock() + { + return _lock; } public void recycle() { synchronized (lock()) { - T item = _inputQ.peekUnsafe(); - while (item != null) - { - _inputQ.pollUnsafe(); - onContentConsumed(item); + _state = BLOCKING; + _eof=null; + _onError=null; + } + } - item = _inputQ.peekUnsafe(); - if (item == null) - onAllContentConsumed(); - } - _inputEOF = false; - _earlyEOF = false; + /** + * Access the next content to be consumed from. Returning the next item does not consume it + * and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)} + * or {@link #consume(Object, int)} are required to consume data from the content. + * @return Content or null if none available. + * @throws IOException + */ + protected abstract T nextContent() throws IOException; + + /** + * A convenience method to call nextContent and to check the return value, which if null then the + * a check is made for EOF and the state changed accordingly. + * @see #nextContent() + * @return Content or null if none available. + * @throws IOException + */ + protected T getNextContent() throws IOException + { + T content=nextContent(); + + if (content==null && _eof!=null) + { + LOG.debug("{} eof {}",this,_eof); + _state=_eof; + _eof=null; } + + return content; } @Override @@ -81,12 +126,17 @@ public abstract class HttpInput<T> extends ServletInputStream @Override public int available() { - synchronized (lock()) + try { - T item = _inputQ.peekUnsafe(); - if (item == null) - return 0; - return remaining(item); + synchronized (lock()) + { + T item = getNextContent(); + return item==null?0:remaining(item); + } + } + catch (IOException e) + { + throw new RuntimeIOException(e); } } @@ -96,200 +146,299 @@ public abstract class HttpInput<T> extends ServletInputStream T item = null; synchronized (lock()) { + // System.err.printf("read s=%s q=%d e=%s%n",_state,_inputQ.size(),_eof); + // Get the current head of the input Q - item = _inputQ.peekUnsafe(); - - // Skip empty items at the head of the queue - while (item != null && remaining(item) == 0) - { - _inputQ.pollUnsafe(); - onContentConsumed(item); - LOG.debug("{} consumed {}", this, item); - item = _inputQ.peekUnsafe(); - - // If that was the last item then notify - if (item==null) - onAllContentConsumed(); - } + item = getNextContent(); // If we have no item if (item == null) { - // Was it unexpectedly EOF'd? - if (isEarlyEOF()) - throw new EofException(); - - // check for EOF - if (isShutdown()) - { - onEOF(); - return -1; - } - - // OK then block for some more content - blockForContent(); - - // If still not content, we must be closed - item = _inputQ.peekUnsafe(); + _state.waitForContent(this); + item=getNextContent(); if (item==null) - { - if (isEarlyEOF()) - throw new EofException(); - - // blockForContent will only return with no - // content if it is closed. - if (!isShutdown()) - LOG.warn("Unexpected !EOF: "+this); - - onEOF(); - return -1; - } + return _state.noContent(); } } + return get(item, b, off, len); } protected abstract int remaining(T item); protected abstract int get(T item, byte[] buffer, int offset, int length); - protected abstract void onContentConsumed(T item); + protected abstract void consume(T item, int length); + + protected abstract void blockForContent() throws IOException; - protected void blockForContent() throws IOException + protected boolean onAsyncRead() { - synchronized (lock()) - { - while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF()) - { - try - { - LOG.debug("{} waiting for content", this); - lock().wait(); - } - catch (InterruptedException e) - { - throw (IOException)new InterruptedIOException().initCause(e); - } - } - } + if (_listener==null) + return false; + _channelState.onReadPossible(); + return true; } /* ------------------------------------------------------------ */ - /** Called by this HttpInput to signal new content has been queued + /** Add some content to the input stream * @param item */ - protected void onContentQueued(T item) - { - lock().notify(); - } + public abstract void content(T item); + /* ------------------------------------------------------------ */ - /** Called by this HttpInput to signal all available content has been consumed + /** This method should be called to signal to the HttpInput + * that an EOF has arrived before all the expected content. + * Typically this will result in an EOFException being thrown + * from a subsequent read rather than a -1 return. */ - protected void onAllContentConsumed() + public void earlyEOF() { + synchronized (lock()) + { + if (_eof==null || !_eof.isEOF()) + { + LOG.debug("{} early EOF", this); + _eof=EARLY_EOF; + if (_listener!=null) + _channelState.onReadPossible(); + } + } } /* ------------------------------------------------------------ */ - /** Called by this HttpInput to signal it has reached EOF - */ - protected void onEOF() + public void messageComplete() { + synchronized (lock()) + { + if (_eof==null || !_eof.isEOF()) + { + LOG.debug("{} EOF", this); + _eof=EOF; + if (_listener!=null) + _channelState.onReadPossible(); + } + } } /* ------------------------------------------------------------ */ - /** Add some content to the input stream - * @param item - */ - public void content(T item) + public void consumeAll() { synchronized (lock()) { - // The buffer is not copied here. This relies on the caller not recycling the buffer - // until the it is consumed. The onAllContentConsumed() callback is the signal to the - // caller that the buffers can be recycled. - _inputQ.add(item); - onContentQueued(item); - LOG.debug("{} queued {}", this, item); + try + { + while (!isFinished()) + { + T item = getNextContent(); + if (item==null) + _state.waitForContent(this); + else + consume(item,remaining(item)); + } + } + catch (IOException e) + { + throw new RuntimeIOException(e); + } } } - /* ------------------------------------------------------------ */ - /** This method should be called to signal to the HttpInput - * that an EOF has arrived before all the expected content. - * Typically this will result in an EOFException being thrown - * from a subsequent read rather than a -1 return. - */ - public void earlyEOF() + @Override + public boolean isFinished() { synchronized (lock()) { - _earlyEOF = true; - _inputEOF = true; - lock().notify(); - LOG.debug("{} early EOF", this); + return _state.isEOF(); } } - /* ------------------------------------------------------------ */ - public boolean isEarlyEOF() + @Override + public boolean isReady() { synchronized (lock()) { - return _earlyEOF; + if (_listener==null) + return true; + int available = available(); + if (available>0) + return true; + if (!_notReady) + { + _notReady=true; + if (_state.isEOF()) + _channelState.onReadPossible(); + else + unready(); + } + return false; } } - /* ------------------------------------------------------------ */ - public void shutdown() + protected void unready() { + } + + @Override + public void setReadListener(ReadListener readListener) + { + if (readListener==null) + throw new NullPointerException("readListener==null"); synchronized (lock()) { - _inputEOF = true; - lock().notify(); - LOG.debug("{} shutdown", this); + if (_state!=BLOCKING) + throw new IllegalStateException("state="+_state); + _state=ASYNC; + _listener=readListener; + _notReady=true; + + _channelState.onReadPossible(); } } - /* ------------------------------------------------------------ */ - public boolean isShutdown() + public void failed(Throwable x) { synchronized (lock()) { - return _inputEOF; + if (_onError==null) + LOG.warn(x); + else + _onError=x; } } - /* ------------------------------------------------------------ */ - public void consumeAll() + @Override + public void run() { + final boolean available; + final boolean eof; + final Throwable x; + synchronized (lock()) { - T item = _inputQ.peekUnsafe(); - loop: while (!isShutdown() && !isEarlyEOF()) + if (!_notReady || _listener==null) + return; + + x=_onError; + T item; + try { - while (item != null) - { - _inputQ.pollUnsafe(); - onContentConsumed(item); + item = getNextContent(); + } + catch(Exception e) + { + item=null; + failed(e); + } + available= item!=null && remaining(item)>0; - item = _inputQ.peekUnsafe(); - if (item == null) - onAllContentConsumed(); - } + eof = !available && _state.isEOF(); + _notReady=!available&&!eof; + } - try - { - blockForContent(); - item = _inputQ.peekUnsafe(); - if (item==null) - break; - } - catch (IOException e) - { - LOG.warn(e); - break loop; - } - } + try + { + if (x!=null) + _listener.onError(x); + else if (available) + _listener.onDataAvailable(); + else if (eof) + _listener.onAllDataRead(); + else + unready(); + } + catch(Throwable e) + { + LOG.warn(e.toString()); + LOG.debug(e); + _listener.onError(e); + } + } + + + protected static class State + { + public void waitForContent(HttpInput<?> in) throws IOException + { + } + + public int noContent() throws IOException + { + return -1; + } + + public boolean isEOF() + { + return false; + } + } + + protected static final State BLOCKING= new State() + { + @Override + public void waitForContent(HttpInput<?> in) throws IOException + { + in.blockForContent(); + } + public String toString() + { + return "OPEN"; + } + }; + + protected static final State ASYNC= new State() + { + @Override + public int noContent() throws IOException + { + return 0; + } + @Override + public String toString() + { + return "ASYNC"; + } + }; + + protected static final State EARLY_EOF= new State() + { + @Override + public int noContent() throws IOException + { + throw new EofException(); + } + @Override + public boolean isEOF() + { + return true; + } + public String toString() + { + return "EARLY_EOF"; + } + }; + + protected static final State EOF= new State() + { + @Override + public boolean isEOF() + { + return true; + } + + public String toString() + { + return "EOF"; + } + }; + + + public void init(HttpChannelState state) + { + synchronized (lock()) + { + _channelState=state; } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java new file mode 100644 index 0000000000..53f2c1287c --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java @@ -0,0 +1,168 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.BlockingCallback; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback +{ + private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class); + private final BlockingCallback _readBlocker = new BlockingCallback(); + private final HttpConnection _httpConnection; + private ByteBuffer _content; + + /** + * @param httpConnection + */ + HttpInputOverHTTP(HttpConnection httpConnection) + { + _httpConnection = httpConnection; + } + + @Override + public void recycle() + { + synchronized (lock()) + { + super.recycle(); + _content=null; + } + } + + @Override + protected void blockForContent() throws IOException + { + while(true) + { + _httpConnection.fillInterested(_readBlocker); + LOG.debug("{} block readable on {}",this,_readBlocker); + _readBlocker.block(); + + Object content=getNextContent(); + if (content!=null || isFinished()) + break; + } + } + + @Override + public String toString() + { + return String.format("%s@%x",getClass().getSimpleName(),hashCode()); + } + + @Override + protected ByteBuffer nextContent() throws IOException + { + // If we have some content available, return it + if (BufferUtil.hasContent(_content)) + return _content; + + // No - then we are going to need to parse some more content + _content=null; + ByteBuffer requestBuffer = _httpConnection.getRequestBuffer(); + + while (!_httpConnection.getParser().isComplete()) + { + // Can the parser progress (even with an empty buffer) + _httpConnection.getParser().parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer); + + // If we got some content, that will do for now! + if (BufferUtil.hasContent(_content)) + return _content; + + // No, we can we try reading some content? + if (BufferUtil.isEmpty(requestBuffer) && _httpConnection.getEndPoint().isInputShutdown()) + { + _httpConnection.getParser().atEOF(); + continue; + } + + // OK lets read some data + int filled=_httpConnection.getEndPoint().fill(requestBuffer); + if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled' + LOG.debug("{} filled {}",this,filled); + if (filled<=0) + { + if (filled<0) + { + _httpConnection.getParser().atEOF(); + continue; + } + return null; + } + } + + return null; + + } + + @Override + protected int remaining(ByteBuffer item) + { + return item.remaining(); + } + + @Override + protected int get(ByteBuffer item, byte[] buffer, int offset, int length) + { + int l = Math.min(item.remaining(), length); + item.get(buffer, offset, l); + return l; + } + + @Override + protected void consume(ByteBuffer item, int length) + { + item.position(item.position()+length); + } + + @Override + public void content(ByteBuffer item) + { + if (BufferUtil.hasContent(_content)) + throw new IllegalStateException(); + _content=item; + } + + @Override + protected void unready() + { + _httpConnection.fillInterested(this); + } + + @Override + public void succeeded() + { + _httpConnection.getHttpChannel().getState().onReadPossible(); + } + + @Override + public void failed(Throwable x) + { + super.failed(x); + _httpConnection.getHttpChannel().getState().onReadPossible(); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index bbea718ffa..dd0fd8228a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -18,26 +18,27 @@ package org.eclipse.jetty.server; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritePendingException; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpContent; -import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BlockingCallback; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.resource.Resource; /** * <p>{@link HttpOutput} implements {@link ServletOutputStream} @@ -49,19 +50,34 @@ import org.eclipse.jetty.util.resource.Resource; * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to * close the stream, to be reopened after the inclusion ends.</p> */ -public class HttpOutput extends ServletOutputStream +public class HttpOutput extends ServletOutputStream implements Runnable { private static Logger LOG = Log.getLogger(HttpOutput.class); private final HttpChannel<?> _channel; - private boolean _closed; private long _written; private ByteBuffer _aggregate; private int _bufferSize; + private int _commitSize; + private WriteListener _writeListener; + private volatile Throwable _onError; + + /* + ACTION OPEN ASYNC READY PENDING UNREADY + ------------------------------------------------------------------------------- + setWriteListener() READY->owp ise ise ise ise + write() OPEN ise PENDING wpe wpe + flush() OPEN ise PENDING wpe wpe + isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false + write completed - - - ASYNC READY->owp + */ + enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED } + private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN); public HttpOutput(HttpChannel<?> channel) { _channel = channel; _bufferSize = _channel.getHttpConfiguration().getOutputBufferSize(); + _commitSize=_bufferSize/4; } public boolean isWritten() @@ -82,49 +98,63 @@ public class HttpOutput extends ServletOutputStream public void reopen() { - _closed = false; + _state.set(State.OPEN); } - /** Called by the HttpChannel if the output was closed - * externally (eg by a 500 exception handling). - */ - void closed() + public boolean isAllContentWritten() { - if (!_closed) - { - _closed = true; - try - { - _channel.getResponse().closeOutput(); - } - catch(IOException e) - { - _channel.failed(); - LOG.ignore(e); - } - releaseBuffer(); - } + return _channel.getResponse().isAllContentWritten(_written); } @Override public void close() { - if (!isClosed()) + State state=_state.get(); + while(state!=State.CLOSED) { - try + if (_state.compareAndSet(state,State.CLOSED)) { - if (BufferUtil.hasContent(_aggregate)) - _channel.write(_aggregate, !_channel.getResponse().isIncluding()); - else - _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); + try + { + if (BufferUtil.hasContent(_aggregate)) + _channel.write(_aggregate, !_channel.getResponse().isIncluding()); + else + _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); + } + catch(IOException e) + { + LOG.debug(e); + _channel.failed(); + } + releaseBuffer(); + return; } - catch(IOException e) + state=_state.get(); + } + } + + /* Called to indicated that the output is already closed and the state needs to be updated to match */ + void closed() + { + State state=_state.get(); + while(state!=State.CLOSED) + { + if (_state.compareAndSet(state,State.CLOSED)) { - _channel.failed(); - LOG.ignore(e); + try + { + _channel.getResponse().closeOutput(); + } + catch(IOException e) + { + LOG.debug(e); + _channel.failed(); + } + releaseBuffer(); + return; } + state=_state.get(); } - closed(); } private void releaseBuffer() @@ -138,44 +168,109 @@ public class HttpOutput extends ServletOutputStream public boolean isClosed() { - return _closed; + return _state.get()==State.CLOSED; } @Override public void flush() throws IOException { - if (isClosed()) - return; - - if (BufferUtil.hasContent(_aggregate)) - _channel.write(_aggregate, false); - else - _channel.write(BufferUtil.EMPTY_BUFFER, false); + while(true) + { + switch(_state.get()) + { + case OPEN: + if (BufferUtil.hasContent(_aggregate)) + _channel.write(_aggregate, false); + else + _channel.write(BufferUtil.EMPTY_BUFFER, false); + return; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + new AsyncFlush().process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + return; + } + break; + } } - public boolean isAllContentWritten() - { - Response response=_channel.getResponse(); - return response.isAllContentWritten(_written); - } - - public void closeOutput() throws IOException - { - _channel.getResponse().closeOutput(); - } @Override public void write(byte[] b, int off, int len) throws IOException - { - if (isClosed()) - throw new EofException("Closed"); - + { _written+=len; boolean complete=_channel.getResponse().isAllContentWritten(_written); - int capacity = getBufferSize(); + + // Async or Blocking ? + while(true) + { + switch(_state.get()) + { + case OPEN: + // process blocking below + break; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + + // Should we aggregate? + int capacity = getBufferSize(); + if (!complete && len<=_commitSize) + { + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(capacity, false); + + // YES - fill the aggregate with content from the buffer + int filled = BufferUtil.fill(_aggregate, b, off, len); + + // return if we are not complete, not full and filled all the content + if (filled==len && !BufferUtil.isFull(_aggregate)) + { + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + throw new IllegalStateException(); + return; + } + + // adjust offset/length + off+=filled; + len-=filled; + } + + // Do the asynchronous writing from the callback + new AsyncWrite(b,off,len,complete).process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + throw new EofException("Closed"); + } + break; + } + + + // handle blocking write // Should we aggregate? - if (!complete && len<=capacity/4) + int capacity = getBufferSize(); + if (!complete && len<=_commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(capacity, false); @@ -184,12 +279,12 @@ public class HttpOutput extends ServletOutputStream int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content - if (!complete && filled == len && !BufferUtil.isFull(_aggregate)) + if (filled==len && !BufferUtil.isFull(_aggregate)) return; // adjust offset/length - off += filled; - len -= filled; + off+=filled; + len-=filled; } // flush any content from the aggregate @@ -198,7 +293,7 @@ public class HttpOutput extends ServletOutputStream _channel.write(_aggregate, complete && len==0); // should we fill aggregate again from the buffer? - if (len>0 && !complete && len<=_aggregate.capacity()/4) + if (len>0 && !complete && len<=_commitSize) { BufferUtil.append(_aggregate, b, off, len); return; @@ -212,37 +307,128 @@ public class HttpOutput extends ServletOutputStream _channel.write(BufferUtil.EMPTY_BUFFER,complete); if (complete) + { closed(); - } + } + } - @Override - public void write(int b) throws IOException + public void write(ByteBuffer buffer) throws IOException { - if (isClosed()) - throw new EOFException("Closed"); + _written+=buffer.remaining(); + boolean complete=_channel.getResponse().isAllContentWritten(_written); - // Allocate an aggregate buffer. - // Never direct as it is slow to do little writes to a direct buffer. - if (_aggregate == null) - _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); + // Async or Blocking ? + while(true) + { + switch(_state.get()) + { + case OPEN: + // process blocking below + break; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + + // Do the asynchronous writing from the callback + new AsyncWrite(buffer,complete).process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + throw new EofException("Closed"); + } + break; + } + + + // handle blocking write + int len=BufferUtil.length(buffer); + + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + _channel.write(_aggregate, complete && len==0); - BufferUtil.append(_aggregate, (byte)b); - _written++; + // write any remaining content in the buffer directly + if (len>0) + _channel.write(buffer, complete); + else if (complete) + _channel.write(BufferUtil.EMPTY_BUFFER,complete); + + if (complete) + closed(); + } + @Override + public void write(int b) throws IOException + { + _written+=1; boolean complete=_channel.getResponse().isAllContentWritten(_written); - - // Check if all written or full - if (complete || BufferUtil.isFull(_aggregate)) - { - BlockingCallback callback = _channel.getWriteBlockingCallback(); - _channel.write(_aggregate, false, callback); - callback.block(); - if (complete) - closed(); + + // Async or Blocking ? + while(true) + { + switch(_state.get()) + { + case OPEN: + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); + BufferUtil.append(_aggregate, (byte)b); + + // Check if all written or full + if (complete || BufferUtil.isFull(_aggregate)) + { + BlockingCallback callback = _channel.getWriteBlockingCallback(); + _channel.write(_aggregate, complete, callback); + callback.block(); + if (complete) + closed(); + } + break; + + case ASYNC: + throw new IllegalStateException("isReady() not called"); + + case READY: + if (!_state.compareAndSet(State.READY, State.PENDING)) + continue; + + if (_aggregate == null) + _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false); + BufferUtil.append(_aggregate, (byte)b); + + // Check if all written or full + if (!complete && !BufferUtil.isFull(_aggregate)) + { + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + throw new IllegalStateException(); + return; + } + + // Do the asynchronous writing from the callback + new AsyncFlush().process(); + return; + + case PENDING: + case UNREADY: + throw new WritePendingException(); + + case CLOSED: + throw new EofException("Closed"); + } + break; } } + + @Override public void print(String s) throws IOException { @@ -253,51 +439,6 @@ public class HttpOutput extends ServletOutputStream } /* ------------------------------------------------------------ */ - /** Set headers and send content. - * @deprecated Use {@link Response#setHeaders(HttpContent)} and {@link #sendContent(HttpContent)} instead. - * @param content - * @throws IOException - */ - @Deprecated - public void sendContent(Object content) throws IOException - { - final BlockingCallback callback =_channel.getWriteBlockingCallback(); - - if (content instanceof HttpContent) - { - _channel.getResponse().setHeaders((HttpContent)content); - sendContent((HttpContent)content,callback); - } - else if (content instanceof Resource) - { - Resource resource = (Resource)content; - _channel.getResponse().getHttpFields().putDateField(HttpHeader.LAST_MODIFIED, resource.lastModified()); - - ReadableByteChannel in=((Resource)content).getReadableByteChannel(); - if (in!=null) - sendContent(in,callback); - else - sendContent(resource.getInputStream(),callback); - } - else if (content instanceof ByteBuffer) - { - sendContent((ByteBuffer)content,callback); - } - else if (content instanceof ReadableByteChannel) - { - sendContent((ReadableByteChannel)content,callback); - } - else if (content instanceof InputStream) - { - sendContent((InputStream)content,callback); - } - else - callback.failed(new IllegalArgumentException("unknown content type "+content.getClass())); - - callback.block(); - } - - /* ------------------------------------------------------------ */ /** Blocking send of content. * @param content The content to send * @throws IOException @@ -332,7 +473,7 @@ public class HttpOutput extends ServletOutputStream new ReadableByteChannelWritingCB(in,callback).iterate(); callback.block(); } - + /* ------------------------------------------------------------ */ /** Blocking send of content. @@ -345,7 +486,7 @@ public class HttpOutput extends ServletOutputStream sendContent(content,callback); callback.block(); } - + /* ------------------------------------------------------------ */ /** Asynchronous send of content. @@ -367,13 +508,13 @@ public class HttpOutput extends ServletOutputStream public void failed(Throwable x) { callback.failed(x); - } + } }); } /* ------------------------------------------------------------ */ /** Asynchronous send of content. - * @param in The content to send as a stream. The stream will be closed + * @param in The content to send as a stream. The stream will be closed * after reading all content. * @param callback The callback to use to notify success or failure */ @@ -384,7 +525,7 @@ public class HttpOutput extends ServletOutputStream /* ------------------------------------------------------------ */ /** Asynchronous send of content. - * @param in The content to send as a channel. The channel will be closed + * @param in The content to send as a channel. The channel will be closed * after reading all content. * @param callback The callback to use to notify success or failure */ @@ -400,23 +541,36 @@ public class HttpOutput extends ServletOutputStream */ public void sendContent(HttpContent httpContent, Callback callback) throws IOException { - if (isClosed()) - throw new IOException("Closed"); if (BufferUtil.hasContent(_aggregate)) throw new IOException("written"); if (_channel.isCommitted()) throw new IOException("committed"); - + + while (true) + { + switch(_state.get()) + { + case OPEN: + if (!_state.compareAndSet(State.OPEN, State.PENDING)) + continue; + break; + case CLOSED: + throw new EofException("Closed"); + default: + throw new IllegalStateException(); + } + break; + } ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null; if (buffer == null) buffer = httpContent.getIndirectBuffer(); - + if (buffer!=null) { sendContent(buffer,callback); return; } - + ReadableByteChannel rbc=httpContent.getReadableByteChannel(); if (rbc!=null) { @@ -424,7 +578,7 @@ public class HttpOutput extends ServletOutputStream sendContent(rbc,callback); return; } - + InputStream in = httpContent.getInputStream(); if ( in!=null ) { @@ -442,7 +596,8 @@ public class HttpOutput extends ServletOutputStream public void setBufferSize(int size) { - this._bufferSize = size; + _bufferSize = size; + _commitSize = size; } public void resetBuffer() @@ -450,24 +605,223 @@ public class HttpOutput extends ServletOutputStream if (BufferUtil.hasContent(_aggregate)) BufferUtil.clear(_aggregate); } - - + + @Override + public void setWriteListener(WriteListener writeListener) + { + if (!_channel.getState().isAsync()) + throw new IllegalStateException("!ASYNC"); + + if (_state.compareAndSet(State.OPEN, State.READY)) + { + _writeListener = writeListener; + _channel.getState().onWritePossible(); + } + else + throw new IllegalStateException(); + } + + /** + * @see javax.servlet.ServletOutputStream#isReady() + */ + @Override + public boolean isReady() + { + while (true) + { + switch(_state.get()) + { + case OPEN: + return true; + case ASYNC: + if (!_state.compareAndSet(State.ASYNC, State.READY)) + continue; + return true; + case READY: + return true; + case PENDING: + if (!_state.compareAndSet(State.PENDING, State.UNREADY)) + continue; + return false; + case UNREADY: + return false; + case CLOSED: + return false; + } + } + } + + @Override + public void run() + { + if(_onError!=null) + { + Throwable th=_onError; + _onError=null; + _writeListener.onError(th); + close(); + } + if (_state.get()==State.READY) + { + try + { + _writeListener.onWritePossible(); + } + catch (Throwable e) + { + _writeListener.onError(e); + close(); + } + } + } + + private class AsyncWrite extends AsyncFlush + { + private final ByteBuffer _buffer; + private final boolean _complete; + private final int _len; + + public AsyncWrite(byte[] b, int off, int len, boolean complete) + { + _buffer=ByteBuffer.wrap(b, off, len); + _complete=complete; + _len=len; + } + + public AsyncWrite(ByteBuffer buffer, boolean complete) + { + _buffer=buffer; + _complete=complete; + _len=buffer.remaining(); + } + + @Override + protected boolean process() + { + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + { + _channel.write(_aggregate, _complete && _len==0, this); + return false; + } + + if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize) + { + BufferUtil.put(_buffer,_aggregate); + } + else if (_len>0 && !_flushed) + { + _flushed=true; + _channel.write(_buffer, _complete, this); + return false; + } + else if (_len==0 && !_flushed) + { + _flushed=true; + _channel.write(BufferUtil.EMPTY_BUFFER, _complete, this); + return false; + } + + if (_complete) + closed(); + return true; + } + } + + private class AsyncFlush extends IteratingCallback + { + protected boolean _flushed; + + public AsyncFlush() + { + } + + @Override + protected boolean process() + { + if (BufferUtil.hasContent(_aggregate)) + { + _flushed=true; + _channel.write(_aggregate, false, this); + return false; + } + + if (!_flushed) + { + _flushed=true; + _channel.write(BufferUtil.EMPTY_BUFFER,false,this); + return false; + } + + return true; + } + + @Override + protected void completed() + { + try + { + while(true) + { + State last=_state.get(); + switch(last) + { + case PENDING: + if (!_state.compareAndSet(State.PENDING, State.ASYNC)) + continue; + break; + + case UNREADY: + if (!_state.compareAndSet(State.UNREADY, State.READY)) + continue; + _channel.getState().onWritePossible(); + break; + + case CLOSED: + _onError=new EofException("Closed"); + break; + + default: + throw new IllegalStateException(); + } + break; + } + } + catch (Exception e) + { + _onError=e; + _channel.getState().onWritePossible(); + } + } + + @Override + public void failed(Throwable e) + { + super.failed(e); + _onError=e; + _channel.getState().onWritePossible(); + } + + + } + + /* ------------------------------------------------------------ */ - /** An iterating callback that will take content from an + /** An iterating callback that will take content from an * InputStream and write it to the associated {@link HttpChannel}. - * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used. + * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used. * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to - * be notified as each buffer is written and only once all the input is consumed will the - * wrapped {@link Callback#succeeded()} method be called. + * be notified as each buffer is written and only once all the input is consumed will the + * wrapped {@link Callback#succeeded()} method be called. */ - private class InputStreamWritingCB extends IteratingCallback + private class InputStreamWritingCB extends IteratingNestedCallback { private final InputStream _in; private final ByteBuffer _buffer; private boolean _eof; - + public InputStreamWritingCB(InputStream in, Callback callback) - { + { super(callback); _in=in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); @@ -502,6 +856,7 @@ public class HttpOutput extends ServletOutputStream _buffer.position(0); _buffer.limit(len); _channel.write(_buffer,_eof,this); + return false; } @@ -519,26 +874,26 @@ public class HttpOutput extends ServletOutputStream LOG.ignore(e); } } - + } /* ------------------------------------------------------------ */ - /** An iterating callback that will take content from a + /** An iterating callback that will take content from a * ReadableByteChannel and write it to the {@link HttpChannel}. * A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if * {@link HttpChannel#useDirectBuffers()} is true. * This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to - * be notified as each buffer is written and only once all the input is consumed will the - * wrapped {@link Callback#succeeded()} method be called. + * be notified as each buffer is written and only once all the input is consumed will the + * wrapped {@link Callback#succeeded()} method be called. */ - private class ReadableByteChannelWritingCB extends IteratingCallback + private class ReadableByteChannelWritingCB extends IteratingNestedCallback { private final ReadableByteChannel _in; private final ByteBuffer _buffer; private boolean _eof; - + public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) - { + { super(callback); _in=in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); @@ -561,10 +916,11 @@ public class HttpOutput extends ServletOutputStream _buffer.clear(); while (_buffer.hasRemaining() && !_eof) _eof = (_in.read(_buffer)) < 0; - + // write what we have _buffer.flip(); _channel.write(_buffer,_eof,this); + return false; } @@ -583,4 +939,5 @@ public class HttpOutput extends ServletOutputStream } } } + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java index ef62bdc86f..0ab12b207e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.server; -import java.io.IOException; import java.nio.ByteBuffer; import org.eclipse.jetty.http.HttpGenerator; @@ -26,9 +25,6 @@ import org.eclipse.jetty.util.Callback; public interface HttpTransport { - @Deprecated - void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException; - void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback); void send(ByteBuffer content, boolean lastContent, Callback callback); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java index 83335c4dc3..0f4abd90fb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java @@ -35,10 +35,10 @@ public class Iso88591HttpWriter extends HttpWriter public void write (char[] s,int offset, int length) throws IOException { HttpOutput out = _out; - if (length==0) + if (length==0 && out.isAllContentWritten()) { - if (_out.isAllContentWritten()) - close(); + close(); + return; } if (length==1) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 6d161def3a..acf1cf073f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -172,7 +172,6 @@ public class LocalConnector extends AbstractConnector Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint); endPoint.setConnection(connection); -// connectionOpened(connection); connection.onOpen(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java index d7304d8fc3..df8ce174c6 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java @@ -39,7 +39,6 @@ public interface NetworkConnector extends Connector, AutoCloseable * (for example, to stop accepting network connections).</p> * Once a connector has been closed, it cannot be opened again without first * calling {@link #stop()} and it will not be active again until a subsequent call to {@link #start()} - * @throws IOException if this connector cannot be closed */ @Override void close(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java new file mode 100644 index 0000000000..fb0c23a3fe --- /dev/null +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java @@ -0,0 +1,160 @@ +// +// ======================================================================== +// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.io.InterruptedIOException; +import javax.servlet.ServletInputStream; + +import org.eclipse.jetty.util.ArrayQueue; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + * <p>{@link QueuedHttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p> + * <p>{@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.</p> + * <p>{@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them + * but simply holds references to the item, thus the caller must organize for those buffers to valid while + * held by this class.</p> + * <p>To assist the caller, subclasses may override methods {@link #onAsyncRead()}, + * {@link #onContentConsumed(Object)} and {@link #onAllContentConsumed()} that can be implemented so that the + * caller will know when buffers are queued and consumed.</p> + */ +public abstract class QueuedHttpInput<T> extends HttpInput<T> +{ + private final static Logger LOG = Log.getLogger(QueuedHttpInput.class); + + private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock()); + + public QueuedHttpInput() + {} + + public void recycle() + { + synchronized (lock()) + { + T item = _inputQ.peekUnsafe(); + while (item != null) + { + _inputQ.pollUnsafe(); + onContentConsumed(item); + + item = _inputQ.peekUnsafe(); + if (item == null) + onAllContentConsumed(); + } + super.recycle(); + } + } + + @Override + protected T nextContent() + { + T item = _inputQ.peekUnsafe(); + + // Skip empty items at the head of the queue + while (item != null && remaining(item) == 0) + { + _inputQ.pollUnsafe(); + onContentConsumed(item); + LOG.debug("{} consumed {}", this, item); + item = _inputQ.peekUnsafe(); + + // If that was the last item then notify + if (item==null) + onAllContentConsumed(); + } + return item; + } + + protected abstract void onContentConsumed(T item); + + protected void blockForContent() throws IOException + { + synchronized (lock()) + { + while (_inputQ.isEmpty() && !_state.isEOF()) + { + try + { + LOG.debug("{} waiting for content", this); + lock().wait(); + } + catch (InterruptedException e) + { + throw (IOException)new InterruptedIOException().initCause(e); + } + } + } + } + + + /* ------------------------------------------------------------ */ + /** Called by this HttpInput to signal all available content has been consumed + */ + protected void onAllContentConsumed() + { + } + + /* ------------------------------------------------------------ */ + /** Add some content to the input stream + * @param item + */ + public void content(T item) + { + // The buffer is not copied here. This relies on the caller not recycling the buffer + // until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are + // the signals to the caller that the buffers can be recycled. + + synchronized (lock()) + { + boolean empty=_inputQ.isEmpty(); + + _inputQ.add(item); + + if (empty) + { + if (!onAsyncRead()) + lock().notify(); + } + + LOG.debug("{} queued {}", this, item); + } + } + + + public void earlyEOF() + { + synchronized (lock()) + { + super.earlyEOF(); + lock().notify(); + } + } + + public void messageComplete() + { + synchronized (lock()) + { + super.messageComplete(); + lock().notify(); + } + } + +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index c6db08cc1e..2a7ba2574e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -57,6 +57,7 @@ import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; import javax.servlet.http.Part; import org.eclipse.jetty.http.HttpCookie; @@ -494,6 +495,16 @@ public class Request implements HttpServletRequest { return (int)_fields.getLongField(HttpHeader.CONTENT_LENGTH.toString()); } + + /* ------------------------------------------------------------ */ + /* + * @see javax.servlet.ServletRequest.getContentLengthLong() + */ + @Override + public long getContentLengthLong() + { + return _fields.getLongField(HttpHeader.CONTENT_LENGTH.toString()); + } /* ------------------------------------------------------------ */ /* @@ -532,7 +543,12 @@ public class Request implements HttpServletRequest public Cookie[] getCookies() { if (_cookiesExtracted) - return _cookies == null?null:_cookies.getCookies(); + { + if (_cookies == null || _cookies.getCookies().length == 0) + return null; + + return _cookies.getCookies(); + } _cookiesExtracted = true; @@ -551,7 +567,11 @@ public class Request implements HttpServletRequest } } - return _cookies == null?null:_cookies.getCookies(); + //Javadoc for Request.getCookies() stipulates null for no cookies + if (_cookies == null || _cookies.getCookies().length == 0) + return null; + + return _cookies.getCookies(); } /* ------------------------------------------------------------ */ @@ -1607,9 +1627,7 @@ public class Request implements HttpServletRequest /* ------------------------------------------------------------ */ /* * Set a request attribute. if the attribute name is "org.eclipse.jetty.server.server.Request.queryEncoding" then the value is also passed in a call to - * {@link #setQueryEncoding}. <p> if the attribute name is "org.eclipse.jetty.server.server.ResponseBuffer", then the response buffer is flushed with @{link - * #flushResponseBuffer} <p> if the attribute name is "org.eclipse.jetty.io.EndPoint.maxIdleTime", then the value is passed to the associated {@link - * EndPoint#setIdleTimeout}. + * {@link #setQueryEncoding}. * * @see javax.servlet.ServletRequest#setAttribute(java.lang.String, java.lang.Object) */ @@ -1618,35 +1636,11 @@ public class Request implements HttpServletRequest { Object old_value = _attributes == null?null:_attributes.getAttribute(name); - if (name.startsWith("org.eclipse.jetty.")) - { - if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name)) - setQueryEncoding(value == null?null:value.toString()); - else if ("org.eclipse.jetty.server.sendContent".equals(name)) - { - try - { - ((HttpOutput)getServletResponse().getOutputStream()).sendContent(value); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - else if ("org.eclipse.jetty.server.ResponseBuffer".equals(name)) - { - try - { - throw new IOException("not implemented"); - //((HttpChannel.Output)getServletResponse().getOutputStream()).sendResponse(byteBuffer); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - } - + if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name)) + setQueryEncoding(value == null?null:value.toString()); + else if ("org.eclipse.jetty.server.sendContent".equals(name)) + LOG.warn("Deprecated: org.eclipse.jetty.server.sendContent"); + if (_attributes == null) _attributes = new AttributesMap(); _attributes.setAttribute(name,value); @@ -2194,4 +2188,32 @@ public class Request implements HttpServletRequest setParameters(parameters); setQueryString(query); } + + + + /** + * @see javax.servlet.http.HttpServletRequest#upgrade(java.lang.Class) + */ + @Override + public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException + { + if (getContext() == null) + throw new ServletException ("Unable to instantiate "+handlerClass); + + try + { + //Instantiate an instance and inject it + T h = getContext().createInstance(handlerClass); + + //TODO handle the rest of the upgrade process + + return h; + } + catch (Exception e) + { + if (e instanceof ServletException) + throw (ServletException)e; + throw new ServletException(e); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java index c41ee2b03b..0d67712f3e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java @@ -284,26 +284,9 @@ public class ResourceCache { try { - int len=(int)resource.length(); - if (len<0) - { - LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len); - return null; - } - ByteBuffer buffer = BufferUtil.allocate(len); - int pos=BufferUtil.flipToFill(buffer); - if (resource.getFile()!=null) - BufferUtil.readFrom(resource.getFile(),buffer); - else - { - InputStream is = resource.getInputStream(); - BufferUtil.readFrom(is,len,buffer); - is.close(); - } - BufferUtil.flipToFlush(buffer,pos); - return buffer; + return BufferUtil.toBuffer(resource,true); } - catch(IOException e) + catch(IOException|IllegalArgumentException e) { LOG.warn(e); return null; @@ -316,30 +299,11 @@ public class ResourceCache try { if (_useFileMappedBuffer && resource.getFile()!=null) - return BufferUtil.toBuffer(resource.getFile()); - - int len=(int)resource.length(); - if (len<0) - { - LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len); - return null; - } - ByteBuffer buffer = BufferUtil.allocateDirect(len); - - int pos=BufferUtil.flipToFill(buffer); - if (resource.getFile()!=null) - BufferUtil.readFrom(resource.getFile(),buffer); - else - { - InputStream is = resource.getInputStream(); - BufferUtil.readFrom(is,len,buffer); - is.close(); - } - BufferUtil.flipToFlush(buffer,pos); + return BufferUtil.toMappedBuffer(resource.getFile()); - return buffer; + return BufferUtil.toBuffer(resource,true); } - catch(IOException e) + catch(IOException|IllegalArgumentException e) { LOG.warn(e); return null; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index b1fa612447..b32e681ec7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -99,6 +99,7 @@ public class Response implements HttpServletResponse private Locale _locale; private MimeTypes.Type _mimeType; private String _characterEncoding; + private boolean _explicitEncoding; private String _contentType; private OutputType _outputType = OutputType.NONE; private ResponseWriter _writer; @@ -128,6 +129,7 @@ public class Response implements HttpServletResponse _contentLength = -1; _out.reset(); _fields.clear(); + _explicitEncoding=false; } public void setHeaders(HttpContent httpContent) @@ -173,6 +175,10 @@ public class Response implements HttpServletResponse public void included() { _include.decrementAndGet(); + if (_outputType == OutputType.WRITER) + { + _writer.reopen(); + } _out.reopen(); } @@ -454,10 +460,18 @@ public class Response implements HttpServletResponse _channel.sendResponse(HttpGenerator.PROGRESS_102_INFO, null, true); } } - - @Override - public void sendRedirect(String location) throws IOException + + /** + * Sends a response with one of the 300 series redirection codes. + * @param code + * @param location + * @throws IOException + */ + public void sendRedirect(int code, String location) throws IOException { + if ((code < HttpServletResponse.SC_MULTIPLE_CHOICES) || (code >= HttpServletResponse.SC_BAD_REQUEST)) + throw new IllegalArgumentException("Not a 3xx redirect code"); + if (isIncluding()) return; @@ -467,7 +481,15 @@ public class Response implements HttpServletResponse if (!URIUtil.hasScheme(location)) { StringBuilder buf = _channel.getRequest().getRootURL(); - if (location.startsWith("/")) + + if (location.startsWith("//")) + { + buf.delete(0, buf.length()); + buf.append(_channel.getRequest().getScheme()); + buf.append(":"); + buf.append(location); + } + else if (location.startsWith("/")) buf.append(location); else { @@ -515,11 +537,17 @@ public class Response implements HttpServletResponse resetBuffer(); setHeader(HttpHeader.LOCATION, location); - setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY); + setStatus(code); closeOutput(); } @Override + public void sendRedirect(String location) throws IOException + { + sendRedirect(HttpServletResponse.SC_MOVED_TEMPORARILY, location); + } + + @Override public void setDateHeader(String name, long date) { if (!isIncluding()) @@ -723,7 +751,7 @@ public class Response implements HttpServletResponse encoding = MimeTypes.inferCharsetFromContentType(_contentType); if (encoding == null) encoding = StringUtil.__ISO_8859_1; - setCharacterEncoding(encoding); + setCharacterEncoding(encoding,false); } if (_writer != null && _writer.isFor(encoding)) @@ -787,6 +815,8 @@ public class Response implements HttpServletResponse { case WRITER: _writer.close(); + if (!_out.isClosed()) + _out.close(); break; case STREAM: getOutputStream().close(); @@ -811,10 +841,21 @@ public class Response implements HttpServletResponse _contentLength = len; _fields.putLongField(HttpHeader.CONTENT_LENGTH.toString(), len); } + + @Override + public void setContentLengthLong(long length) + { + setLongContentLength(length); + } @Override public void setCharacterEncoding(String encoding) { + setCharacterEncoding(encoding,true); + } + + private void setCharacterEncoding(String encoding, boolean explicit) + { if (isIncluding()) return; @@ -822,6 +863,8 @@ public class Response implements HttpServletResponse { if (encoding == null) { + _explicitEncoding=false; + // Clear any encoding. if (_characterEncoding != null) { @@ -840,6 +883,7 @@ public class Response implements HttpServletResponse else { // No, so just add this one to the mimetype + _explicitEncoding=explicit; _characterEncoding = StringUtil.normalizeCharset(encoding); if (_contentType != null) { @@ -900,6 +944,7 @@ public class Response implements HttpServletResponse else { _characterEncoding = charset; + _explicitEncoding = true; } HttpField field = HttpField.CONTENT_TYPE.get(_contentType); @@ -1040,8 +1085,8 @@ public class Response implements HttpServletResponse String charset = _channel.getRequest().getContext().getContextHandler().getLocaleEncoding(locale); - if (charset != null && charset.length() > 0 && _characterEncoding == null) - setCharacterEncoding(charset); + if (charset != null && charset.length() > 0 && !_explicitEncoding) + setCharacterEncoding(charset,false); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 889e2ffefa..7fedc13f42 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -67,7 +67,7 @@ import org.eclipse.jetty.util.thread.Scheduler; * which are implemented to each use a NIO {@link Selector} instance to asynchronously * schedule a set of accepted connections. It is the selector thread that will call the * {@link Callback} instances passed in the {@link EndPoint#fillInterested(Callback)} or - * {@link EndPoint#write(Object, Callback, java.nio.ByteBuffer...)} methods. It is expected + * {@link EndPoint#write(Callback, java.nio.ByteBuffer...)} methods. It is expected * that these callbacks may do some non-blocking IO work, but will always dispatch to the * {@link Executor} service any blocking, long running or application tasks. * <p> diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java index b13144840a..b0c6270e96 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java @@ -31,10 +31,15 @@ import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; import javax.servlet.http.Part; + /* ------------------------------------------------------------ */ -/** Class to tunnel a ServletRequest via a HttpServletRequest +/** + * ServletRequestHttpWrapper + * + * Class to tunnel a ServletRequest via a HttpServletRequest */ public class ServletRequestHttpWrapper extends ServletRequestWrapper implements HttpServletRequest { @@ -209,4 +214,25 @@ public class ServletRequestHttpWrapper extends ServletRequestWrapper implements } + /** + * @see javax.servlet.http.HttpServletRequest#changeSessionId() + */ + @Override + public String changeSessionId() + { + // TODO 3.1 Auto-generated method stub + return null; + } + + /** + * @see javax.servlet.http.HttpServletRequest#upgrade(java.lang.Class) + */ + @Override + public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException + { + // TODO 3.1 Auto-generated method stub + return null; + } + + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java index 1f1ffb4f42..4c62b9e692 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java @@ -28,7 +28,10 @@ import javax.servlet.http.HttpServletResponse; /* ------------------------------------------------------------ */ -/** Wrapper to tunnel a ServletResponse via a HttpServletResponse +/** + * ServletResponseHttpWrapper + * + * Wrapper to tunnel a ServletResponse via a HttpServletResponse */ public class ServletResponseHttpWrapper extends ServletResponseWrapper implements HttpServletResponse { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java index 251e3d0fd4..6028109f8e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java @@ -43,10 +43,10 @@ public class Utf8HttpWriter extends HttpWriter public void write (char[] s,int offset, int length) throws IOException { HttpOutput out = _out; - if (length==0) + if (length==0 && out.isAllContentWritten()) { - if (_out.isAllContentWritten()) - close(); + close(); + return; } while (length > 0) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java index 91d66baa23..2554437302 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java @@ -104,10 +104,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu { public final static int SERVLET_MAJOR_VERSION=3; public final static int SERVLET_MINOR_VERSION=0; + public static final Class[] SERVLET_LISTENER_TYPES = new Class[] {ServletContextListener.class, + ServletContextAttributeListener.class, + ServletRequestListener.class, + ServletRequestAttributeListener.class}; - final private static String __unimplmented="Unimplemented - use org.eclipse.jetty.servlet.ServletContextHandler"; + public static final int DEFAULT_LISTENER_TYPE_INDEX = 1; + public static final int EXTENDED_LISTENER_TYPE_INDEX = 0; + final private static String __unimplmented="Unimplemented - use org.eclipse.jetty.servlet.ServletContextHandler"; private static final Logger LOG = Log.getLogger(ContextHandler.class); @@ -550,7 +556,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu _requestListeners.clear(); _requestAttributeListeners.clear(); _eventListeners.clear(); - + if (eventListeners!=null) for (EventListener listener : eventListeners) addEventListener(listener); @@ -628,7 +634,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu return _programmaticListeners.contains(listener); } - + /* ------------------------------------------------------------ */ /** @@ -828,7 +834,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu for (int i = _contextListeners.size(); i-->0;) callContextDestroyed(_contextListeners.get(i),event); } - + //retain only durable listeners setEventListeners(_durableListeners.toArray(new EventListener[_durableListeners.size()])); _durableListeners.clear(); @@ -968,7 +974,9 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu if (old_context != _scontext) { // check the target. - if (DispatcherType.REQUEST.equals(dispatch) || DispatcherType.ASYNC.equals(dispatch) || (DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isExpired())) + if (DispatcherType.REQUEST.equals(dispatch) || + DispatcherType.ASYNC.equals(dispatch) || + DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isAsync()) { if (_compactPath) target = URIUtil.compactPath(target); @@ -1303,13 +1311,13 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu LOG.warn(this+" contextPath ends with /"); contextPath=contextPath.substring(0,contextPath.length()-1); } - + if (contextPath.length()==0) { LOG.warn("Empty contextPath"); contextPath="/"; } - + _contextPath = contextPath; if (getServer() != null && (getServer().isStarting() || getServer().isStarted())) @@ -1720,6 +1728,8 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu public class Context extends NoContext { protected boolean _enabled = true; //whether or not the dynamic API is enabled for callers + protected boolean _extendedListenerTypes = false; + /* ------------------------------------------------------------ */ protected Context() @@ -2124,6 +2134,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu try { Class<? extends EventListener> clazz = _classLoader==null?Loader.loadClass(ContextHandler.class,className):_classLoader.loadClass(className); + checkListener(clazz); addListener(clazz); } catch (ClassNotFoundException e) @@ -2137,6 +2148,9 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu { if (!_enabled) throw new UnsupportedOperationException(); + + checkListener(t.getClass()); + ContextHandler.this.addEventListener(t); ContextHandler.this.addProgrammaticListener(t); } @@ -2147,6 +2161,8 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu if (!_enabled) throw new UnsupportedOperationException(); + checkListener(listenerClass); + try { EventListener e = createListener(listenerClass); @@ -2164,18 +2180,42 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu { try { - return clazz.newInstance(); + return createInstance(clazz); } - catch (InstantiationException e) + catch (Exception e) { throw new ServletException(e); } - catch (IllegalAccessException e) + } + + + public void checkListener (Class<? extends EventListener> listener) throws IllegalStateException + { + boolean ok = false; + int startIndex = (isExtendedListenerTypes()?EXTENDED_LISTENER_TYPE_INDEX:DEFAULT_LISTENER_TYPE_INDEX); + for (int i=startIndex;i<SERVLET_LISTENER_TYPES.length;i++) { - throw new ServletException(e); + if (SERVLET_LISTENER_TYPES[i].isAssignableFrom(listener)) + { + ok = true; + break; + } } + if (!ok) + throw new IllegalArgumentException("Inappropriate listener class "+listener.getName()); } + public void setExtendedListenerTypes (boolean extended) + { + _extendedListenerTypes = extended; + } + + public boolean isExtendedListenerTypes() + { + return _extendedListenerTypes; + } + + @Override public ClassLoader getClassLoader() { @@ -2213,6 +2253,14 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu { return _enabled; } + + + + public <T> T createInstance (Class<T> clazz) throws Exception + { + T o = clazz.newInstance(); + return o; + } } @@ -2553,6 +2601,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu { LOG.warn(__unimplmented); } + + /** + * @see javax.servlet.ServletContext#getVirtualServerName() + */ + @Override + public String getVirtualServerName() + { + // TODO 3.1 Auto-generated method stub + return null; + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java index d1e13ce35b..451e3afae1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java @@ -533,7 +533,7 @@ public class ResourceHandler extends HandlerWrapper resource.length()>_minMemoryMappedContentLength && resource instanceof FileResource) { - ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile()); + ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile()); ((HttpOutput)out).sendContent(buffer,callback); } else // Do a blocking write of a channel (if available) or input stream @@ -553,7 +553,7 @@ public class ResourceHandler extends HandlerWrapper resource.length()>_minMemoryMappedContentLength && resource instanceof FileResource) { - ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile()); + ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile()); ((HttpOutput)out).sendContent(buffer); } else // Do a blocking write of a channel (if available) or input stream diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index 78e8eb71ef..26a0d8dcf2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -81,7 +81,6 @@ public class StatisticsHandler extends HandlerWrapper @Override public void onComplete(AsyncEvent event) throws IOException { - HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState(); Request request = state.getBaseRequest(); @@ -92,8 +91,7 @@ public class StatisticsHandler extends HandlerWrapper updateResponse(request); - if (!state.isDispatched()) - _asyncWaitStats.decrement(); + _asyncWaitStats.decrement(); } }; @@ -139,9 +137,7 @@ public class StatisticsHandler extends HandlerWrapper { // resumed request start = System.currentTimeMillis(); - _asyncWaitStats.decrement(); - if (state.isDispatched()) - _asyncDispatches.incrementAndGet(); + _asyncDispatches.incrementAndGet(); } try @@ -159,8 +155,10 @@ public class StatisticsHandler extends HandlerWrapper if (state.isSuspended()) { if (state.isInitial()) + { state.addListener(_onCompletion); - _asyncWaitStats.increment(); + _asyncWaitStats.increment(); + } } else if (state.isInitial()) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java index db6b465c5e..96db015f21 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java @@ -51,8 +51,8 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI { final static Logger LOG = SessionHandler.LOG; public final static String SESSION_KNOWN_ONLY_TO_AUTHENTICATED="org.eclipse.jetty.security.sessionKnownOnlytoAuthenticated"; - private String _clusterId; // ID unique within cluster - private String _nodeId; // ID unique within node + private String _clusterId; // ID without any node (ie "worker") id appended + private String _nodeId; // ID of session with node(ie "worker") id appended private final AbstractSessionManager _manager; private final Map<String,Object> _attributes=new HashMap<String, Object>(); private boolean _idChanged; @@ -185,6 +185,7 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI @Override public long getCreationTime() throws IllegalStateException { + checkValid(); return _created; } @@ -365,6 +366,7 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI @Override public void invalidate() throws IllegalStateException { + checkValid(); // remove session from context and invalidate other sessions with same ID. _manager.removeSession(this,true); doInvalidate(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java index 4801d16dd4..b13419bc21 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java @@ -38,6 +38,7 @@ import javax.servlet.http.HttpSessionAttributeListener; import javax.servlet.http.HttpSessionBindingEvent; import javax.servlet.http.HttpSessionContext; import javax.servlet.http.HttpSessionEvent; +import javax.servlet.http.HttpSessionIdListener; import javax.servlet.http.HttpSessionListener; import org.eclipse.jetty.http.HttpCookie; @@ -107,6 +108,7 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement protected final List<HttpSessionAttributeListener> _sessionAttributeListeners = new CopyOnWriteArrayList<HttpSessionAttributeListener>(); protected final List<HttpSessionListener> _sessionListeners= new CopyOnWriteArrayList<HttpSessionListener>(); + protected final List<HttpSessionIdListener> _sessionIdListeners = new CopyOnWriteArrayList<HttpSessionIdListener>(); protected ClassLoader _loader; protected ContextHandler.Context _context; @@ -191,6 +193,8 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement _sessionAttributeListeners.add((HttpSessionAttributeListener)listener); if (listener instanceof HttpSessionListener) _sessionListeners.add((HttpSessionListener)listener); + if (listener instanceof HttpSessionIdListener) + _sessionIdListeners.add((HttpSessionIdListener)listener); } /* ------------------------------------------------------------ */ @@ -198,6 +202,7 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement { _sessionAttributeListeners.clear(); _sessionListeners.clear(); + _sessionIdListeners.clear(); } /* ------------------------------------------------------------ */ @@ -411,7 +416,6 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement /* ------------------------------------------------------------ */ /** - * @return if true, session cookie will be marked as secure only iff * HTTPS request. Can be overridden by setting SessionCookieConfig.setSecure(true), * in which case the session cookie will be marked as secure on both HTTPS and HTTP. */ @@ -1006,6 +1010,29 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement { _checkingRemoteSessionIdEncoding=remote; } + + + /* ------------------------------------------------------------ */ + /** + * Tell the HttpSessionIdListeners the id changed. + * NOTE: this method must be called LAST in subclass overrides, after the session has been updated + * with the new id. + * @see org.eclipse.jetty.server.SessionManager#renewSessionId(java.lang.String, java.lang.String, java.lang.String, java.lang.String) + */ + @Override + public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId) + { + if (!_sessionIdListeners.isEmpty()) + { + AbstractSession session = getSession(newClusterId); + HttpSessionEvent event = new HttpSessionEvent(session); + for (HttpSessionIdListener l:_sessionIdListeners) + { + l.sessionIdChanged(event, oldClusterId); + } + } + + } /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java index 2f7e1b4f5e..65bcf1f071 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java @@ -83,7 +83,7 @@ public class HashSessionManager extends AbstractSessionManager /* ------------------------------------------------------------ */ /** - * @see org.eclipse.jetty.servlet.AbstractSessionManager#doStart() + * @see AbstractSessionManager#doStart() */ @Override public void doStart() throws Exception @@ -116,7 +116,7 @@ public class HashSessionManager extends AbstractSessionManager /* ------------------------------------------------------------ */ /** - * @see org.eclipse.jetty.servlet.AbstractSessionManager#doStop() + * @see AbstractSessionManager#doStop() */ @Override public void doStop() throws Exception @@ -440,6 +440,8 @@ public class HashSessionManager extends AbstractSessionManager session.setNodeId(newNodeId); session.save(); //save updated session: TODO consider only saving file if idled sessions.put(newClusterId, session); + + super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId); } catch (Exception e) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java index bf595d15bb..75c40e8a4e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java @@ -647,6 +647,8 @@ public class JDBCSessionManager extends AbstractSessionManager LOG.warn(e); } } + + super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId); } @@ -949,7 +951,7 @@ public class JDBCSessionManager extends AbstractSessionManager /** * Insert a session into the database. * - * @param data + * @param session * @throws Exception */ protected void storeSession (Session session) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java index 0c8df262b7..44de98106a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java @@ -29,6 +29,9 @@ import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpSessionAttributeListener; +import javax.servlet.http.HttpSessionIdListener; +import javax.servlet.http.HttpSessionListener; import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.server.Request; @@ -46,6 +49,11 @@ public class SessionHandler extends ScopedHandler final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session"); public final static EnumSet<SessionTrackingMode> DEFAULT_TRACKING = EnumSet.of(SessionTrackingMode.COOKIE,SessionTrackingMode.URL); + + public static final Class[] SESSION_LISTENER_TYPES = new Class[] {HttpSessionAttributeListener.class, + HttpSessionIdListener.class, + HttpSessionListener.class}; + /* -------------------------------------------------------------- */ |