Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-server/src/main/java')
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java20
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java11
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferQueuedHttpInput.java (renamed from jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferHttpInput.java)11
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java6
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java10
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java101
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java464
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java378
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java451
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java168
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java669
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java6
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java1
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java1
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java160
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Request.java90
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java46
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Response.java61
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java2
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java28
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java5
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java6
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java80
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java12
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java6
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java29
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java6
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java8
32 files changed, 1875 insertions, 977 deletions
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
index 28fea907e5..0747b01c52 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
@@ -268,10 +268,13 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
protected void interruptAcceptors()
{
- for (Thread thread : _acceptors)
+ synchronized (this)
{
- if (thread != null)
- thread.interrupt();
+ for (Thread thread : _acceptors)
+ {
+ if (thread != null)
+ thread.interrupt();
+ }
}
}
@@ -306,9 +309,12 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
public void join(long timeout) throws InterruptedException
{
- for (Thread thread : _acceptors)
- if (thread != null)
- thread.join(timeout);
+ synchronized (this)
+ {
+ for (Thread thread : _acceptors)
+ if (thread != null)
+ thread.join(timeout);
+ }
}
protected abstract void accept(int acceptorID) throws IOException, InterruptedException;
@@ -464,7 +470,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
if (isAccepting())
LOG.warn(e);
else
- LOG.debug(e);
+ LOG.ignore(e);
}
}
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java
index 876e7aeae3..2538f41bd3 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java
@@ -28,6 +28,8 @@ import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
+import org.eclipse.jetty.server.handler.ContextHandler;
+
public class AsyncContextState implements AsyncContext
{
@@ -92,17 +94,20 @@ public class AsyncContextState implements AsyncContext
@Override
public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
- {
+ {
+ ContextHandler contextHandler = state().getContextHandler();
+ if (contextHandler != null)
+ return contextHandler.getServletContext().createListener(clazz);
try
{
return clazz.newInstance();
}
- catch(Exception e)
+ catch (Exception e)
{
throw new ServletException(e);
}
}
-
+
@Override
public void dispatch()
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferQueuedHttpInput.java
index 90d81d90e2..db5e841927 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferHttpInput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ByteBufferQueuedHttpInput.java
@@ -20,10 +20,12 @@ package org.eclipse.jetty.server;
import java.nio.ByteBuffer;
+import javax.servlet.ReadListener;
+
/**
* <p>An implementation of HttpInput using {@link ByteBuffer} as items.</p>
*/
-public class ByteBufferHttpInput extends HttpInput<ByteBuffer>
+public class ByteBufferQueuedHttpInput extends QueuedHttpInput<ByteBuffer>
{
@Override
protected int remaining(ByteBuffer item)
@@ -38,9 +40,16 @@ public class ByteBufferHttpInput extends HttpInput<ByteBuffer>
item.get(buffer, offset, l);
return l;
}
+
+ @Override
+ protected void consume(ByteBuffer item, int length)
+ {
+ item.position(item.position()+length);
+ }
@Override
protected void onContentConsumed(ByteBuffer item)
{
}
+
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java
index 7725a342eb..fd8c1c5eea 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java
@@ -49,10 +49,10 @@ public class EncodingHttpWriter extends HttpWriter
public void write (char[] s,int offset, int length) throws IOException
{
HttpOutput out = _out;
- if (length==0)
+ if (length==0 && out.isAllContentWritten())
{
- if (_out.isAllContentWritten())
- close();
+ out.close();
+ return;
}
while (length > 0)
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java
index 608a861d7f..845cbc8110 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java
@@ -44,7 +44,7 @@ import org.eclipse.jetty.server.HttpConfiguration.Customizer;
* the request came</p>
* <p>Headers can also be defined so that forwarded SSL Session IDs and Cipher
* suites may be customised</p>
- * @see http://en.wikipedia.org/wiki/X-Forwarded-For
+ * @see <a href="http://en.wikipedia.org/wiki/X-Forwarded-For">Wikipedia: X-Forwarded-For</a>
*/
public class ForwardedRequestCustomizer implements Customizer
{
@@ -66,7 +66,6 @@ public class ForwardedRequestCustomizer implements Customizer
/* ------------------------------------------------------------ */
/**
* Set a forced valued for the host header to control what is returned by {@link ServletRequest#getServerName()} and {@link ServletRequest#getServerPort()}.
- * This value is only used if {@link #isForwarded()} is true.
*
* @param hostHeader
* The value of the host header to force.
@@ -90,7 +89,6 @@ public class ForwardedRequestCustomizer implements Customizer
/**
* @param forwardedHostHeader
* The header name for forwarded hosts (default x-forwarded-host)
- * @see #setForwarded(boolean)
*/
public void setForwardedHostHeader(String forwardedHostHeader)
{
@@ -100,7 +98,6 @@ public class ForwardedRequestCustomizer implements Customizer
/* ------------------------------------------------------------ */
/**
* @return the header name for forwarded server.
- * @see #setForwarded(boolean)
*/
public String getForwardedServerHeader()
{
@@ -111,7 +108,6 @@ public class ForwardedRequestCustomizer implements Customizer
/**
* @param forwardedServerHeader
* The header name for forwarded server (default x-forwarded-server)
- * @see #setForwarded(boolean)
*/
public void setForwardedServerHeader(String forwardedServerHeader)
{
@@ -121,7 +117,6 @@ public class ForwardedRequestCustomizer implements Customizer
/* ------------------------------------------------------------ */
/**
* @return the forwarded for header
- * @see #setForwarded(boolean)
*/
public String getForwardedForHeader()
{
@@ -132,7 +127,6 @@ public class ForwardedRequestCustomizer implements Customizer
/**
* @param forwardedRemoteAddressHeader
* The header name for forwarded for (default x-forwarded-for)
- * @see #setForwarded(boolean)
*/
public void setForwardedForHeader(String forwardedRemoteAddressHeader)
{
@@ -144,7 +138,6 @@ public class ForwardedRequestCustomizer implements Customizer
* Get the forwardedProtoHeader.
*
* @return the forwardedProtoHeader (default X-Forwarded-For)
- * @see #setForwarded(boolean)
*/
public String getForwardedProtoHeader()
{
@@ -157,7 +150,6 @@ public class ForwardedRequestCustomizer implements Customizer
*
* @param forwardedProtoHeader
* the forwardedProtoHeader to set (default X-Forwarded-For)
- * @see #setForwarded(boolean)
*/
public void setForwardedProtoHeader(String forwardedProtoHeader)
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java
index 7cedf160ad..bb96ef689c 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Handler.java
@@ -56,10 +56,10 @@ public interface Handler extends LifeCycle, Destroyable
* @param target The target of the request - either a URI or a name.
* @param baseRequest The original unwrapped request object.
* @param request The request either as the {@link Request}
- * object or a wrapper of that request. The {@link AbstractHttpConnection#getCurrentHttpChannel()}
+ * object or a wrapper of that request. The {@link HttpChannel#getCurrentHttpChannel()}
* method can be used access the Request object if required.
* @param response The response as the {@link Response}
- * object or a wrapper of that request. The {@link AbstractHttpConnection#getCurrentHttpChannel()}
+ * object or a wrapper of that request. The {@link HttpChannel#getCurrentHttpChannel()}
* method can be used access the Response object if required.
* @throws IOException
* @throws ServletException
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
index de338c13db..932135d2a2 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
+import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@@ -43,7 +44,8 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
-import org.eclipse.jetty.server.HttpChannelState.Next;
+import org.eclipse.jetty.server.HttpChannelState.Action;
+import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ErrorHandler;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.Callback;
@@ -105,6 +107,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
_uri = new HttpURI(URIUtil.__CHARSET);
_state = new HttpChannelState(this);
+ input.init(_state);
_request = new Request(this, input);
_response = new Response(this, new HttpOutput(this));
}
@@ -250,36 +253,69 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
// The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
- HttpChannelState.Next next = _state.handling();
- while (next==Next.CONTINUE && getServer().isRunning())
+ HttpChannelState.Action action = _state.handling();
+ loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning())
{
try
{
- _request.setHandled(false);
- _response.getHttpOutput().reopen();
-
- if (_state.isInitial())
+ LOG.debug("{} action {}",this,action);
+
+ switch(action)
{
- _request.setTimeStamp(System.currentTimeMillis());
- _request.setDispatcherType(DispatcherType.REQUEST);
-
- for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
- customizer.customize(getConnector(),_configuration,_request);
- getServer().handle(this);
- }
- else
- {
- if (_request.getHttpChannelState().isExpired())
- {
+ case REQUEST_DISPATCH:
+ _request.setHandled(false);
+ _response.getHttpOutput().reopen();
+ _request.setTimeStamp(System.currentTimeMillis());
+ _request.setDispatcherType(DispatcherType.REQUEST);
+
+ for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
+ customizer.customize(getConnector(),_configuration,_request);
+ getServer().handle(this);
+ break;
+
+ case ASYNC_DISPATCH:
+ _request.setHandled(false);
+ _response.getHttpOutput().reopen();
+ _request.setDispatcherType(DispatcherType.ASYNC);
+ getServer().handleAsync(this);
+ break;
+
+ case ASYNC_EXPIRED:
+ _request.setHandled(false);
+ _response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR);
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500));
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE,"Async Timeout");
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI());
_response.setStatusWithReason(500,"Async Timeout");
+
+ getServer().handleAsync(this);
+ break;
+
+ case READ_CALLBACK:
+ {
+ ContextHandler handler=_state.getContextHandler();
+ if (handler!=null)
+ handler.handle(_request.getHttpInput());
+ else
+ _request.getHttpInput().run();
+ break;
}
- else
- _request.setDispatcherType(DispatcherType.ASYNC);
- getServer().handleAsync(this);
+
+ case WRITE_CALLBACK:
+ {
+ ContextHandler handler=_state.getContextHandler();
+
+ if (handler!=null)
+ handler.handle(_response.getHttpOutput());
+ else
+ _response.getHttpOutput().run();
+ break;
+ }
+
+ default:
+ break loop;
+
}
}
catch (Error e)
@@ -301,7 +337,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
}
finally
{
- next = _state.unhandle();
+ action = _state.unhandle();
}
}
@@ -309,7 +345,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
Thread.currentThread().setName(threadName);
setCurrentHttpChannel(null);
- if (next==Next.COMPLETE)
+ if (action==Action.COMPLETE)
{
try
{
@@ -327,7 +363,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
}
catch(Exception e)
{
- LOG.warn(e);
+ LOG.warn("handle complete",e);
}
finally
{
@@ -336,9 +372,9 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
}
}
- LOG.debug("{} handle exit, result {}", this, next);
+ LOG.debug("{} handle exit, result {}", this, action);
- return next!=Next.WAIT;
+ return action!=Action.WAIT;
}
/**
@@ -576,7 +612,8 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
@Override
public boolean messageComplete()
{
- _request.getHttpInput().shutdown();
+ LOG.debug("{} messageComplete", this);
+ _request.getHttpInput().messageComplete();
return true;
}
@@ -594,17 +631,19 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
try
{
- if (_state.handling()==Next.CONTINUE)
+ if (_state.handling()==Action.REQUEST_DISPATCH)
sendResponse(new ResponseInfo(HttpVersion.HTTP_1_1,new HttpFields(),0,status,reason,false),null,true);
}
catch (IOException e)
{
- LOG.warn(e);
+ LOG.debug(e);
}
finally
{
- if (_state.unhandle()==Next.COMPLETE)
+ if (_state.unhandle()==Action.COMPLETE)
_state.completed();
+ else
+ throw new IllegalStateException();
}
}
@@ -727,7 +766,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
}
else
{
- LOG.warn(x);
+ LOG.warn("Commit failed",x);
_transport.send(HttpGenerator.RESPONSE_500_INFO,null,true,new Callback()
{
@Override
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
index 127bafa3c1..470acc0a02 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
@@ -21,7 +21,6 @@ package org.eclipse.jetty.server;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher;
@@ -62,32 +61,41 @@ public class HttpChannelState
{
IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet
- ASYNCSTARTED, // Suspend called, but not yet returned to container
- REDISPATCHING, // resumed while dispatched
ASYNCWAIT, // Suspended and parked
- REDISPATCH, // Has been scheduled
- REDISPATCHED, // Request redispatched to filter/servlet
- COMPLETECALLED,// complete called
+ ASYNCIO, // Has been dispatched for async IO
COMPLETING, // Request is completable
COMPLETED // Request is complete
}
- public enum Next
+ public enum Action
+ {
+ REQUEST_DISPATCH, // handle a normal request dispatch
+ ASYNC_DISPATCH, // handle an async request dispatch
+ ASYNC_EXPIRED, // handle an async timeout
+ WRITE_CALLBACK, // handle an IO write callback
+ READ_CALLBACK, // handle an IO read callback
+ WAIT, // Wait for further events
+ COMPLETE // Complete the channel
+ }
+
+ public enum Async
{
- CONTINUE, // Continue handling the channel
- WAIT, // Wait for further events
- COMPLETE // Complete the channel
+ STARTED,
+ DISPATCH,
+ COMPLETE,
+ EXPIRING,
+ EXPIRED
}
+ private final boolean DEBUG=LOG.isDebugEnabled();
private final HttpChannel<?> _channel;
- private List<AsyncListener> _lastAsyncListeners;
- private List<AsyncListener> _asyncListeners;
+ private List<AsyncListener> _asyncListeners;
private State _state;
+ private Async _async;
private boolean _initial;
- private boolean _dispatched;
- private boolean _expired;
- private volatile boolean _responseWrapped;
+ private boolean _asyncRead;
+ private boolean _asyncWrite;
private long _timeoutMs=DEFAULT_TIMEOUT;
private AsyncContextEvent _event;
@@ -95,6 +103,7 @@ public class HttpChannelState
{
_channel=channel;
_state=State.IDLE;
+ _async=null;
_initial=true;
}
@@ -145,7 +154,7 @@ public class HttpChannelState
{
synchronized (this)
{
- return super.toString()+"@"+getStatusString();
+ return String.format("%s@%x{s=%s i=%b a=%s}",getClass().getSimpleName(),hashCode(),_state,_initial,_async);
}
}
@@ -153,97 +162,102 @@ public class HttpChannelState
{
synchronized (this)
{
- return _state+
- (_initial?",initial":"")+
- (_dispatched?",resumed":"")+
- (_expired?",expired":"");
+ return String.format("s=%s i=%b a=%s",_state,_initial,_async);
}
}
/**
* @return Next handling of the request should proceed
*/
- protected Next handling()
+ protected Action handling()
{
synchronized (this)
{
+ if(DEBUG)
+ LOG.debug("{} handling {}",this,_state);
switch(_state)
{
case IDLE:
_initial=true;
_state=State.DISPATCHED;
- if (_lastAsyncListeners!=null)
- _lastAsyncListeners.clear();
- if (_asyncListeners!=null)
- _asyncListeners.clear();
- else
- {
- _asyncListeners=_lastAsyncListeners;
- _lastAsyncListeners=null;
- }
- break;
-
- case COMPLETECALLED:
- _state=State.COMPLETING;
- return Next.COMPLETE;
+ return Action.REQUEST_DISPATCH;
case COMPLETING:
- return Next.COMPLETE;
-
- case ASYNCWAIT:
- return Next.WAIT;
+ return Action.COMPLETE;
case COMPLETED:
- return Next.WAIT;
+ return Action.WAIT;
- case REDISPATCH:
- _state=State.REDISPATCHED;
- break;
+ case ASYNCWAIT:
+ if (_asyncRead)
+ {
+ _state=State.ASYNCIO;
+ _asyncRead=false;
+ return Action.READ_CALLBACK;
+ }
+ if (_asyncWrite)
+ {
+ _state=State.ASYNCIO;
+ _asyncWrite=false;
+ return Action.WRITE_CALLBACK;
+ }
+
+ if (_async!=null)
+ {
+ Async async=_async;
+ switch(async)
+ {
+ case COMPLETE:
+ _state=State.COMPLETING;
+ return Action.COMPLETE;
+ case DISPATCH:
+ _state=State.DISPATCHED;
+ _async=null;
+ return Action.ASYNC_DISPATCH;
+ case EXPIRING:
+ break;
+ case EXPIRED:
+ _state=State.DISPATCHED;
+ _async=null;
+ return Action.ASYNC_EXPIRED;
+ case STARTED:
+ if (DEBUG)
+ LOG.debug("TODO Fix this double dispatch",new IllegalStateException(this
+ .getStatusString()));
+ return Action.WAIT;
+ }
+ }
+
+ return Action.WAIT;
default:
throw new IllegalStateException(this.getStatusString());
}
-
- _responseWrapped=false;
- return Next.CONTINUE;
-
}
}
-
public void startAsync(AsyncContextEvent event)
{
+ final List<AsyncListener> lastAsyncListeners;
+
synchronized (this)
{
- switch(_state)
- {
- case DISPATCHED:
- case REDISPATCHED:
- _dispatched=false;
- _expired=false;
- _responseWrapped=event.getSuppliedResponse()!=_channel.getResponse();
- _responseWrapped=false;
- _event=event;
- _state=State.ASYNCSTARTED;
- List<AsyncListener> listeners=_lastAsyncListeners;
- _lastAsyncListeners=_asyncListeners;
- if (listeners!=null)
- listeners.clear();
- _asyncListeners=listeners;
- break;
-
- default:
- throw new IllegalStateException(this.getStatusString());
- }
+ if (_state!=State.DISPATCHED || _async!=null)
+ throw new IllegalStateException(this.getStatusString());
+
+ _async=Async.STARTED;
+ _event=event;
+ lastAsyncListeners=_asyncListeners;
+ _asyncListeners=null;
}
- if (_lastAsyncListeners!=null)
+ if (lastAsyncListeners!=null)
{
- for (AsyncListener listener : _lastAsyncListeners)
+ for (AsyncListener listener : lastAsyncListeners)
{
try
{
- listener.onStartAsync(_event);
+ listener.onStartAsync(event);
}
catch(Exception e)
{
@@ -269,39 +283,63 @@ public class HttpChannelState
* @return next actions
* be handled again (eg because of a resume that happened before unhandle was called)
*/
- protected Next unhandle()
+ protected Action unhandle()
{
synchronized (this)
{
+ if(DEBUG)
+ LOG.debug("{} unhandle {}",this,_state);
+
switch(_state)
{
- case REDISPATCHED:
case DISPATCHED:
- _state=State.COMPLETING;
- return Next.COMPLETE;
-
- case IDLE:
+ case ASYNCIO:
+ break;
+ default:
throw new IllegalStateException(this.getStatusString());
+ }
- case ASYNCSTARTED:
- _initial=false;
- _state=State.ASYNCWAIT;
- scheduleTimeout();
- return Next.WAIT;
-
- case REDISPATCHING:
- _initial=false;
- _state=State.REDISPATCHED;
- return Next.CONTINUE;
-
- case COMPLETECALLED:
- _initial=false;
- _state=State.COMPLETING;
- return Next.COMPLETE;
+ if (_asyncRead)
+ {
+ _state=State.ASYNCIO;
+ _asyncRead=false;
+ return Action.READ_CALLBACK;
+ }
+
+ if (_asyncWrite)
+ {
+ _asyncWrite=false;
+ _state=State.ASYNCIO;
+ return Action.WRITE_CALLBACK;
+ }
- default:
- throw new IllegalStateException(this.getStatusString());
+ if (_async!=null)
+ {
+ _initial=false;
+ switch(_async)
+ {
+ case COMPLETE:
+ _state=State.COMPLETING;
+ _async=null;
+ return Action.COMPLETE;
+ case DISPATCH:
+ _state=State.DISPATCHED;
+ _async=null;
+ return Action.ASYNC_DISPATCH;
+ case EXPIRED:
+ _state=State.DISPATCHED;
+ _async=null;
+ return Action.ASYNC_EXPIRED;
+ case EXPIRING:
+ case STARTED:
+ scheduleTimeout();
+ _state=State.ASYNCWAIT;
+ return Action.WAIT;
+ }
}
+
+ _state=State.COMPLETING;
+ return Action.COMPLETE;
}
}
@@ -310,39 +348,26 @@ public class HttpChannelState
boolean dispatch;
synchronized (this)
{
+ if (_async!=Async.STARTED && _async!=Async.EXPIRING)
+ throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString());
+ _async=Async.DISPATCH;
+ _event.setDispatchTarget(context,path);
+
switch(_state)
{
- case ASYNCSTARTED:
- _state=State.REDISPATCHING;
- _event.setDispatchTarget(context,path);
- _dispatched=true;
- return;
-
- case ASYNCWAIT:
- dispatch=!_expired;
- _state=State.REDISPATCH;
- _event.setDispatchTarget(context,path);
- _dispatched=true;
+ case DISPATCHED:
+ case ASYNCIO:
+ dispatch=false;
break;
-
default:
- throw new IllegalStateException(this.getStatusString());
+ dispatch=true;
+ break;
}
}
+ cancelTimeout();
if (dispatch)
- {
- cancelTimeout();
scheduleDispatch();
- }
- }
-
- public boolean isDispatched()
- {
- synchronized (this)
- {
- return _dispatched;
- }
}
protected void expired()
@@ -351,17 +376,11 @@ public class HttpChannelState
AsyncEvent event;
synchronized (this)
{
- switch(_state)
- {
- case ASYNCSTARTED:
- case ASYNCWAIT:
- _expired=true;
- event=_event;
- aListeners=_asyncListeners;
- break;
- default:
- return;
- }
+ if (_async!=Async.STARTED)
+ return;
+ _async=Async.EXPIRING;
+ event=_event;
+ aListeners=_asyncListeners;
}
if (aListeners!=null)
@@ -378,22 +397,20 @@ public class HttpChannelState
}
}
}
-
+
+ boolean dispatch=false;
synchronized (this)
{
- switch(_state)
+ if (_async==Async.EXPIRING)
{
- case ASYNCSTARTED:
- case ASYNCWAIT:
- _state=State.REDISPATCH;
- break;
- default:
- _expired=false;
- break;
+ _async=Async.EXPIRED;
+ if (_state==State.ASYNCWAIT)
+ dispatch=true;
}
}
- scheduleDispatch();
+ if (dispatch)
+ scheduleDispatch();
}
public void complete()
@@ -402,30 +419,15 @@ public class HttpChannelState
boolean handle;
synchronized (this)
{
- switch(_state)
- {
- case DISPATCHED:
- case REDISPATCHED:
- throw new IllegalStateException(this.getStatusString());
-
- case IDLE:
- case ASYNCSTARTED:
- _state=State.COMPLETECALLED;
- return;
-
- case ASYNCWAIT:
- _state=State.COMPLETECALLED;
- handle=!_expired;
- break;
-
- default:
- throw new IllegalStateException(this.getStatusString());
- }
+ if (_async!=Async.STARTED && _async!=Async.EXPIRING)
+ throw new IllegalStateException(this.getStatusString());
+ _async=Async.COMPLETE;
+ handle=_state==State.ASYNCWAIT;
}
+ cancelTimeout();
if (handle)
{
- cancelTimeout();
ContextHandler handler=getContextHandler();
if (handler!=null)
handler.handle(_channel);
@@ -453,30 +455,34 @@ public class HttpChannelState
}
}
- if (aListeners!=null)
+ if (event!=null)
{
- for (AsyncListener listener : aListeners)
+ if (aListeners!=null)
{
- try
+ if (event.getThrowable()!=null)
{
- if (event!=null && event.getThrowable()!=null)
- {
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
- event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
- listener.onError(event);
- }
- else
- listener.onComplete(event);
+ event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
+ event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
}
- catch(Exception e)
+
+ for (AsyncListener listener : aListeners)
{
- LOG.warn(e);
+ try
+ {
+ if (event.getThrowable()!=null)
+ listener.onError(event);
+ else
+ listener.onComplete(event);
+ }
+ catch(Exception e)
+ {
+ LOG.warn(e);
+ }
}
}
- }
- if (event!=null)
event.completed();
+ }
}
protected void recycle()
@@ -486,17 +492,19 @@ public class HttpChannelState
switch(_state)
{
case DISPATCHED:
- case REDISPATCHED:
+ case ASYNCIO:
throw new IllegalStateException(getStatusString());
default:
- _state=State.IDLE;
+ break;
}
- _initial = true;
- _dispatched=false;
- _expired=false;
- _responseWrapped=false;
- cancelTimeout();
+ _asyncListeners=null;
+ _state=State.IDLE;
+ _async=null;
+ _initial=true;
+ _asyncRead=false;
+ _asyncWrite=false;
_timeoutMs=DEFAULT_TIMEOUT;
+ cancelTimeout();
_event=null;
}
}
@@ -515,7 +523,11 @@ public class HttpChannelState
protected void cancelTimeout()
{
- AsyncContextEvent event=_event;
+ final AsyncContextEvent event;
+ synchronized (this)
+ {
+ event=_event;
+ }
if (event!=null)
event.cancelTimeoutTask();
}
@@ -524,7 +536,7 @@ public class HttpChannelState
{
synchronized (this)
{
- return _expired;
+ return _async==Async.EXPIRED;
}
}
@@ -540,17 +552,7 @@ public class HttpChannelState
{
synchronized(this)
{
- switch(_state)
- {
- case ASYNCSTARTED:
- case REDISPATCHING:
- case COMPLETECALLED:
- case ASYNCWAIT:
- return true;
-
- default:
- return false;
- }
+ return _state==State.ASYNCWAIT || _state==State.DISPATCHED && _async==Async.STARTED;
}
}
@@ -573,18 +575,10 @@ public class HttpChannelState
public boolean isAsyncStarted()
{
synchronized (this)
- {
- switch(_state)
- {
- case ASYNCSTARTED: // Suspend called, but not yet returned to container
- case REDISPATCHING: // resumed while dispatched
- case COMPLETECALLED: // complete called
- case ASYNCWAIT:
- return true;
-
- default:
- return false;
- }
+ {
+ if (_state==State.DISPATCHED)
+ return _async!=null;
+ return _async==Async.STARTED || _async==Async.EXPIRING;
}
}
@@ -592,19 +586,7 @@ public class HttpChannelState
{
synchronized (this)
{
- switch(_state)
- {
- case ASYNCSTARTED:
- case REDISPATCHING:
- case ASYNCWAIT:
- case REDISPATCHED:
- case REDISPATCH:
- case COMPLETECALLED:
- return true;
-
- default:
- return false;
- }
+ return !_initial || _async!=null;
}
}
@@ -620,7 +602,12 @@ public class HttpChannelState
public ContextHandler getContextHandler()
{
- final AsyncContextEvent event=_event;
+ final AsyncContextEvent event;
+ synchronized (this)
+ {
+ event=_event;
+ }
+
if (event!=null)
{
Context context=((Context)event.getServletContext());
@@ -632,8 +619,13 @@ public class HttpChannelState
public ServletResponse getServletResponse()
{
- if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
- return _event.getSuppliedResponse();
+ final AsyncContextEvent event;
+ synchronized (this)
+ {
+ event=_event;
+ }
+ if (event!=null && event.getSuppliedResponse()!=null)
+ return event.getSuppliedResponse();
return _channel.getResponse();
}
@@ -652,6 +644,34 @@ public class HttpChannelState
_channel.getRequest().setAttribute(name,attribute);
}
+ public void onReadPossible()
+ {
+ boolean handle;
+
+ synchronized (this)
+ {
+ _asyncRead=true;
+ handle=_state==State.ASYNCWAIT;
+ }
+
+ if (handle)
+ _channel.execute(_channel);
+ }
+
+ public void onWritePossible()
+ {
+ boolean handle;
+
+ synchronized (this)
+ {
+ _asyncWrite=true;
+ handle=_state==State.ASYNCWAIT;
+ }
+
+ if (handle)
+ _channel.execute(_channel);
+ }
+
public class AsyncTimeout implements Runnable
{
@Override
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
index d56fdd3ddc..1518b75894 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java
@@ -18,9 +18,7 @@
package org.eclipse.jetty.server;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.http.HttpGenerator;
@@ -39,7 +37,7 @@ import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -63,7 +61,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpParser _parser;
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
- private BlockingCallback _readBlocker = new BlockingCallback();
private BlockingCallback _writeBlocker = new BlockingCallback();
@@ -92,9 +89,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
- _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
+ _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new HttpInputOverHTTP(this));
_parser = newHttpParser();
-
LOG.debug("New HTTP Connection {}", this);
}
@@ -123,40 +119,11 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return _channel;
}
- public void reset()
+ public HttpParser getParser()
{
- // If we are still expecting
- if (_channel.isExpecting100Continue())
- {
- // reset to avoid seeking remaining content
- _parser.reset();
- // close to seek EOF
- _parser.close();
- if (getEndPoint().isOpen())
- fillInterested();
- }
- // else if we are persistent
- else if (_generator.isPersistent())
- // reset to seek next request
- _parser.reset();
- else
- {
- // else seek EOF
- _parser.close();
- if (getEndPoint().isOpen())
- fillInterested();
- }
-
- _generator.reset();
- _channel.reset();
-
- releaseRequestBuffer();
- if (_chunk!=null)
- {
- _bufferPool.release(_chunk);
- _chunk=null;
- }
+ return _parser;
}
+
@Override
@@ -171,16 +138,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return getHttpChannel().getRequests();
}
- @Override
- public String toString()
- {
- return String.format("%s,g=%s,p=%s",
- super.toString(),
- _generator,
- _parser);
- }
-
- private void releaseRequestBuffer()
+ void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
{
@@ -189,6 +147,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_bufferPool.release(buffer);
}
}
+
+ public ByteBuffer getRequestBuffer()
+ {
+ if (_requestBuffer == null)
+ _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
+ return _requestBuffer;
+ }
/**
* <p>Parses and handles HTTP messages.</p>
@@ -204,77 +169,53 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.debug("{} onFillable {}", this, _channel.getState());
setCurrentConnection(this);
+ int filled=Integer.MAX_VALUE;
+ boolean suspended=false;
try
{
- while (true)
+ // while not suspended and not upgraded
+ while (!suspended && getEndPoint().getConnection()==this)
{
- // Can the parser progress (even with an empty buffer)
- boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
-
- // Parse the buffer
- if (call_channel)
+ // Do we need some data to parse
+ if (BufferUtil.isEmpty(_requestBuffer))
{
- // Parse as much content as there is available before calling the channel
- // this is both efficient (may queue many chunks), will correctly set available for 100 continues
- // and will drive the parser to completion if all content is available.
- while (_parser.inContentState())
+ // If the previous iteration filled 0 bytes or saw a close, then break here
+ if (filled<=0)
+ break;
+
+ // Can we fill?
+ if(getEndPoint().isInputShutdown())
{
- if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
- break;
+ // No pretend we read -1
+ filled=-1;
+ _parser.atEOF();
}
+ else
+ {
+ // Get a buffer
+ if (_requestBuffer == null)
+ _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
- // The parser returned true, which indicates the channel is ready to handle a request.
- // Call the channel and this will either handle the request/response to completion OR,
- // if the request suspends, the request/response will be incomplete so the outer loop will exit.
- boolean handle=_channel.handle();
-
- // Return if suspended or upgraded
- if (!handle || getEndPoint().getConnection()!=this)
- return;
- }
- else if (BufferUtil.isEmpty(_requestBuffer))
- {
- if (_requestBuffer == null)
- _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
-
- int filled = getEndPoint().fill(_requestBuffer);
- if (filled==0) // Do a retry on fill 0 (optimisation for SSL connections)
+ // fill
filled = getEndPoint().fill(_requestBuffer);
-
- LOG.debug("{} filled {}", this, filled);
-
- // If we failed to fill
- if (filled == 0)
- {
- // Somebody wanted to read, we didn't so schedule another attempt
- releaseRequestBuffer();
- fillInterested();
- return;
- }
- else if (filled < 0)
- {
- _parser.shutdownInput();
- // We were only filling if fully consumed, so if we have
- // read -1 then we have nothing to parse and thus nothing that
- // will generate a response. If we had a suspended request pending
- // a response or a request waiting in the buffer, we would not be here.
- if (getEndPoint().isOutputShutdown())
- getEndPoint().close();
- else
- getEndPoint().shutdownOutput();
- // buffer must be empty and the channel must be idle, so we can release.
- releaseRequestBuffer();
- return;
+ if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
+ filled = getEndPoint().fill(_requestBuffer);
+
+ // tell parser
+ if (filled < 0)
+ _parser.atEOF();
}
}
- else
+
+ // Parse the buffer
+ if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
{
- // TODO work out how we can get here and a better way to handle it
- LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest());
- if (!_channel.getState().isSuspended())
- getEndPoint().close();
- return;
+ // The parser returned true, which indicates the channel is ready to handle a request.
+ // Call the channel and this will either handle the request/response to completion OR,
+ // if the request suspends, the request/response will be incomplete so the outer loop will exit.
+ suspended = !_channel.handle();
}
+
}
}
catch (EofException e)
@@ -290,10 +231,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
close();
}
finally
- {
+ {
setCurrentConnection(null);
+ if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
+ {
+ fillInterested();
+ }
}
}
+
+
+ @Override
+ protected void onFillInterestedFailed(Throwable cause)
+ {
+ _parser.close();
+ super.onFillInterestedFailed(cause);
+ }
@Override
public void onOpen()
@@ -308,32 +261,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
onFillable();
}
- @Override
- public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
- {
- try
- {
- if (info==null)
- new ContentCallback(content,lastContent,_writeBlocker).iterate();
- else
- {
- // If we are still expecting a 100 continues
- if (_channel.isExpecting100Continue())
- // then we can't be persistent
- _generator.setPersistent(false);
- new CommitCallback(info,content,lastContent,_writeBlocker).iterate();
- }
- _writeBlocker.block();
- }
- catch (ClosedChannelException e)
- {
- throw new EofException(e);
- }
- catch (IOException e)
- {
- throw e;
- }
- }
@Override
public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
@@ -359,11 +286,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public void completed()
{
- // Finish consuming the request
- if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
- // Complete reading the request
- _channel.getRequest().getHttpInput().consumeAll();
-
// Handle connection upgrades
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
@@ -374,34 +296,58 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
onClose();
getEndPoint().setConnection(connection);
connection.onOpen();
- reset();
+ _channel.reset();
+ _parser.reset();
+ _generator.reset();
+ releaseRequestBuffer();
return;
}
}
+
+ // Finish consuming the request
+ // If we are still expecting
+ if (_channel.isExpecting100Continue())
+ // close to seek EOF
+ _parser.close();
+ else if (_parser.inContentState() && _generator.isPersistent())
+ // Complete reading the request
+ _channel.getRequest().getHttpInput().consumeAll();
- reset();
+ // Reset the channel, parsers and generator
+ _channel.reset();
+ if (_generator.isPersistent() && !_parser.isClosed())
+ _parser.reset();
+ else
+ _parser.close();
+ releaseRequestBuffer();
+ if (_chunk!=null)
+ _bufferPool.release(_chunk);
+ _chunk=null;
+ _generator.reset();
// if we are not called from the onfillable thread, schedule completion
if (getCurrentConnection()!=this)
{
+ // If we are looking for the next request
if (_parser.isStart())
{
- // it wants to eat more
+ // if the buffer is empty
if (_requestBuffer == null)
{
+ // look for more data
fillInterested();
}
- else if (getConnector().isStarted())
+ // else if we are still running
+ else if (getConnector().isRunning())
{
- LOG.debug("{} pipelined", this);
-
+ // Dispatched to handle a pipelined request
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
- if (getConnector().isStarted())
+ if (getConnector().isRunning())
LOG.warn(e);
else
LOG.ignore(e);
@@ -413,139 +359,34 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
getEndPoint().close();
}
}
+ // else the parser must be closed, so seek the EOF if we are still open
+ else if (getEndPoint().isOpen())
+ fillInterested();
}
}
- public ByteBuffer getRequestBuffer()
- {
- return _requestBuffer;
- }
-
- private class Input extends ByteBufferHttpInput
+ private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
{
- @Override
- protected void blockForContent() throws IOException
- {
- /* We extend the blockForContent method to replace the
- default implementation of a blocking queue with an implementation
- that uses the calling thread to block on a readable callback and
- then to do the parsing before before attempting the read.
- */
- while (!_parser.isComplete())
- {
- // Can the parser progress (even with an empty buffer)
- boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
-
- // If there is more content to parse, loop so we can queue all content from this buffer now without the
- // need to call blockForContent again
- while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
- event=_parser.parseNext(_requestBuffer);
-
- // If we have content, return
- if (_parser.isComplete() || available()>0)
- return;
-
- // Do we have content ready to parse?
- if (BufferUtil.isEmpty(_requestBuffer))
- {
- // If no more input
- if (getEndPoint().isInputShutdown())
- {
- _parser.shutdownInput();
- shutdown();
- return;
- }
-
- // Wait until we can read
- block(_readBlocker);
- LOG.debug("{} block readable on {}",this,_readBlocker);
- _readBlocker.block();
-
- // We will need a buffer to read into
- if (_requestBuffer==null)
- {
- long content_length=_channel.getRequest().getContentLength();
- int size=getInputBufferSize();
- if (size<content_length)
- size=size*4; // TODO tune this
- _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
- }
-
- // read some data
- int filled=getEndPoint().fill(_requestBuffer);
- LOG.debug("{} block filled {}",this,filled);
- if (filled<0)
- {
- _parser.shutdownInput();
- return;
- }
- }
- }
- }
-
- @Override
- protected void onContentQueued(ByteBuffer ref)
- {
- /* This callback could be used to tell the connection
- * that the request did contain content and thus the request
- * buffer needs to be held until a call to #onAllContentConsumed
- *
- * However it turns out that nothing is needed here because either a
- * request will have content, in which case the request buffer will be
- * released by a call to onAllContentConsumed; or it will not have content.
- * If it does not have content, either it will complete quickly and the
- * buffers will be released in completed() or it will be suspended and
- * onReadable() contains explicit handling to release if it is suspended.
- *
- * We extend this method anyway, to turn off the notify done by the
- * default implementation as this is not needed by our implementation
- * of blockForContent
- */
- }
-
- @Override
- public void earlyEOF()
- {
- synchronized (lock())
- {
- _inputEOF=true;
- _earlyEOF = true;
- LOG.debug("{} early EOF", this);
- }
- }
-
- @Override
- public void shutdown()
+ public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
{
- synchronized (lock())
- {
- _inputEOF=true;
- LOG.debug("{} shutdown", this);
- }
+ super(connector,config,endPoint,transport,input);
}
@Override
- protected void onAllContentConsumed()
+ public void earlyEOF()
{
- /* This callback tells the connection that all content that has
- * been parsed has been consumed. Thus the request buffer may be
- * released if it is empty.
- */
- releaseRequestBuffer();
+ // If we have no request yet, just close
+ if (getRequest().getMethod()==null)
+ close();
+ else
+ super.earlyEOF();
}
@Override
- public String toString()
+ public boolean content(ByteBuffer item)
{
- return super.toString()+"{"+_channel+","+HttpConnection.this+"}";
- }
- }
-
- private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
- {
- public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
- {
- super(connector,config,endPoint,transport,input);
+ super.content(item);
+ return true;
}
@Override
@@ -612,7 +453,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
- private class CommitCallback extends IteratingCallback
+ private class CommitCallback extends IteratingNestedCallback
{
final ByteBuffer _content;
final boolean _lastContent;
@@ -733,7 +574,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
- private class ContentCallback extends IteratingCallback
+ private class ContentCallback extends IteratingNestedCallback
{
final ByteBuffer _content;
final boolean _lastContent;
@@ -814,4 +655,5 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
}
+
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
index bbed8bd2fa..a9dba922ad 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java
@@ -19,13 +19,11 @@
package org.eclipse.jetty.server;
import java.io.IOException;
-import java.io.InterruptedIOException;
-
+import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
-import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@@ -39,35 +37,82 @@ import org.eclipse.jetty.util.log.Logger;
* {@link #onContentConsumed(T)} and {@link #onAllContentConsumed()} that can be implemented so that the
* caller will know when buffers are queued and consumed.</p>
*/
-public abstract class HttpInput<T> extends ServletInputStream
+/**
+ * @author gregw
+ *
+ * @param <T>
+ */
+/**
+ * @author gregw
+ *
+ * @param <T>
+ */
+public abstract class HttpInput<T> extends ServletInputStream implements Runnable
{
private final static Logger LOG = Log.getLogger(HttpInput.class);
- private final ArrayQueue<T> _inputQ = new ArrayQueue<>();
- protected boolean _earlyEOF;
- protected boolean _inputEOF;
- public Object lock()
+ private HttpChannelState _channelState;
+ private Throwable _onError;
+ private ReadListener _listener;
+ private boolean _notReady;
+
+ protected State _state = BLOCKING;
+ private State _eof=null;
+ private final Object _lock;
+
+ protected HttpInput()
{
- return _inputQ.lock();
+ this(null);
+ }
+
+ protected HttpInput(Object lock)
+ {
+ _lock=lock==null?this:lock;
+ }
+
+ public final Object lock()
+ {
+ return _lock;
}
public void recycle()
{
synchronized (lock())
{
- T item = _inputQ.peekUnsafe();
- while (item != null)
- {
- _inputQ.pollUnsafe();
- onContentConsumed(item);
+ _state = BLOCKING;
+ _eof=null;
+ _onError=null;
+ }
+ }
- item = _inputQ.peekUnsafe();
- if (item == null)
- onAllContentConsumed();
- }
- _inputEOF = false;
- _earlyEOF = false;
+ /**
+ * Access the next content to be consumed from. Returning the next item does not consume it
+ * and it may be returned multiple times until it is consumed. Calls to {@link #get(Object, byte[], int, int)}
+ * or {@link #consume(Object, int)} are required to consume data from the content.
+ * @return Content or null if none available.
+ * @throws IOException
+ */
+ protected abstract T nextContent() throws IOException;
+
+ /**
+ * A convenience method to call nextContent and to check the return value, which if null then the
+ * a check is made for EOF and the state changed accordingly.
+ * @see #nextContent()
+ * @return Content or null if none available.
+ * @throws IOException
+ */
+ protected T getNextContent() throws IOException
+ {
+ T content=nextContent();
+
+ if (content==null && _eof!=null)
+ {
+ LOG.debug("{} eof {}",this,_eof);
+ _state=_eof;
+ _eof=null;
}
+
+ return content;
}
@Override
@@ -81,12 +126,17 @@ public abstract class HttpInput<T> extends ServletInputStream
@Override
public int available()
{
- synchronized (lock())
+ try
{
- T item = _inputQ.peekUnsafe();
- if (item == null)
- return 0;
- return remaining(item);
+ synchronized (lock())
+ {
+ T item = getNextContent();
+ return item==null?0:remaining(item);
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeIOException(e);
}
}
@@ -96,200 +146,299 @@ public abstract class HttpInput<T> extends ServletInputStream
T item = null;
synchronized (lock())
{
+ // System.err.printf("read s=%s q=%d e=%s%n",_state,_inputQ.size(),_eof);
+
// Get the current head of the input Q
- item = _inputQ.peekUnsafe();
-
- // Skip empty items at the head of the queue
- while (item != null && remaining(item) == 0)
- {
- _inputQ.pollUnsafe();
- onContentConsumed(item);
- LOG.debug("{} consumed {}", this, item);
- item = _inputQ.peekUnsafe();
-
- // If that was the last item then notify
- if (item==null)
- onAllContentConsumed();
- }
+ item = getNextContent();
// If we have no item
if (item == null)
{
- // Was it unexpectedly EOF'd?
- if (isEarlyEOF())
- throw new EofException();
-
- // check for EOF
- if (isShutdown())
- {
- onEOF();
- return -1;
- }
-
- // OK then block for some more content
- blockForContent();
-
- // If still not content, we must be closed
- item = _inputQ.peekUnsafe();
+ _state.waitForContent(this);
+ item=getNextContent();
if (item==null)
- {
- if (isEarlyEOF())
- throw new EofException();
-
- // blockForContent will only return with no
- // content if it is closed.
- if (!isShutdown())
- LOG.warn("Unexpected !EOF: "+this);
-
- onEOF();
- return -1;
- }
+ return _state.noContent();
}
}
+
return get(item, b, off, len);
}
protected abstract int remaining(T item);
protected abstract int get(T item, byte[] buffer, int offset, int length);
- protected abstract void onContentConsumed(T item);
+ protected abstract void consume(T item, int length);
+
+ protected abstract void blockForContent() throws IOException;
- protected void blockForContent() throws IOException
+ protected boolean onAsyncRead()
{
- synchronized (lock())
- {
- while (_inputQ.isEmpty() && !isShutdown() && !isEarlyEOF())
- {
- try
- {
- LOG.debug("{} waiting for content", this);
- lock().wait();
- }
- catch (InterruptedException e)
- {
- throw (IOException)new InterruptedIOException().initCause(e);
- }
- }
- }
+ if (_listener==null)
+ return false;
+ _channelState.onReadPossible();
+ return true;
}
/* ------------------------------------------------------------ */
- /** Called by this HttpInput to signal new content has been queued
+ /** Add some content to the input stream
* @param item
*/
- protected void onContentQueued(T item)
- {
- lock().notify();
- }
+ public abstract void content(T item);
+
/* ------------------------------------------------------------ */
- /** Called by this HttpInput to signal all available content has been consumed
+ /** This method should be called to signal to the HttpInput
+ * that an EOF has arrived before all the expected content.
+ * Typically this will result in an EOFException being thrown
+ * from a subsequent read rather than a -1 return.
*/
- protected void onAllContentConsumed()
+ public void earlyEOF()
{
+ synchronized (lock())
+ {
+ if (_eof==null || !_eof.isEOF())
+ {
+ LOG.debug("{} early EOF", this);
+ _eof=EARLY_EOF;
+ if (_listener!=null)
+ _channelState.onReadPossible();
+ }
+ }
}
/* ------------------------------------------------------------ */
- /** Called by this HttpInput to signal it has reached EOF
- */
- protected void onEOF()
+ public void messageComplete()
{
+ synchronized (lock())
+ {
+ if (_eof==null || !_eof.isEOF())
+ {
+ LOG.debug("{} EOF", this);
+ _eof=EOF;
+ if (_listener!=null)
+ _channelState.onReadPossible();
+ }
+ }
}
/* ------------------------------------------------------------ */
- /** Add some content to the input stream
- * @param item
- */
- public void content(T item)
+ public void consumeAll()
{
synchronized (lock())
{
- // The buffer is not copied here. This relies on the caller not recycling the buffer
- // until the it is consumed. The onAllContentConsumed() callback is the signal to the
- // caller that the buffers can be recycled.
- _inputQ.add(item);
- onContentQueued(item);
- LOG.debug("{} queued {}", this, item);
+ try
+ {
+ while (!isFinished())
+ {
+ T item = getNextContent();
+ if (item==null)
+ _state.waitForContent(this);
+ else
+ consume(item,remaining(item));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeIOException(e);
+ }
}
}
- /* ------------------------------------------------------------ */
- /** This method should be called to signal to the HttpInput
- * that an EOF has arrived before all the expected content.
- * Typically this will result in an EOFException being thrown
- * from a subsequent read rather than a -1 return.
- */
- public void earlyEOF()
+ @Override
+ public boolean isFinished()
{
synchronized (lock())
{
- _earlyEOF = true;
- _inputEOF = true;
- lock().notify();
- LOG.debug("{} early EOF", this);
+ return _state.isEOF();
}
}
- /* ------------------------------------------------------------ */
- public boolean isEarlyEOF()
+ @Override
+ public boolean isReady()
{
synchronized (lock())
{
- return _earlyEOF;
+ if (_listener==null)
+ return true;
+ int available = available();
+ if (available>0)
+ return true;
+ if (!_notReady)
+ {
+ _notReady=true;
+ if (_state.isEOF())
+ _channelState.onReadPossible();
+ else
+ unready();
+ }
+ return false;
}
}
- /* ------------------------------------------------------------ */
- public void shutdown()
+ protected void unready()
{
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener)
+ {
+ if (readListener==null)
+ throw new NullPointerException("readListener==null");
synchronized (lock())
{
- _inputEOF = true;
- lock().notify();
- LOG.debug("{} shutdown", this);
+ if (_state!=BLOCKING)
+ throw new IllegalStateException("state="+_state);
+ _state=ASYNC;
+ _listener=readListener;
+ _notReady=true;
+
+ _channelState.onReadPossible();
}
}
- /* ------------------------------------------------------------ */
- public boolean isShutdown()
+ public void failed(Throwable x)
{
synchronized (lock())
{
- return _inputEOF;
+ if (_onError==null)
+ LOG.warn(x);
+ else
+ _onError=x;
}
}
- /* ------------------------------------------------------------ */
- public void consumeAll()
+ @Override
+ public void run()
{
+ final boolean available;
+ final boolean eof;
+ final Throwable x;
+
synchronized (lock())
{
- T item = _inputQ.peekUnsafe();
- loop: while (!isShutdown() && !isEarlyEOF())
+ if (!_notReady || _listener==null)
+ return;
+
+ x=_onError;
+ T item;
+ try
{
- while (item != null)
- {
- _inputQ.pollUnsafe();
- onContentConsumed(item);
+ item = getNextContent();
+ }
+ catch(Exception e)
+ {
+ item=null;
+ failed(e);
+ }
+ available= item!=null && remaining(item)>0;
- item = _inputQ.peekUnsafe();
- if (item == null)
- onAllContentConsumed();
- }
+ eof = !available && _state.isEOF();
+ _notReady=!available&&!eof;
+ }
- try
- {
- blockForContent();
- item = _inputQ.peekUnsafe();
- if (item==null)
- break;
- }
- catch (IOException e)
- {
- LOG.warn(e);
- break loop;
- }
- }
+ try
+ {
+ if (x!=null)
+ _listener.onError(x);
+ else if (available)
+ _listener.onDataAvailable();
+ else if (eof)
+ _listener.onAllDataRead();
+ else
+ unready();
+ }
+ catch(Throwable e)
+ {
+ LOG.warn(e.toString());
+ LOG.debug(e);
+ _listener.onError(e);
+ }
+ }
+
+
+ protected static class State
+ {
+ public void waitForContent(HttpInput<?> in) throws IOException
+ {
+ }
+
+ public int noContent() throws IOException
+ {
+ return -1;
+ }
+
+ public boolean isEOF()
+ {
+ return false;
+ }
+ }
+
+ protected static final State BLOCKING= new State()
+ {
+ @Override
+ public void waitForContent(HttpInput<?> in) throws IOException
+ {
+ in.blockForContent();
+ }
+ public String toString()
+ {
+ return "OPEN";
+ }
+ };
+
+ protected static final State ASYNC= new State()
+ {
+ @Override
+ public int noContent() throws IOException
+ {
+ return 0;
+ }
+ @Override
+ public String toString()
+ {
+ return "ASYNC";
+ }
+ };
+
+ protected static final State EARLY_EOF= new State()
+ {
+ @Override
+ public int noContent() throws IOException
+ {
+ throw new EofException();
+ }
+ @Override
+ public boolean isEOF()
+ {
+ return true;
+ }
+ public String toString()
+ {
+ return "EARLY_EOF";
+ }
+ };
+
+ protected static final State EOF= new State()
+ {
+ @Override
+ public boolean isEOF()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return "EOF";
+ }
+ };
+
+
+ public void init(HttpChannelState state)
+ {
+ synchronized (lock())
+ {
+ _channelState=state;
}
}
+
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java
new file mode 100644
index 0000000000..53f2c1287c
--- /dev/null
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInputOverHTTP.java
@@ -0,0 +1,168 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.eclipse.jetty.util.BlockingCallback;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+class HttpInputOverHTTP extends HttpInput<ByteBuffer> implements Callback
+{
+ private static final Logger LOG = Log.getLogger(HttpInputOverHTTP.class);
+ private final BlockingCallback _readBlocker = new BlockingCallback();
+ private final HttpConnection _httpConnection;
+ private ByteBuffer _content;
+
+ /**
+ * @param httpConnection
+ */
+ HttpInputOverHTTP(HttpConnection httpConnection)
+ {
+ _httpConnection = httpConnection;
+ }
+
+ @Override
+ public void recycle()
+ {
+ synchronized (lock())
+ {
+ super.recycle();
+ _content=null;
+ }
+ }
+
+ @Override
+ protected void blockForContent() throws IOException
+ {
+ while(true)
+ {
+ _httpConnection.fillInterested(_readBlocker);
+ LOG.debug("{} block readable on {}",this,_readBlocker);
+ _readBlocker.block();
+
+ Object content=getNextContent();
+ if (content!=null || isFinished())
+ break;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s@%x",getClass().getSimpleName(),hashCode());
+ }
+
+ @Override
+ protected ByteBuffer nextContent() throws IOException
+ {
+ // If we have some content available, return it
+ if (BufferUtil.hasContent(_content))
+ return _content;
+
+ // No - then we are going to need to parse some more content
+ _content=null;
+ ByteBuffer requestBuffer = _httpConnection.getRequestBuffer();
+
+ while (!_httpConnection.getParser().isComplete())
+ {
+ // Can the parser progress (even with an empty buffer)
+ _httpConnection.getParser().parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
+
+ // If we got some content, that will do for now!
+ if (BufferUtil.hasContent(_content))
+ return _content;
+
+ // No, we can we try reading some content?
+ if (BufferUtil.isEmpty(requestBuffer) && _httpConnection.getEndPoint().isInputShutdown())
+ {
+ _httpConnection.getParser().atEOF();
+ continue;
+ }
+
+ // OK lets read some data
+ int filled=_httpConnection.getEndPoint().fill(requestBuffer);
+ if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
+ LOG.debug("{} filled {}",this,filled);
+ if (filled<=0)
+ {
+ if (filled<0)
+ {
+ _httpConnection.getParser().atEOF();
+ continue;
+ }
+ return null;
+ }
+ }
+
+ return null;
+
+ }
+
+ @Override
+ protected int remaining(ByteBuffer item)
+ {
+ return item.remaining();
+ }
+
+ @Override
+ protected int get(ByteBuffer item, byte[] buffer, int offset, int length)
+ {
+ int l = Math.min(item.remaining(), length);
+ item.get(buffer, offset, l);
+ return l;
+ }
+
+ @Override
+ protected void consume(ByteBuffer item, int length)
+ {
+ item.position(item.position()+length);
+ }
+
+ @Override
+ public void content(ByteBuffer item)
+ {
+ if (BufferUtil.hasContent(_content))
+ throw new IllegalStateException();
+ _content=item;
+ }
+
+ @Override
+ protected void unready()
+ {
+ _httpConnection.fillInterested(this);
+ }
+
+ @Override
+ public void succeeded()
+ {
+ _httpConnection.getHttpChannel().getState().onReadPossible();
+ }
+
+ @Override
+ public void failed(Throwable x)
+ {
+ super.failed(x);
+ _httpConnection.getHttpChannel().getState().onReadPossible();
+ }
+}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
index bbea718ffa..dd0fd8228a 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
@@ -18,26 +18,27 @@
package org.eclipse.jetty.server;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritePendingException;
+import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
+import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
-import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
+import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
-import org.eclipse.jetty.util.resource.Resource;
/**
* <p>{@link HttpOutput} implements {@link ServletOutputStream}
@@ -49,19 +50,34 @@ import org.eclipse.jetty.util.resource.Resource;
* via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
* close the stream, to be reopened after the inclusion ends.</p>
*/
-public class HttpOutput extends ServletOutputStream
+public class HttpOutput extends ServletOutputStream implements Runnable
{
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel<?> _channel;
- private boolean _closed;
private long _written;
private ByteBuffer _aggregate;
private int _bufferSize;
+ private int _commitSize;
+ private WriteListener _writeListener;
+ private volatile Throwable _onError;
+
+ /*
+ ACTION OPEN ASYNC READY PENDING UNREADY
+ -------------------------------------------------------------------------------
+ setWriteListener() READY->owp ise ise ise ise
+ write() OPEN ise PENDING wpe wpe
+ flush() OPEN ise PENDING wpe wpe
+ isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false
+ write completed - - - ASYNC READY->owp
+ */
+ enum State { OPEN, ASYNC, READY, PENDING, UNREADY, CLOSED }
+ private final AtomicReference<State> _state=new AtomicReference<>(State.OPEN);
public HttpOutput(HttpChannel<?> channel)
{
_channel = channel;
_bufferSize = _channel.getHttpConfiguration().getOutputBufferSize();
+ _commitSize=_bufferSize/4;
}
public boolean isWritten()
@@ -82,49 +98,63 @@ public class HttpOutput extends ServletOutputStream
public void reopen()
{
- _closed = false;
+ _state.set(State.OPEN);
}
- /** Called by the HttpChannel if the output was closed
- * externally (eg by a 500 exception handling).
- */
- void closed()
+ public boolean isAllContentWritten()
{
- if (!_closed)
- {
- _closed = true;
- try
- {
- _channel.getResponse().closeOutput();
- }
- catch(IOException e)
- {
- _channel.failed();
- LOG.ignore(e);
- }
- releaseBuffer();
- }
+ return _channel.getResponse().isAllContentWritten(_written);
}
@Override
public void close()
{
- if (!isClosed())
+ State state=_state.get();
+ while(state!=State.CLOSED)
{
- try
+ if (_state.compareAndSet(state,State.CLOSED))
{
- if (BufferUtil.hasContent(_aggregate))
- _channel.write(_aggregate, !_channel.getResponse().isIncluding());
- else
- _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
+ try
+ {
+ if (BufferUtil.hasContent(_aggregate))
+ _channel.write(_aggregate, !_channel.getResponse().isIncluding());
+ else
+ _channel.write(BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
+ }
+ catch(IOException e)
+ {
+ LOG.debug(e);
+ _channel.failed();
+ }
+ releaseBuffer();
+ return;
}
- catch(IOException e)
+ state=_state.get();
+ }
+ }
+
+ /* Called to indicated that the output is already closed and the state needs to be updated to match */
+ void closed()
+ {
+ State state=_state.get();
+ while(state!=State.CLOSED)
+ {
+ if (_state.compareAndSet(state,State.CLOSED))
{
- _channel.failed();
- LOG.ignore(e);
+ try
+ {
+ _channel.getResponse().closeOutput();
+ }
+ catch(IOException e)
+ {
+ LOG.debug(e);
+ _channel.failed();
+ }
+ releaseBuffer();
+ return;
}
+ state=_state.get();
}
- closed();
}
private void releaseBuffer()
@@ -138,44 +168,109 @@ public class HttpOutput extends ServletOutputStream
public boolean isClosed()
{
- return _closed;
+ return _state.get()==State.CLOSED;
}
@Override
public void flush() throws IOException
{
- if (isClosed())
- return;
-
- if (BufferUtil.hasContent(_aggregate))
- _channel.write(_aggregate, false);
- else
- _channel.write(BufferUtil.EMPTY_BUFFER, false);
+ while(true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ if (BufferUtil.hasContent(_aggregate))
+ _channel.write(_aggregate, false);
+ else
+ _channel.write(BufferUtil.EMPTY_BUFFER, false);
+ return;
+
+ case ASYNC:
+ throw new IllegalStateException("isReady() not called");
+
+ case READY:
+ if (!_state.compareAndSet(State.READY, State.PENDING))
+ continue;
+ new AsyncFlush().process();
+ return;
+
+ case PENDING:
+ case UNREADY:
+ throw new WritePendingException();
+
+ case CLOSED:
+ return;
+ }
+ break;
+ }
}
- public boolean isAllContentWritten()
- {
- Response response=_channel.getResponse();
- return response.isAllContentWritten(_written);
- }
-
- public void closeOutput() throws IOException
- {
- _channel.getResponse().closeOutput();
- }
@Override
public void write(byte[] b, int off, int len) throws IOException
- {
- if (isClosed())
- throw new EofException("Closed");
-
+ {
_written+=len;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
- int capacity = getBufferSize();
+
+ // Async or Blocking ?
+ while(true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ // process blocking below
+ break;
+
+ case ASYNC:
+ throw new IllegalStateException("isReady() not called");
+
+ case READY:
+ if (!_state.compareAndSet(State.READY, State.PENDING))
+ continue;
+
+ // Should we aggregate?
+ int capacity = getBufferSize();
+ if (!complete && len<=_commitSize)
+ {
+ if (_aggregate == null)
+ _aggregate = _channel.getByteBufferPool().acquire(capacity, false);
+
+ // YES - fill the aggregate with content from the buffer
+ int filled = BufferUtil.fill(_aggregate, b, off, len);
+
+ // return if we are not complete, not full and filled all the content
+ if (filled==len && !BufferUtil.isFull(_aggregate))
+ {
+ if (!_state.compareAndSet(State.PENDING, State.ASYNC))
+ throw new IllegalStateException();
+ return;
+ }
+
+ // adjust offset/length
+ off+=filled;
+ len-=filled;
+ }
+
+ // Do the asynchronous writing from the callback
+ new AsyncWrite(b,off,len,complete).process();
+ return;
+
+ case PENDING:
+ case UNREADY:
+ throw new WritePendingException();
+
+ case CLOSED:
+ throw new EofException("Closed");
+ }
+ break;
+ }
+
+
+ // handle blocking write
// Should we aggregate?
- if (!complete && len<=capacity/4)
+ int capacity = getBufferSize();
+ if (!complete && len<=_commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false);
@@ -184,12 +279,12 @@ public class HttpOutput extends ServletOutputStream
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
- if (!complete && filled == len && !BufferUtil.isFull(_aggregate))
+ if (filled==len && !BufferUtil.isFull(_aggregate))
return;
// adjust offset/length
- off += filled;
- len -= filled;
+ off+=filled;
+ len-=filled;
}
// flush any content from the aggregate
@@ -198,7 +293,7 @@ public class HttpOutput extends ServletOutputStream
_channel.write(_aggregate, complete && len==0);
// should we fill aggregate again from the buffer?
- if (len>0 && !complete && len<=_aggregate.capacity()/4)
+ if (len>0 && !complete && len<=_commitSize)
{
BufferUtil.append(_aggregate, b, off, len);
return;
@@ -212,37 +307,128 @@ public class HttpOutput extends ServletOutputStream
_channel.write(BufferUtil.EMPTY_BUFFER,complete);
if (complete)
+ {
closed();
- }
+ }
+ }
- @Override
- public void write(int b) throws IOException
+ public void write(ByteBuffer buffer) throws IOException
{
- if (isClosed())
- throw new EOFException("Closed");
+ _written+=buffer.remaining();
+ boolean complete=_channel.getResponse().isAllContentWritten(_written);
- // Allocate an aggregate buffer.
- // Never direct as it is slow to do little writes to a direct buffer.
- if (_aggregate == null)
- _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
+ // Async or Blocking ?
+ while(true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ // process blocking below
+ break;
+
+ case ASYNC:
+ throw new IllegalStateException("isReady() not called");
+
+ case READY:
+ if (!_state.compareAndSet(State.READY, State.PENDING))
+ continue;
+
+ // Do the asynchronous writing from the callback
+ new AsyncWrite(buffer,complete).process();
+ return;
+
+ case PENDING:
+ case UNREADY:
+ throw new WritePendingException();
+
+ case CLOSED:
+ throw new EofException("Closed");
+ }
+ break;
+ }
+
+
+ // handle blocking write
+ int len=BufferUtil.length(buffer);
+
+ // flush any content from the aggregate
+ if (BufferUtil.hasContent(_aggregate))
+ _channel.write(_aggregate, complete && len==0);
- BufferUtil.append(_aggregate, (byte)b);
- _written++;
+ // write any remaining content in the buffer directly
+ if (len>0)
+ _channel.write(buffer, complete);
+ else if (complete)
+ _channel.write(BufferUtil.EMPTY_BUFFER,complete);
+
+ if (complete)
+ closed();
+ }
+ @Override
+ public void write(int b) throws IOException
+ {
+ _written+=1;
boolean complete=_channel.getResponse().isAllContentWritten(_written);
-
- // Check if all written or full
- if (complete || BufferUtil.isFull(_aggregate))
- {
- BlockingCallback callback = _channel.getWriteBlockingCallback();
- _channel.write(_aggregate, false, callback);
- callback.block();
- if (complete)
- closed();
+
+ // Async or Blocking ?
+ while(true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ if (_aggregate == null)
+ _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
+ BufferUtil.append(_aggregate, (byte)b);
+
+ // Check if all written or full
+ if (complete || BufferUtil.isFull(_aggregate))
+ {
+ BlockingCallback callback = _channel.getWriteBlockingCallback();
+ _channel.write(_aggregate, complete, callback);
+ callback.block();
+ if (complete)
+ closed();
+ }
+ break;
+
+ case ASYNC:
+ throw new IllegalStateException("isReady() not called");
+
+ case READY:
+ if (!_state.compareAndSet(State.READY, State.PENDING))
+ continue;
+
+ if (_aggregate == null)
+ _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);
+ BufferUtil.append(_aggregate, (byte)b);
+
+ // Check if all written or full
+ if (!complete && !BufferUtil.isFull(_aggregate))
+ {
+ if (!_state.compareAndSet(State.PENDING, State.ASYNC))
+ throw new IllegalStateException();
+ return;
+ }
+
+ // Do the asynchronous writing from the callback
+ new AsyncFlush().process();
+ return;
+
+ case PENDING:
+ case UNREADY:
+ throw new WritePendingException();
+
+ case CLOSED:
+ throw new EofException("Closed");
+ }
+ break;
}
}
+
+
@Override
public void print(String s) throws IOException
{
@@ -253,51 +439,6 @@ public class HttpOutput extends ServletOutputStream
}
/* ------------------------------------------------------------ */
- /** Set headers and send content.
- * @deprecated Use {@link Response#setHeaders(HttpContent)} and {@link #sendContent(HttpContent)} instead.
- * @param content
- * @throws IOException
- */
- @Deprecated
- public void sendContent(Object content) throws IOException
- {
- final BlockingCallback callback =_channel.getWriteBlockingCallback();
-
- if (content instanceof HttpContent)
- {
- _channel.getResponse().setHeaders((HttpContent)content);
- sendContent((HttpContent)content,callback);
- }
- else if (content instanceof Resource)
- {
- Resource resource = (Resource)content;
- _channel.getResponse().getHttpFields().putDateField(HttpHeader.LAST_MODIFIED, resource.lastModified());
-
- ReadableByteChannel in=((Resource)content).getReadableByteChannel();
- if (in!=null)
- sendContent(in,callback);
- else
- sendContent(resource.getInputStream(),callback);
- }
- else if (content instanceof ByteBuffer)
- {
- sendContent((ByteBuffer)content,callback);
- }
- else if (content instanceof ReadableByteChannel)
- {
- sendContent((ReadableByteChannel)content,callback);
- }
- else if (content instanceof InputStream)
- {
- sendContent((InputStream)content,callback);
- }
- else
- callback.failed(new IllegalArgumentException("unknown content type "+content.getClass()));
-
- callback.block();
- }
-
- /* ------------------------------------------------------------ */
/** Blocking send of content.
* @param content The content to send
* @throws IOException
@@ -332,7 +473,7 @@ public class HttpOutput extends ServletOutputStream
new ReadableByteChannelWritingCB(in,callback).iterate();
callback.block();
}
-
+
/* ------------------------------------------------------------ */
/** Blocking send of content.
@@ -345,7 +486,7 @@ public class HttpOutput extends ServletOutputStream
sendContent(content,callback);
callback.block();
}
-
+
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
@@ -367,13 +508,13 @@ public class HttpOutput extends ServletOutputStream
public void failed(Throwable x)
{
callback.failed(x);
- }
+ }
});
}
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
- * @param in The content to send as a stream. The stream will be closed
+ * @param in The content to send as a stream. The stream will be closed
* after reading all content.
* @param callback The callback to use to notify success or failure
*/
@@ -384,7 +525,7 @@ public class HttpOutput extends ServletOutputStream
/* ------------------------------------------------------------ */
/** Asynchronous send of content.
- * @param in The content to send as a channel. The channel will be closed
+ * @param in The content to send as a channel. The channel will be closed
* after reading all content.
* @param callback The callback to use to notify success or failure
*/
@@ -400,23 +541,36 @@ public class HttpOutput extends ServletOutputStream
*/
public void sendContent(HttpContent httpContent, Callback callback) throws IOException
{
- if (isClosed())
- throw new IOException("Closed");
if (BufferUtil.hasContent(_aggregate))
throw new IOException("written");
if (_channel.isCommitted())
throw new IOException("committed");
-
+
+ while (true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ if (!_state.compareAndSet(State.OPEN, State.PENDING))
+ continue;
+ break;
+ case CLOSED:
+ throw new EofException("Closed");
+ default:
+ throw new IllegalStateException();
+ }
+ break;
+ }
ByteBuffer buffer= _channel.useDirectBuffers()?httpContent.getDirectBuffer():null;
if (buffer == null)
buffer = httpContent.getIndirectBuffer();
-
+
if (buffer!=null)
{
sendContent(buffer,callback);
return;
}
-
+
ReadableByteChannel rbc=httpContent.getReadableByteChannel();
if (rbc!=null)
{
@@ -424,7 +578,7 @@ public class HttpOutput extends ServletOutputStream
sendContent(rbc,callback);
return;
}
-
+
InputStream in = httpContent.getInputStream();
if ( in!=null )
{
@@ -442,7 +596,8 @@ public class HttpOutput extends ServletOutputStream
public void setBufferSize(int size)
{
- this._bufferSize = size;
+ _bufferSize = size;
+ _commitSize = size;
}
public void resetBuffer()
@@ -450,24 +605,223 @@ public class HttpOutput extends ServletOutputStream
if (BufferUtil.hasContent(_aggregate))
BufferUtil.clear(_aggregate);
}
-
-
+
+ @Override
+ public void setWriteListener(WriteListener writeListener)
+ {
+ if (!_channel.getState().isAsync())
+ throw new IllegalStateException("!ASYNC");
+
+ if (_state.compareAndSet(State.OPEN, State.READY))
+ {
+ _writeListener = writeListener;
+ _channel.getState().onWritePossible();
+ }
+ else
+ throw new IllegalStateException();
+ }
+
+ /**
+ * @see javax.servlet.ServletOutputStream#isReady()
+ */
+ @Override
+ public boolean isReady()
+ {
+ while (true)
+ {
+ switch(_state.get())
+ {
+ case OPEN:
+ return true;
+ case ASYNC:
+ if (!_state.compareAndSet(State.ASYNC, State.READY))
+ continue;
+ return true;
+ case READY:
+ return true;
+ case PENDING:
+ if (!_state.compareAndSet(State.PENDING, State.UNREADY))
+ continue;
+ return false;
+ case UNREADY:
+ return false;
+ case CLOSED:
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ if(_onError!=null)
+ {
+ Throwable th=_onError;
+ _onError=null;
+ _writeListener.onError(th);
+ close();
+ }
+ if (_state.get()==State.READY)
+ {
+ try
+ {
+ _writeListener.onWritePossible();
+ }
+ catch (Throwable e)
+ {
+ _writeListener.onError(e);
+ close();
+ }
+ }
+ }
+
+ private class AsyncWrite extends AsyncFlush
+ {
+ private final ByteBuffer _buffer;
+ private final boolean _complete;
+ private final int _len;
+
+ public AsyncWrite(byte[] b, int off, int len, boolean complete)
+ {
+ _buffer=ByteBuffer.wrap(b, off, len);
+ _complete=complete;
+ _len=len;
+ }
+
+ public AsyncWrite(ByteBuffer buffer, boolean complete)
+ {
+ _buffer=buffer;
+ _complete=complete;
+ _len=buffer.remaining();
+ }
+
+ @Override
+ protected boolean process()
+ {
+ // flush any content from the aggregate
+ if (BufferUtil.hasContent(_aggregate))
+ {
+ _channel.write(_aggregate, _complete && _len==0, this);
+ return false;
+ }
+
+ if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
+ {
+ BufferUtil.put(_buffer,_aggregate);
+ }
+ else if (_len>0 && !_flushed)
+ {
+ _flushed=true;
+ _channel.write(_buffer, _complete, this);
+ return false;
+ }
+ else if (_len==0 && !_flushed)
+ {
+ _flushed=true;
+ _channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
+ return false;
+ }
+
+ if (_complete)
+ closed();
+ return true;
+ }
+ }
+
+ private class AsyncFlush extends IteratingCallback
+ {
+ protected boolean _flushed;
+
+ public AsyncFlush()
+ {
+ }
+
+ @Override
+ protected boolean process()
+ {
+ if (BufferUtil.hasContent(_aggregate))
+ {
+ _flushed=true;
+ _channel.write(_aggregate, false, this);
+ return false;
+ }
+
+ if (!_flushed)
+ {
+ _flushed=true;
+ _channel.write(BufferUtil.EMPTY_BUFFER,false,this);
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ protected void completed()
+ {
+ try
+ {
+ while(true)
+ {
+ State last=_state.get();
+ switch(last)
+ {
+ case PENDING:
+ if (!_state.compareAndSet(State.PENDING, State.ASYNC))
+ continue;
+ break;
+
+ case UNREADY:
+ if (!_state.compareAndSet(State.UNREADY, State.READY))
+ continue;
+ _channel.getState().onWritePossible();
+ break;
+
+ case CLOSED:
+ _onError=new EofException("Closed");
+ break;
+
+ default:
+ throw new IllegalStateException();
+ }
+ break;
+ }
+ }
+ catch (Exception e)
+ {
+ _onError=e;
+ _channel.getState().onWritePossible();
+ }
+ }
+
+ @Override
+ public void failed(Throwable e)
+ {
+ super.failed(e);
+ _onError=e;
+ _channel.getState().onWritePossible();
+ }
+
+
+ }
+
+
/* ------------------------------------------------------------ */
- /** An iterating callback that will take content from an
+ /** An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
- * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
+ * A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
- * be notified as each buffer is written and only once all the input is consumed will the
- * wrapped {@link Callback#succeeded()} method be called.
+ * be notified as each buffer is written and only once all the input is consumed will the
+ * wrapped {@link Callback#succeeded()} method be called.
*/
- private class InputStreamWritingCB extends IteratingCallback
+ private class InputStreamWritingCB extends IteratingNestedCallback
{
private final InputStream _in;
private final ByteBuffer _buffer;
private boolean _eof;
-
+
public InputStreamWritingCB(InputStream in, Callback callback)
- {
+ {
super(callback);
_in=in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
@@ -502,6 +856,7 @@ public class HttpOutput extends ServletOutputStream
_buffer.position(0);
_buffer.limit(len);
_channel.write(_buffer,_eof,this);
+
return false;
}
@@ -519,26 +874,26 @@ public class HttpOutput extends ServletOutputStream
LOG.ignore(e);
}
}
-
+
}
/* ------------------------------------------------------------ */
- /** An iterating callback that will take content from a
+ /** An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
- * be notified as each buffer is written and only once all the input is consumed will the
- * wrapped {@link Callback#succeeded()} method be called.
+ * be notified as each buffer is written and only once all the input is consumed will the
+ * wrapped {@link Callback#succeeded()} method be called.
*/
- private class ReadableByteChannelWritingCB extends IteratingCallback
+ private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{
private final ReadableByteChannel _in;
private final ByteBuffer _buffer;
private boolean _eof;
-
+
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
- {
+ {
super(callback);
_in=in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
@@ -561,10 +916,11 @@ public class HttpOutput extends ServletOutputStream
_buffer.clear();
while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0;
-
+
// write what we have
_buffer.flip();
_channel.write(_buffer,_eof,this);
+
return false;
}
@@ -583,4 +939,5 @@ public class HttpOutput extends ServletOutputStream
}
}
}
+
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java
index ef62bdc86f..0ab12b207e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpTransport.java
@@ -18,7 +18,6 @@
package org.eclipse.jetty.server;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.HttpGenerator;
@@ -26,9 +25,6 @@ import org.eclipse.jetty.util.Callback;
public interface HttpTransport
{
- @Deprecated
- void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException;
-
void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback);
void send(ByteBuffer content, boolean lastContent, Callback callback);
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java
index 83335c4dc3..0f4abd90fb 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java
@@ -35,10 +35,10 @@ public class Iso88591HttpWriter extends HttpWriter
public void write (char[] s,int offset, int length) throws IOException
{
HttpOutput out = _out;
- if (length==0)
+ if (length==0 && out.isAllContentWritten())
{
- if (_out.isAllContentWritten())
- close();
+ close();
+ return;
}
if (length==1)
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
index 6d161def3a..acf1cf073f 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java
@@ -172,7 +172,6 @@ public class LocalConnector extends AbstractConnector
Connection connection = getDefaultConnectionFactory().newConnection(this, endPoint);
endPoint.setConnection(connection);
-// connectionOpened(connection);
connection.onOpen();
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java
index d7304d8fc3..df8ce174c6 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkConnector.java
@@ -39,7 +39,6 @@ public interface NetworkConnector extends Connector, AutoCloseable
* (for example, to stop accepting network connections).</p>
* Once a connector has been closed, it cannot be opened again without first
* calling {@link #stop()} and it will not be active again until a subsequent call to {@link #start()}
- * @throws IOException if this connector cannot be closed
*/
@Override
void close();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java
new file mode 100644
index 0000000000..fb0c23a3fe
--- /dev/null
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/QueuedHttpInput.java
@@ -0,0 +1,160 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.server;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import javax.servlet.ServletInputStream;
+
+import org.eclipse.jetty.util.ArrayQueue;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
+
+/**
+ * <p>{@link QueuedHttpInput} provides an implementation of {@link ServletInputStream} for {@link HttpChannel}.</p>
+ * <p>{@link QueuedHttpInput} holds a queue of items passed to it by calls to {@link #content(Object)}.</p>
+ * <p>{@link QueuedHttpInput} stores the items directly; if the items contain byte buffers, it does not copy them
+ * but simply holds references to the item, thus the caller must organize for those buffers to valid while
+ * held by this class.</p>
+ * <p>To assist the caller, subclasses may override methods {@link #onAsyncRead()},
+ * {@link #onContentConsumed(Object)} and {@link #onAllContentConsumed()} that can be implemented so that the
+ * caller will know when buffers are queued and consumed.</p>
+ */
+public abstract class QueuedHttpInput<T> extends HttpInput<T>
+{
+ private final static Logger LOG = Log.getLogger(QueuedHttpInput.class);
+
+ private final ArrayQueue<T> _inputQ = new ArrayQueue<>(lock());
+
+ public QueuedHttpInput()
+ {}
+
+ public void recycle()
+ {
+ synchronized (lock())
+ {
+ T item = _inputQ.peekUnsafe();
+ while (item != null)
+ {
+ _inputQ.pollUnsafe();
+ onContentConsumed(item);
+
+ item = _inputQ.peekUnsafe();
+ if (item == null)
+ onAllContentConsumed();
+ }
+ super.recycle();
+ }
+ }
+
+ @Override
+ protected T nextContent()
+ {
+ T item = _inputQ.peekUnsafe();
+
+ // Skip empty items at the head of the queue
+ while (item != null && remaining(item) == 0)
+ {
+ _inputQ.pollUnsafe();
+ onContentConsumed(item);
+ LOG.debug("{} consumed {}", this, item);
+ item = _inputQ.peekUnsafe();
+
+ // If that was the last item then notify
+ if (item==null)
+ onAllContentConsumed();
+ }
+ return item;
+ }
+
+ protected abstract void onContentConsumed(T item);
+
+ protected void blockForContent() throws IOException
+ {
+ synchronized (lock())
+ {
+ while (_inputQ.isEmpty() && !_state.isEOF())
+ {
+ try
+ {
+ LOG.debug("{} waiting for content", this);
+ lock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw (IOException)new InterruptedIOException().initCause(e);
+ }
+ }
+ }
+ }
+
+
+ /* ------------------------------------------------------------ */
+ /** Called by this HttpInput to signal all available content has been consumed
+ */
+ protected void onAllContentConsumed()
+ {
+ }
+
+ /* ------------------------------------------------------------ */
+ /** Add some content to the input stream
+ * @param item
+ */
+ public void content(T item)
+ {
+ // The buffer is not copied here. This relies on the caller not recycling the buffer
+ // until the it is consumed. The onContentConsumed and onAllContentConsumed() callbacks are
+ // the signals to the caller that the buffers can be recycled.
+
+ synchronized (lock())
+ {
+ boolean empty=_inputQ.isEmpty();
+
+ _inputQ.add(item);
+
+ if (empty)
+ {
+ if (!onAsyncRead())
+ lock().notify();
+ }
+
+ LOG.debug("{} queued {}", this, item);
+ }
+ }
+
+
+ public void earlyEOF()
+ {
+ synchronized (lock())
+ {
+ super.earlyEOF();
+ lock().notify();
+ }
+ }
+
+ public void messageComplete()
+ {
+ synchronized (lock())
+ {
+ super.messageComplete();
+ lock().notify();
+ }
+ }
+
+}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
index c6db08cc1e..2a7ba2574e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java
@@ -57,6 +57,7 @@ import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.Part;
import org.eclipse.jetty.http.HttpCookie;
@@ -494,6 +495,16 @@ public class Request implements HttpServletRequest
{
return (int)_fields.getLongField(HttpHeader.CONTENT_LENGTH.toString());
}
+
+ /* ------------------------------------------------------------ */
+ /*
+ * @see javax.servlet.ServletRequest.getContentLengthLong()
+ */
+ @Override
+ public long getContentLengthLong()
+ {
+ return _fields.getLongField(HttpHeader.CONTENT_LENGTH.toString());
+ }
/* ------------------------------------------------------------ */
/*
@@ -532,7 +543,12 @@ public class Request implements HttpServletRequest
public Cookie[] getCookies()
{
if (_cookiesExtracted)
- return _cookies == null?null:_cookies.getCookies();
+ {
+ if (_cookies == null || _cookies.getCookies().length == 0)
+ return null;
+
+ return _cookies.getCookies();
+ }
_cookiesExtracted = true;
@@ -551,7 +567,11 @@ public class Request implements HttpServletRequest
}
}
- return _cookies == null?null:_cookies.getCookies();
+ //Javadoc for Request.getCookies() stipulates null for no cookies
+ if (_cookies == null || _cookies.getCookies().length == 0)
+ return null;
+
+ return _cookies.getCookies();
}
/* ------------------------------------------------------------ */
@@ -1607,9 +1627,7 @@ public class Request implements HttpServletRequest
/* ------------------------------------------------------------ */
/*
* Set a request attribute. if the attribute name is "org.eclipse.jetty.server.server.Request.queryEncoding" then the value is also passed in a call to
- * {@link #setQueryEncoding}. <p> if the attribute name is "org.eclipse.jetty.server.server.ResponseBuffer", then the response buffer is flushed with @{link
- * #flushResponseBuffer} <p> if the attribute name is "org.eclipse.jetty.io.EndPoint.maxIdleTime", then the value is passed to the associated {@link
- * EndPoint#setIdleTimeout}.
+ * {@link #setQueryEncoding}.
*
* @see javax.servlet.ServletRequest#setAttribute(java.lang.String, java.lang.Object)
*/
@@ -1618,35 +1636,11 @@ public class Request implements HttpServletRequest
{
Object old_value = _attributes == null?null:_attributes.getAttribute(name);
- if (name.startsWith("org.eclipse.jetty."))
- {
- if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name))
- setQueryEncoding(value == null?null:value.toString());
- else if ("org.eclipse.jetty.server.sendContent".equals(name))
- {
- try
- {
- ((HttpOutput)getServletResponse().getOutputStream()).sendContent(value);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- else if ("org.eclipse.jetty.server.ResponseBuffer".equals(name))
- {
- try
- {
- throw new IOException("not implemented");
- //((HttpChannel.Output)getServletResponse().getOutputStream()).sendResponse(byteBuffer);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
+ if ("org.eclipse.jetty.server.Request.queryEncoding".equals(name))
+ setQueryEncoding(value == null?null:value.toString());
+ else if ("org.eclipse.jetty.server.sendContent".equals(name))
+ LOG.warn("Deprecated: org.eclipse.jetty.server.sendContent");
+
if (_attributes == null)
_attributes = new AttributesMap();
_attributes.setAttribute(name,value);
@@ -2194,4 +2188,32 @@ public class Request implements HttpServletRequest
setParameters(parameters);
setQueryString(query);
}
+
+
+
+ /**
+ * @see javax.servlet.http.HttpServletRequest#upgrade(java.lang.Class)
+ */
+ @Override
+ public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException
+ {
+ if (getContext() == null)
+ throw new ServletException ("Unable to instantiate "+handlerClass);
+
+ try
+ {
+ //Instantiate an instance and inject it
+ T h = getContext().createInstance(handlerClass);
+
+ //TODO handle the rest of the upgrade process
+
+ return h;
+ }
+ catch (Exception e)
+ {
+ if (e instanceof ServletException)
+ throw (ServletException)e;
+ throw new ServletException(e);
+ }
+ }
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java
index c41ee2b03b..0d67712f3e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResourceCache.java
@@ -284,26 +284,9 @@ public class ResourceCache
{
try
{
- int len=(int)resource.length();
- if (len<0)
- {
- LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len);
- return null;
- }
- ByteBuffer buffer = BufferUtil.allocate(len);
- int pos=BufferUtil.flipToFill(buffer);
- if (resource.getFile()!=null)
- BufferUtil.readFrom(resource.getFile(),buffer);
- else
- {
- InputStream is = resource.getInputStream();
- BufferUtil.readFrom(is,len,buffer);
- is.close();
- }
- BufferUtil.flipToFlush(buffer,pos);
- return buffer;
+ return BufferUtil.toBuffer(resource,true);
}
- catch(IOException e)
+ catch(IOException|IllegalArgumentException e)
{
LOG.warn(e);
return null;
@@ -316,30 +299,11 @@ public class ResourceCache
try
{
if (_useFileMappedBuffer && resource.getFile()!=null)
- return BufferUtil.toBuffer(resource.getFile());
-
- int len=(int)resource.length();
- if (len<0)
- {
- LOG.warn("invalid resource: "+String.valueOf(resource)+" "+len);
- return null;
- }
- ByteBuffer buffer = BufferUtil.allocateDirect(len);
-
- int pos=BufferUtil.flipToFill(buffer);
- if (resource.getFile()!=null)
- BufferUtil.readFrom(resource.getFile(),buffer);
- else
- {
- InputStream is = resource.getInputStream();
- BufferUtil.readFrom(is,len,buffer);
- is.close();
- }
- BufferUtil.flipToFlush(buffer,pos);
+ return BufferUtil.toMappedBuffer(resource.getFile());
- return buffer;
+ return BufferUtil.toBuffer(resource,true);
}
- catch(IOException e)
+ catch(IOException|IllegalArgumentException e)
{
LOG.warn(e);
return null;
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
index b1fa612447..b32e681ec7 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java
@@ -99,6 +99,7 @@ public class Response implements HttpServletResponse
private Locale _locale;
private MimeTypes.Type _mimeType;
private String _characterEncoding;
+ private boolean _explicitEncoding;
private String _contentType;
private OutputType _outputType = OutputType.NONE;
private ResponseWriter _writer;
@@ -128,6 +129,7 @@ public class Response implements HttpServletResponse
_contentLength = -1;
_out.reset();
_fields.clear();
+ _explicitEncoding=false;
}
public void setHeaders(HttpContent httpContent)
@@ -173,6 +175,10 @@ public class Response implements HttpServletResponse
public void included()
{
_include.decrementAndGet();
+ if (_outputType == OutputType.WRITER)
+ {
+ _writer.reopen();
+ }
_out.reopen();
}
@@ -454,10 +460,18 @@ public class Response implements HttpServletResponse
_channel.sendResponse(HttpGenerator.PROGRESS_102_INFO, null, true);
}
}
-
- @Override
- public void sendRedirect(String location) throws IOException
+
+ /**
+ * Sends a response with one of the 300 series redirection codes.
+ * @param code
+ * @param location
+ * @throws IOException
+ */
+ public void sendRedirect(int code, String location) throws IOException
{
+ if ((code < HttpServletResponse.SC_MULTIPLE_CHOICES) || (code >= HttpServletResponse.SC_BAD_REQUEST))
+ throw new IllegalArgumentException("Not a 3xx redirect code");
+
if (isIncluding())
return;
@@ -467,7 +481,15 @@ public class Response implements HttpServletResponse
if (!URIUtil.hasScheme(location))
{
StringBuilder buf = _channel.getRequest().getRootURL();
- if (location.startsWith("/"))
+
+ if (location.startsWith("//"))
+ {
+ buf.delete(0, buf.length());
+ buf.append(_channel.getRequest().getScheme());
+ buf.append(":");
+ buf.append(location);
+ }
+ else if (location.startsWith("/"))
buf.append(location);
else
{
@@ -515,11 +537,17 @@ public class Response implements HttpServletResponse
resetBuffer();
setHeader(HttpHeader.LOCATION, location);
- setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY);
+ setStatus(code);
closeOutput();
}
@Override
+ public void sendRedirect(String location) throws IOException
+ {
+ sendRedirect(HttpServletResponse.SC_MOVED_TEMPORARILY, location);
+ }
+
+ @Override
public void setDateHeader(String name, long date)
{
if (!isIncluding())
@@ -723,7 +751,7 @@ public class Response implements HttpServletResponse
encoding = MimeTypes.inferCharsetFromContentType(_contentType);
if (encoding == null)
encoding = StringUtil.__ISO_8859_1;
- setCharacterEncoding(encoding);
+ setCharacterEncoding(encoding,false);
}
if (_writer != null && _writer.isFor(encoding))
@@ -787,6 +815,8 @@ public class Response implements HttpServletResponse
{
case WRITER:
_writer.close();
+ if (!_out.isClosed())
+ _out.close();
break;
case STREAM:
getOutputStream().close();
@@ -811,10 +841,21 @@ public class Response implements HttpServletResponse
_contentLength = len;
_fields.putLongField(HttpHeader.CONTENT_LENGTH.toString(), len);
}
+
+ @Override
+ public void setContentLengthLong(long length)
+ {
+ setLongContentLength(length);
+ }
@Override
public void setCharacterEncoding(String encoding)
{
+ setCharacterEncoding(encoding,true);
+ }
+
+ private void setCharacterEncoding(String encoding, boolean explicit)
+ {
if (isIncluding())
return;
@@ -822,6 +863,8 @@ public class Response implements HttpServletResponse
{
if (encoding == null)
{
+ _explicitEncoding=false;
+
// Clear any encoding.
if (_characterEncoding != null)
{
@@ -840,6 +883,7 @@ public class Response implements HttpServletResponse
else
{
// No, so just add this one to the mimetype
+ _explicitEncoding=explicit;
_characterEncoding = StringUtil.normalizeCharset(encoding);
if (_contentType != null)
{
@@ -900,6 +944,7 @@ public class Response implements HttpServletResponse
else
{
_characterEncoding = charset;
+ _explicitEncoding = true;
}
HttpField field = HttpField.CONTENT_TYPE.get(_contentType);
@@ -1040,8 +1085,8 @@ public class Response implements HttpServletResponse
String charset = _channel.getRequest().getContext().getContextHandler().getLocaleEncoding(locale);
- if (charset != null && charset.length() > 0 && _characterEncoding == null)
- setCharacterEncoding(charset);
+ if (charset != null && charset.length() > 0 && !_explicitEncoding)
+ setCharacterEncoding(charset,false);
}
@Override
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
index 889e2ffefa..7fedc13f42 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java
@@ -67,7 +67,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* which are implemented to each use a NIO {@link Selector} instance to asynchronously
* schedule a set of accepted connections. It is the selector thread that will call the
* {@link Callback} instances passed in the {@link EndPoint#fillInterested(Callback)} or
- * {@link EndPoint#write(Object, Callback, java.nio.ByteBuffer...)} methods. It is expected
+ * {@link EndPoint#write(Callback, java.nio.ByteBuffer...)} methods. It is expected
* that these callbacks may do some non-blocking IO work, but will always dispatch to the
* {@link Executor} service any blocking, long running or application tasks.
* <p>
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java
index b13144840a..b0c6270e96 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletRequestHttpWrapper.java
@@ -31,10 +31,15 @@ import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.Part;
+
/* ------------------------------------------------------------ */
-/** Class to tunnel a ServletRequest via a HttpServletRequest
+/**
+ * ServletRequestHttpWrapper
+ *
+ * Class to tunnel a ServletRequest via a HttpServletRequest
*/
public class ServletRequestHttpWrapper extends ServletRequestWrapper implements HttpServletRequest
{
@@ -209,4 +214,25 @@ public class ServletRequestHttpWrapper extends ServletRequestWrapper implements
}
+ /**
+ * @see javax.servlet.http.HttpServletRequest#changeSessionId()
+ */
+ @Override
+ public String changeSessionId()
+ {
+ // TODO 3.1 Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * @see javax.servlet.http.HttpServletRequest#upgrade(java.lang.Class)
+ */
+ @Override
+ public <T extends HttpUpgradeHandler> T upgrade(Class<T> handlerClass) throws IOException, ServletException
+ {
+ // TODO 3.1 Auto-generated method stub
+ return null;
+ }
+
+
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java
index 1f1ffb4f42..4c62b9e692 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServletResponseHttpWrapper.java
@@ -28,7 +28,10 @@ import javax.servlet.http.HttpServletResponse;
/* ------------------------------------------------------------ */
-/** Wrapper to tunnel a ServletResponse via a HttpServletResponse
+/**
+ * ServletResponseHttpWrapper
+ *
+ * Wrapper to tunnel a ServletResponse via a HttpServletResponse
*/
public class ServletResponseHttpWrapper extends ServletResponseWrapper implements HttpServletResponse
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java
index 251e3d0fd4..6028109f8e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java
@@ -43,10 +43,10 @@ public class Utf8HttpWriter extends HttpWriter
public void write (char[] s,int offset, int length) throws IOException
{
HttpOutput out = _out;
- if (length==0)
+ if (length==0 && out.isAllContentWritten())
{
- if (_out.isAllContentWritten())
- close();
+ close();
+ return;
}
while (length > 0)
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java
index 91d66baa23..2554437302 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandler.java
@@ -104,10 +104,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
public final static int SERVLET_MAJOR_VERSION=3;
public final static int SERVLET_MINOR_VERSION=0;
+ public static final Class[] SERVLET_LISTENER_TYPES = new Class[] {ServletContextListener.class,
+ ServletContextAttributeListener.class,
+ ServletRequestListener.class,
+ ServletRequestAttributeListener.class};
- final private static String __unimplmented="Unimplemented - use org.eclipse.jetty.servlet.ServletContextHandler";
+ public static final int DEFAULT_LISTENER_TYPE_INDEX = 1;
+ public static final int EXTENDED_LISTENER_TYPE_INDEX = 0;
+ final private static String __unimplmented="Unimplemented - use org.eclipse.jetty.servlet.ServletContextHandler";
private static final Logger LOG = Log.getLogger(ContextHandler.class);
@@ -550,7 +556,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
_requestListeners.clear();
_requestAttributeListeners.clear();
_eventListeners.clear();
-
+
if (eventListeners!=null)
for (EventListener listener : eventListeners)
addEventListener(listener);
@@ -628,7 +634,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
return _programmaticListeners.contains(listener);
}
-
+
/* ------------------------------------------------------------ */
/**
@@ -828,7 +834,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
for (int i = _contextListeners.size(); i-->0;)
callContextDestroyed(_contextListeners.get(i),event);
}
-
+
//retain only durable listeners
setEventListeners(_durableListeners.toArray(new EventListener[_durableListeners.size()]));
_durableListeners.clear();
@@ -968,7 +974,9 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
if (old_context != _scontext)
{
// check the target.
- if (DispatcherType.REQUEST.equals(dispatch) || DispatcherType.ASYNC.equals(dispatch) || (DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isExpired()))
+ if (DispatcherType.REQUEST.equals(dispatch) ||
+ DispatcherType.ASYNC.equals(dispatch) ||
+ DispatcherType.ERROR.equals(dispatch) && baseRequest.getHttpChannelState().isAsync())
{
if (_compactPath)
target = URIUtil.compactPath(target);
@@ -1303,13 +1311,13 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
LOG.warn(this+" contextPath ends with /");
contextPath=contextPath.substring(0,contextPath.length()-1);
}
-
+
if (contextPath.length()==0)
{
LOG.warn("Empty contextPath");
contextPath="/";
}
-
+
_contextPath = contextPath;
if (getServer() != null && (getServer().isStarting() || getServer().isStarted()))
@@ -1720,6 +1728,8 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
public class Context extends NoContext
{
protected boolean _enabled = true; //whether or not the dynamic API is enabled for callers
+ protected boolean _extendedListenerTypes = false;
+
/* ------------------------------------------------------------ */
protected Context()
@@ -2124,6 +2134,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
try
{
Class<? extends EventListener> clazz = _classLoader==null?Loader.loadClass(ContextHandler.class,className):_classLoader.loadClass(className);
+ checkListener(clazz);
addListener(clazz);
}
catch (ClassNotFoundException e)
@@ -2137,6 +2148,9 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
if (!_enabled)
throw new UnsupportedOperationException();
+
+ checkListener(t.getClass());
+
ContextHandler.this.addEventListener(t);
ContextHandler.this.addProgrammaticListener(t);
}
@@ -2147,6 +2161,8 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
if (!_enabled)
throw new UnsupportedOperationException();
+ checkListener(listenerClass);
+
try
{
EventListener e = createListener(listenerClass);
@@ -2164,18 +2180,42 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
try
{
- return clazz.newInstance();
+ return createInstance(clazz);
}
- catch (InstantiationException e)
+ catch (Exception e)
{
throw new ServletException(e);
}
- catch (IllegalAccessException e)
+ }
+
+
+ public void checkListener (Class<? extends EventListener> listener) throws IllegalStateException
+ {
+ boolean ok = false;
+ int startIndex = (isExtendedListenerTypes()?EXTENDED_LISTENER_TYPE_INDEX:DEFAULT_LISTENER_TYPE_INDEX);
+ for (int i=startIndex;i<SERVLET_LISTENER_TYPES.length;i++)
{
- throw new ServletException(e);
+ if (SERVLET_LISTENER_TYPES[i].isAssignableFrom(listener))
+ {
+ ok = true;
+ break;
+ }
}
+ if (!ok)
+ throw new IllegalArgumentException("Inappropriate listener class "+listener.getName());
}
+ public void setExtendedListenerTypes (boolean extended)
+ {
+ _extendedListenerTypes = extended;
+ }
+
+ public boolean isExtendedListenerTypes()
+ {
+ return _extendedListenerTypes;
+ }
+
+
@Override
public ClassLoader getClassLoader()
{
@@ -2213,6 +2253,14 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
return _enabled;
}
+
+
+
+ public <T> T createInstance (Class<T> clazz) throws Exception
+ {
+ T o = clazz.newInstance();
+ return o;
+ }
}
@@ -2553,6 +2601,16 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
LOG.warn(__unimplmented);
}
+
+ /**
+ * @see javax.servlet.ServletContext#getVirtualServerName()
+ */
+ @Override
+ public String getVirtualServerName()
+ {
+ // TODO 3.1 Auto-generated method stub
+ return null;
+ }
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java
index d1e13ce35b..451e3afae1 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ResourceHandler.java
@@ -533,7 +533,7 @@ public class ResourceHandler extends HandlerWrapper
resource.length()>_minMemoryMappedContentLength &&
resource instanceof FileResource)
{
- ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile());
+ ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile());
((HttpOutput)out).sendContent(buffer,callback);
}
else // Do a blocking write of a channel (if available) or input stream
@@ -553,7 +553,7 @@ public class ResourceHandler extends HandlerWrapper
resource.length()>_minMemoryMappedContentLength &&
resource instanceof FileResource)
{
- ByteBuffer buffer = BufferUtil.toBuffer(resource.getFile());
+ ByteBuffer buffer = BufferUtil.toMappedBuffer(resource.getFile());
((HttpOutput)out).sendContent(buffer);
}
else // Do a blocking write of a channel (if available) or input stream
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java
index 78e8eb71ef..26a0d8dcf2 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java
@@ -81,7 +81,6 @@ public class StatisticsHandler extends HandlerWrapper
@Override
public void onComplete(AsyncEvent event) throws IOException
{
-
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();
Request request = state.getBaseRequest();
@@ -92,8 +91,7 @@ public class StatisticsHandler extends HandlerWrapper
updateResponse(request);
- if (!state.isDispatched())
- _asyncWaitStats.decrement();
+ _asyncWaitStats.decrement();
}
};
@@ -139,9 +137,7 @@ public class StatisticsHandler extends HandlerWrapper
{
// resumed request
start = System.currentTimeMillis();
- _asyncWaitStats.decrement();
- if (state.isDispatched())
- _asyncDispatches.incrementAndGet();
+ _asyncDispatches.incrementAndGet();
}
try
@@ -159,8 +155,10 @@ public class StatisticsHandler extends HandlerWrapper
if (state.isSuspended())
{
if (state.isInitial())
+ {
state.addListener(_onCompletion);
- _asyncWaitStats.increment();
+ _asyncWaitStats.increment();
+ }
}
else if (state.isInitial())
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java
index db6b465c5e..96db015f21 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSession.java
@@ -51,8 +51,8 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI
{
final static Logger LOG = SessionHandler.LOG;
public final static String SESSION_KNOWN_ONLY_TO_AUTHENTICATED="org.eclipse.jetty.security.sessionKnownOnlytoAuthenticated";
- private String _clusterId; // ID unique within cluster
- private String _nodeId; // ID unique within node
+ private String _clusterId; // ID without any node (ie "worker") id appended
+ private String _nodeId; // ID of session with node(ie "worker") id appended
private final AbstractSessionManager _manager;
private final Map<String,Object> _attributes=new HashMap<String, Object>();
private boolean _idChanged;
@@ -185,6 +185,7 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI
@Override
public long getCreationTime() throws IllegalStateException
{
+ checkValid();
return _created;
}
@@ -365,6 +366,7 @@ public abstract class AbstractSession implements AbstractSessionManager.SessionI
@Override
public void invalidate() throws IllegalStateException
{
+ checkValid();
// remove session from context and invalidate other sessions with same ID.
_manager.removeSession(this,true);
doInvalidate();
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java
index 4801d16dd4..b13419bc21 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/AbstractSessionManager.java
@@ -38,6 +38,7 @@ import javax.servlet.http.HttpSessionAttributeListener;
import javax.servlet.http.HttpSessionBindingEvent;
import javax.servlet.http.HttpSessionContext;
import javax.servlet.http.HttpSessionEvent;
+import javax.servlet.http.HttpSessionIdListener;
import javax.servlet.http.HttpSessionListener;
import org.eclipse.jetty.http.HttpCookie;
@@ -107,6 +108,7 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement
protected final List<HttpSessionAttributeListener> _sessionAttributeListeners = new CopyOnWriteArrayList<HttpSessionAttributeListener>();
protected final List<HttpSessionListener> _sessionListeners= new CopyOnWriteArrayList<HttpSessionListener>();
+ protected final List<HttpSessionIdListener> _sessionIdListeners = new CopyOnWriteArrayList<HttpSessionIdListener>();
protected ClassLoader _loader;
protected ContextHandler.Context _context;
@@ -191,6 +193,8 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement
_sessionAttributeListeners.add((HttpSessionAttributeListener)listener);
if (listener instanceof HttpSessionListener)
_sessionListeners.add((HttpSessionListener)listener);
+ if (listener instanceof HttpSessionIdListener)
+ _sessionIdListeners.add((HttpSessionIdListener)listener);
}
/* ------------------------------------------------------------ */
@@ -198,6 +202,7 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement
{
_sessionAttributeListeners.clear();
_sessionListeners.clear();
+ _sessionIdListeners.clear();
}
/* ------------------------------------------------------------ */
@@ -411,7 +416,6 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement
/* ------------------------------------------------------------ */
/**
- * @return if true, session cookie will be marked as secure only iff
* HTTPS request. Can be overridden by setting SessionCookieConfig.setSecure(true),
* in which case the session cookie will be marked as secure on both HTTPS and HTTP.
*/
@@ -1006,6 +1010,29 @@ public abstract class AbstractSessionManager extends AbstractLifeCycle implement
{
_checkingRemoteSessionIdEncoding=remote;
}
+
+
+ /* ------------------------------------------------------------ */
+ /**
+ * Tell the HttpSessionIdListeners the id changed.
+ * NOTE: this method must be called LAST in subclass overrides, after the session has been updated
+ * with the new id.
+ * @see org.eclipse.jetty.server.SessionManager#renewSessionId(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
+ */
+ @Override
+ public void renewSessionId(String oldClusterId, String oldNodeId, String newClusterId, String newNodeId)
+ {
+ if (!_sessionIdListeners.isEmpty())
+ {
+ AbstractSession session = getSession(newClusterId);
+ HttpSessionEvent event = new HttpSessionEvent(session);
+ for (HttpSessionIdListener l:_sessionIdListeners)
+ {
+ l.sessionIdChanged(event, oldClusterId);
+ }
+ }
+
+ }
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java
index 2f7e1b4f5e..65bcf1f071 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/HashSessionManager.java
@@ -83,7 +83,7 @@ public class HashSessionManager extends AbstractSessionManager
/* ------------------------------------------------------------ */
/**
- * @see org.eclipse.jetty.servlet.AbstractSessionManager#doStart()
+ * @see AbstractSessionManager#doStart()
*/
@Override
public void doStart() throws Exception
@@ -116,7 +116,7 @@ public class HashSessionManager extends AbstractSessionManager
/* ------------------------------------------------------------ */
/**
- * @see org.eclipse.jetty.servlet.AbstractSessionManager#doStop()
+ * @see AbstractSessionManager#doStop()
*/
@Override
public void doStop() throws Exception
@@ -440,6 +440,8 @@ public class HashSessionManager extends AbstractSessionManager
session.setNodeId(newNodeId);
session.save(); //save updated session: TODO consider only saving file if idled
sessions.put(newClusterId, session);
+
+ super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
}
catch (Exception e)
{
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java
index bf595d15bb..75c40e8a4e 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/JDBCSessionManager.java
@@ -647,6 +647,8 @@ public class JDBCSessionManager extends AbstractSessionManager
LOG.warn(e);
}
}
+
+ super.renewSessionId(oldClusterId, oldNodeId, newClusterId, newNodeId);
}
@@ -949,7 +951,7 @@ public class JDBCSessionManager extends AbstractSessionManager
/**
* Insert a session into the database.
*
- * @param data
+ * @param session
* @throws Exception
*/
protected void storeSession (Session session)
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java
index 0c8df262b7..44de98106a 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/session/SessionHandler.java
@@ -29,6 +29,9 @@ import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpSessionAttributeListener;
+import javax.servlet.http.HttpSessionIdListener;
+import javax.servlet.http.HttpSessionListener;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.server.Request;
@@ -46,6 +49,11 @@ public class SessionHandler extends ScopedHandler
final static Logger LOG = Log.getLogger("org.eclipse.jetty.server.session");
public final static EnumSet<SessionTrackingMode> DEFAULT_TRACKING = EnumSet.of(SessionTrackingMode.COOKIE,SessionTrackingMode.URL);
+
+ public static final Class[] SESSION_LISTENER_TYPES = new Class[] {HttpSessionAttributeListener.class,
+ HttpSessionIdListener.class,
+ HttpSessionListener.class};
+
/* -------------------------------------------------------------- */

Back to the top