// // ======================================================================== // Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. // ------------------------------------------------------------------------ // All rights reserved. This program and the accompanying materials // are made available under the terms of the Eclipse Public License v1.0 // and Apache License v2.0 which accompanies this distribution. // // The Eclipse Public License is available at // http://www.eclipse.org/legal/epl-v10.html // // The Apache License v2.0 is available at // http://www.opensource.org/licenses/apache2.0.php // // You may elect to redistribute this code under either of these licenses. // ======================================================================== // package org.eclipse.jetty.server; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritePendingException; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.WriteListener; import org.eclipse.jetty.http.HttpContent; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; /** *

{@link HttpOutput} implements {@link ServletOutputStream} * as required by the Servlet specification.

*

{@link HttpOutput} buffers content written by the application until a * further write will overflow the buffer, at which point it triggers a commit * of the response.

*

{@link HttpOutput} can be closed and reopened, to allow requests included * via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to * close the stream, to be reopened after the inclusion ends.

*/ public class HttpOutput extends ServletOutputStream implements Runnable { public interface Interceptor { void write(ByteBuffer content, boolean complete, Callback callback); Interceptor getNextInterceptor(); boolean isOptimizedForDirectBuffers(); } private static Logger LOG = Log.getLogger(HttpOutput.class); private final HttpChannel _channel; private final SharedBlockingCallback _writeBlock; private Interceptor _interceptor; /** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */ private long _written; private ByteBuffer _aggregate; private int _bufferSize; private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; /* ACTION OPEN ASYNC READY PENDING UNREADY CLOSED ------------------------------------------------------------------------------------------- setWriteListener() READY->owp ise ise ise ise ise write() OPEN ise PENDING wpe wpe eof flush() OPEN ise PENDING wpe wpe eof close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true write completed - - - ASYNC READY->owp - */ private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } private final AtomicReference _state=new AtomicReference<>(OutputState.OPEN); public HttpOutput(HttpChannel channel) { _channel = channel; _interceptor = channel; _writeBlock = new SharedBlockingCallback() { @Override protected long getIdleTimeout() { long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout(); if (bto>0) return bto; if (bto<0) return -1; return _channel.getIdleTimeout(); } }; HttpConfiguration config = channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); _commitSize = config.getOutputAggregationSize(); if (_commitSize>_bufferSize) { LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize); _commitSize=_bufferSize; } } public HttpChannel getHttpChannel() { return _channel; } public Interceptor getInterceptor() { return _interceptor; } public void setInterceptor(Interceptor filter) { _interceptor=filter; } public boolean isWritten() { return _written > 0; } public long getWritten() { return _written; } public void reopen() { _state.set(OutputState.OPEN); } public boolean isAllContentWritten() { return _channel.getResponse().isAllContentWritten(_written); } protected Blocker acquireWriteBlockingCallback() throws IOException { return _writeBlock.acquire(); } private void write(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeBlock.acquire()) { write(content, complete, blocker); blocker.block(); } catch (Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug(failure); abort(failure); throw failure; } } protected void write(ByteBuffer content, boolean complete, Callback callback) { _interceptor.write(content, complete, callback); } private void abort(Throwable failure) { closed(); _channel.abort(failure); } @Override public void close() { while(true) { OutputState state=_state.get(); switch (state) { case CLOSED: { return; } case UNREADY: { if (_state.compareAndSet(state,OutputState.ERROR)) _writeListener.onError(_onError==null?new EofException("Async close"):_onError); break; } default: { if (!_state.compareAndSet(state,OutputState.CLOSED)) break; try { write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); } catch (IOException x) { // Ignore it, it's been already logged in write(). } finally { releaseBuffer(); } // Return even if an exception is thrown by write(). return; } } } } /** * Called to indicate that the last write has been performed. * It updates the state and performs cleanup operations. */ void closed() { while(true) { OutputState state=_state.get(); switch (state) { case CLOSED: { return; } case UNREADY: { if (_state.compareAndSet(state,OutputState.ERROR)) _writeListener.onError(_onError==null?new EofException("Async closed"):_onError); break; } default: { if (!_state.compareAndSet(state, OutputState.CLOSED)) break; try { _channel.getResponse().closeOutput(); } catch (Throwable x) { if (LOG.isDebugEnabled()) LOG.debug(x); abort(x); } finally { releaseBuffer(); } // Return even if an exception is thrown by closeOutput(). return; } } } } private void releaseBuffer() { if (_aggregate != null) { _channel.getConnector().getByteBufferPool().release(_aggregate); _aggregate = null; } } public boolean isClosed() { return _state.get()==OutputState.CLOSED; } public boolean isAsync() { switch(_state.get()) { case ASYNC: case READY: case PENDING: case UNREADY: return true; default: return false; } } @Override public void flush() throws IOException { while(true) { switch(_state.get()) { case OPEN: write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false); return; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING)) continue; new AsyncFlush().iterate(); return; case PENDING: case UNREADY: throw new WritePendingException(); case ERROR: throw new EofException(_onError); case CLOSED: return; default: throw new IllegalStateException(); } } } @Override public void write(byte[] b, int off, int len) throws IOException { _written+=len; boolean complete=_channel.getResponse().isAllContentWritten(_written); // Async or Blocking ? while(true) { switch(_state.get()) { case OPEN: // process blocking below break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING)) continue; // Should we aggregate? if (!complete && len<=_commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content if (filled==len && !BufferUtil.isFull(_aggregate)) { if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) throw new IllegalStateException(); return; } // adjust offset/length off+=filled; len-=filled; } // Do the asynchronous writing from the callback new AsyncWrite(b,off,len,complete).iterate(); return; case PENDING: case UNREADY: throw new WritePendingException(); case ERROR: throw new EofException(_onError); case CLOSED: throw new EofException("Closed"); default: throw new IllegalStateException(); } break; } // handle blocking write // Should we aggregate? int capacity = getBufferSize(); if (!complete && len<=_commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content if (filled==len && !BufferUtil.isFull(_aggregate)) return; // adjust offset/length off+=filled; len-=filled; } // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { write(_aggregate, complete && len==0); // should we fill aggregate again from the buffer? if (len>0 && !complete && len<=_commitSize && len<=BufferUtil.space(_aggregate)) { BufferUtil.append(_aggregate, b, off, len); return; } } // write any remaining content in the buffer directly if (len>0) { ByteBuffer wrap = ByteBuffer.wrap(b, off, len); ByteBuffer view = wrap.duplicate(); // write a buffer capacity at a time to avoid JVM pooling large direct buffers // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 while (len>getBufferSize()) { int p=view.position(); int l=p+getBufferSize(); view.limit(p+getBufferSize()); write(view,false); len-=getBufferSize(); view.limit(l+Math.min(len,getBufferSize())); view.position(l); } write(view,complete); } else if (complete) { write(BufferUtil.EMPTY_BUFFER,true); } if (complete) closed(); } public void write(ByteBuffer buffer) throws IOException { _written+=buffer.remaining(); boolean complete=_channel.getResponse().isAllContentWritten(_written); // Async or Blocking ? while(true) { switch(_state.get()) { case OPEN: // process blocking below break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING)) continue; // Do the asynchronous writing from the callback new AsyncWrite(buffer,complete).iterate(); return; case PENDING: case UNREADY: throw new WritePendingException(); case ERROR: throw new EofException(_onError); case CLOSED: throw new EofException("Closed"); default: throw new IllegalStateException(); } break; } // handle blocking write int len=BufferUtil.length(buffer); // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) write(_aggregate, complete && len==0); // write any remaining content in the buffer directly if (len>0) write(buffer, complete); else if (complete) write(BufferUtil.EMPTY_BUFFER, true); if (complete) closed(); } @Override public void write(int b) throws IOException { _written+=1; boolean complete=_channel.getResponse().isAllContentWritten(_written); // Async or Blocking ? while(true) { switch(_state.get()) { case OPEN: if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); BufferUtil.append(_aggregate, (byte)b); // Check if all written or full if (complete || BufferUtil.isFull(_aggregate)) { write(_aggregate, complete); if (complete) closed(); } break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING)) continue; if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); BufferUtil.append(_aggregate, (byte)b); // Check if all written or full if (!complete && !BufferUtil.isFull(_aggregate)) { if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) throw new IllegalStateException(); return; } // Do the asynchronous writing from the callback new AsyncFlush().iterate(); return; case PENDING: case UNREADY: throw new WritePendingException(); case ERROR: throw new EofException(_onError); case CLOSED: throw new EofException("Closed"); default: throw new IllegalStateException(); } break; } } @Override public void print(String s) throws IOException { if (isClosed()) throw new IOException("Closed"); write(s.getBytes(_channel.getResponse().getCharacterEncoding())); } /** * Blocking send of whole content. * * @param content The whole content to send * @throws IOException if the send fails */ public void sendContent(ByteBuffer content) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("sendContent({})",BufferUtil.toDetailString(content)); write(content, true); closed(); } /** * Blocking send of stream content. * * @param in The stream content to send * @throws IOException if the send fails */ public void sendContent(InputStream in) throws IOException { try(Blocker blocker = _writeBlock.acquire()) { new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); } catch (Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug(failure); abort(failure); throw failure; } } /** * Blocking send of channel content. * * @param in The channel content to send * @throws IOException if the send fails */ public void sendContent(ReadableByteChannel in) throws IOException { try(Blocker blocker = _writeBlock.acquire()) { new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); } catch (Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug(failure); abort(failure); throw failure; } } /** * Blocking send of HTTP content. * * @param content The HTTP content to send * @throws IOException if the send fails */ public void sendContent(HttpContent content) throws IOException { try(Blocker blocker = _writeBlock.acquire()) { sendContent(content, blocker); blocker.block(); } catch (Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug(failure); abort(failure); throw failure; } } /** * Asynchronous send of whole content. * @param content The whole content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ByteBuffer content, final Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback); write(content, true, new Callback() { @Override public void succeeded() { closed(); callback.succeeded(); } @Override public void failed(Throwable x) { abort(x); callback.failed(x); } }); } /** * Asynchronous send of stream content. * The stream will be closed after reading all content. * * @param in The stream content to send * @param callback The callback to use to notify success or failure */ public void sendContent(InputStream in, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})",in,callback); new InputStreamWritingCB(in, callback).iterate(); } /** * Asynchronous send of channel content. * The channel will be closed after reading all content. * * @param in The channel content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ReadableByteChannel in, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})",in,callback); new ReadableByteChannelWritingCB(in, callback).iterate(); } /** * Asynchronous send of HTTP content. * * @param httpContent The HTTP content to send * @param callback The callback to use to notify success or failure */ public void sendContent(HttpContent httpContent, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("sendContent(http={},{})",httpContent,callback); if (BufferUtil.hasContent(_aggregate)) { callback.failed(new IOException("cannot sendContent() after write()")); return; } if (_channel.isCommitted()) { callback.failed(new IOException("committed")); return; } while (true) { switch(_state.get()) { case OPEN: if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) continue; break; case ERROR: callback.failed(new EofException(_onError)); return; case CLOSED: callback.failed(new EofException("Closed")); return; default: throw new IllegalStateException(); } break; } ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) buffer = httpContent.getIndirectBuffer(); if (buffer!=null) { sendContent(buffer,callback); return; } try { ReadableByteChannel rbc=httpContent.getReadableByteChannel(); if (rbc!=null) { // Close of the rbc is done by the async sendContent sendContent(rbc,callback); return; } InputStream in = httpContent.getInputStream(); if (in!=null) { sendContent(in,callback); return; } throw new IllegalArgumentException("unknown content for "+httpContent); } catch(Throwable th) { abort(th); callback.failed(th); } } public int getBufferSize() { return _bufferSize; } public void setBufferSize(int size) { _bufferSize = size; _commitSize = size; } public void recycle() { resetBuffer(); _interceptor=_channel; } public void resetBuffer() { _written = 0; if (BufferUtil.hasContent(_aggregate)) BufferUtil.clear(_aggregate); reopen(); } @Override public void setWriteListener(WriteListener writeListener) { if (!_channel.getState().isAsync()) throw new IllegalStateException("!ASYNC"); if (_state.compareAndSet(OutputState.OPEN, OutputState.READY)) { _writeListener = writeListener; if (_channel.getState().onWritePossible()) _channel.execute(_channel); } else throw new IllegalStateException(); } /** * @see javax.servlet.ServletOutputStream#isReady() */ @Override public boolean isReady() { while (true) { switch(_state.get()) { case OPEN: return true; case ASYNC: if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY)) continue; return true; case READY: return true; case PENDING: if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY)) continue; return false; case UNREADY: return false; case ERROR: return true; case CLOSED: return true; default: throw new IllegalStateException(); } } } @Override public void run() { loop: while (true) { OutputState state = _state.get(); if(_onError!=null) { switch(state) { case CLOSED: case ERROR: { _onError=null; break loop; } default: { if (_state.compareAndSet(state, OutputState.ERROR)) { Throwable th=_onError; _onError=null; if (LOG.isDebugEnabled()) LOG.debug("onError",th); _writeListener.onError(th); close(); break loop; } } } continue; } switch(_state.get()) { case CLOSED: // Even though a write is not possible, because a close has // occurred, we need to call onWritePossible to tell async // producer that the last write completed. // So fall through case READY: try { _writeListener.onWritePossible(); break loop; } catch (Throwable e) { _onError = e; } break; default: _onError=new IllegalStateException("state="+_state.get()); } } } private void close(Closeable resource) { try { resource.close(); } catch (Throwable x) { LOG.ignore(x); } } @Override public String toString() { return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); } private abstract class AsyncICB extends IteratingCallback { @Override protected void onCompleteSuccess() { while(true) { OutputState last=_state.get(); switch(last) { case PENDING: if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) continue; break; case UNREADY: if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY)) continue; if (_channel.getState().onWritePossible()) _channel.execute(_channel); break; case CLOSED: break; default: throw new IllegalStateException(); } break; } } @Override public void onCompleteFailure(Throwable e) { _onError=e==null?new IOException():e; if (_channel.getState().onWritePossible()) _channel.execute(_channel); } } private class AsyncFlush extends AsyncICB { protected volatile boolean _flushed; public AsyncFlush() { } @Override protected Action process() { if (BufferUtil.hasContent(_aggregate)) { _flushed=true; write(_aggregate, false, this); return Action.SCHEDULED; } if (!_flushed) { _flushed=true; write(BufferUtil.EMPTY_BUFFER,false,this); return Action.SCHEDULED; } return Action.SUCCEEDED; } } private class AsyncWrite extends AsyncICB { private final ByteBuffer _buffer; private final ByteBuffer _slice; private final boolean _complete; private final int _len; protected volatile boolean _completed; public AsyncWrite(byte[] b, int off, int len, boolean complete) { _buffer=ByteBuffer.wrap(b, off, len); _len=len; // always use a view for large byte arrays to avoid JVM pooling large direct buffers _slice=_len