diff options
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java')
-rw-r--r-- | jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java | 362 |
1 files changed, 233 insertions, 129 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 868846ce64..72d9b7e630 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -1,98 +1,239 @@ +// ======================================================================== +// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd. +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + package org.eclipse.jetty.io; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.WritePendingException; -import java.util.ConcurrentModificationException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; -/* ------------------------------------------------------------ */ /** * A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)} * by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written. * The abstract method {@link #onIncompleteFlushed()} is called when not all content has been * written after a call to flush and should organise for the {@link #completeWrite()} * method to be called when a subsequent call to flush should be able to make more progress. - * - * TODO remove synchronisation */ abstract public class WriteFlusher { - private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0]; - private final AtomicBoolean _writing = new AtomicBoolean(false); + private static final Logger logger = Log.getLogger(WriteFlusher.class); + private final static ByteBuffer[] NO_BUFFERS = new ByteBuffer[0]; private final EndPoint _endp; - - private ByteBuffer[] _buffers; - private Object _context; - private Callback<Object> _callback; + private final AtomicReference<State> _state = new AtomicReference<>(); + private final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class); //TODO: static + private final State idleState = new IdleState(); //TODO: static all of them + private final State writingState = new WritingState(); + private final State failedState = new FailedState(); + private final State completingState = new CompletedState(); + private volatile Throwable failure; protected WriteFlusher(EndPoint endp) { - _endp=endp; + _state.set(idleState); + _endp = endp; + + // fill the state machine + __stateTransitions.put(StateType.IDLE, new HashSet<StateType>()); + __stateTransitions.put(StateType.WRITING, new HashSet<StateType>()); + __stateTransitions.put(StateType.PENDING, new HashSet<StateType>()); + __stateTransitions.put(StateType.COMPLETING, new HashSet<StateType>()); + __stateTransitions.put(StateType.FAILED, new HashSet<StateType>()); + + __stateTransitions.get(StateType.IDLE).add(StateType.WRITING); + __stateTransitions.get(StateType.WRITING).add(StateType.IDLE); + __stateTransitions.get(StateType.WRITING).add(StateType.PENDING); + __stateTransitions.get(StateType.WRITING).add(StateType.FAILED); + __stateTransitions.get(StateType.PENDING).add(StateType.IDLE); + __stateTransitions.get(StateType.PENDING).add(StateType.COMPLETING); + __stateTransitions.get(StateType.PENDING).add(StateType.FAILED); + __stateTransitions.get(StateType.COMPLETING).add(StateType.IDLE); + __stateTransitions.get(StateType.COMPLETING).add(StateType.PENDING); + __stateTransitions.get(StateType.COMPLETING).add(StateType.FAILED); + + __stateTransitions.get(StateType.IDLE).add(StateType.IDLE); // TODO: should never happen?! Probably remove this transition and just throw as this indicates a bug } - private enum State + private enum StateType { IDLE, WRITING, - CLOSED + PENDING, + COMPLETING, + FAILED + } + + private State updateState(State newState) + { + State currentState = _state.get(); + boolean updated = false; + + while (!updated) + { + if(!isTransitionAllowed(newState, currentState)) + return null; // return false + currentState + + updated = _state.compareAndSet(currentState, newState); + logger.debug("StateType update: {} -> {} {}", currentState, newState, updated ? "" : "failed"); + if (!updated) + currentState = _state.get(); + } + // We need to return true and currentState + return currentState; + } + + private boolean isTransitionAllowed(State newState, State currentState) + { + Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType()); + if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING) + { + logger.debug("WRITE PENDING EXCEPTION"); //TODO: thomas remove, we don't log and throw + throw new WritePendingException(); + } + if (!allowedNewStateTypes.contains(newState.getType())) + { + logger.debug("{} -> {} not allowed.", currentState.getType(), newState.getType()); //thomas remove + return false; + } + return true; } - private abstract class WriteFlusherState + private abstract class State { - private State _state; - private ByteBuffer[] _buffers; - private Object _context; - private Callback<Object> _callback; + protected StateType _type; + protected ByteBuffer[] _buffers; + protected Object _context; + protected Callback<Object> _callback; - private WriteFlusherState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback) + private State(StateType stateType, ByteBuffer[] buffers, Object context, Callback<Object> callback) { - _state = state; + _type = stateType; _buffers = buffers; _context = context; _callback = callback; } + + /** + * In most States this is a noop. In others it needs to be overwritten. + * + * @param cause + */ + protected void fail(Throwable cause) + { + } + + /** + * In most States this is a noop. In others it needs to be overwritten. + */ + protected void complete() + { + } + + public StateType getType() + { + return _type; + } + + public void compactBuffers() + { + this._buffers = compact(_buffers); + } + + public ByteBuffer[] getBuffers() + { + return _buffers; + } + + @Override + public String toString() + { + return String.format("%s", _type); + } } - private class WriteFlusherIdleState extends WriteFlusherState + private class IdleState extends State { - private WriteFlusherIdleState() + private IdleState() { - super(null,null,null,null); + super(StateType.IDLE, null, null, null); } } - private class WriteFlusherWritingState extends WriteFlusherState + private class WritingState extends State { - private WriteFlusherWritingState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback) + private WritingState() { - super(state, buffers, context, callback); + super(StateType.WRITING, null, null, null); } } - private class WriteFlusherClosingState extends WriteFlusherState + private class FailedState extends State { - private WriteFlusherClosingState() + private FailedState() { - super(null,null,null,null); + super(StateType.FAILED, null, null, null); } } - /* ------------------------------------------------------------ */ - public synchronized <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) + private class CompletedState extends State { - if (callback==null) + private CompletedState() + { + super(StateType.COMPLETING, null, null, null); + } + } + + private class PendingState extends State + { + private PendingState(ByteBuffer[] buffers, Object context, Callback<Object> callback) + { + super(StateType.PENDING, buffers, context, callback); + } + + @Override + protected void fail(Throwable cause) + { + _callback.failed(_context, cause); + } + + @Override + protected void complete() + { + _callback.completed(_context); + } + } + + public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) + { + logger.debug("write: starting write. {}", _state); //thomas + if (callback == null) throw new IllegalArgumentException(); - if (!_writing.compareAndSet(false,true)) - throw new WritePendingException(); + if(updateState(writingState) == null) + { + callback.failed(context, failure); + return; + } try { - _endp.flush(buffers); // Are we complete? @@ -100,160 +241,123 @@ abstract public class WriteFlusher { if (b.hasRemaining()) { - _buffers=buffers; - _context=context; - _callback=(Callback<Object>)callback; - _writing.set(true); // Needed as memory barrier - onIncompleteFlushed(); + if(updateState(new PendingState(buffers, context, (Callback<Object>)callback)) == null) + callback.failed(context, failure); + else + onIncompleteFlushed(); return; } } - - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); + // If updateState didn't succeed, we don't care as our buffers have been written + updateState(idleState); callback.completed(context); } catch (IOException e) { - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(e); - callback.failed(context,e); + // If updateState didn't succeed, we don't care as writing our buffers failed + updateState(failedState); + callback.failed(context, e); } } - /* ------------------------------------------------------------ */ /** - * Abstract call to be implemented by specific WriteFlushers. + * Abstract call to be implemented by specific WriteFlushers. * It should schedule a call to {@link #completeWrite()} or * {@link #failed(Throwable)} when appropriate. + * * @return true if a flush can proceed. */ abstract protected void onIncompleteFlushed(); - /* ------------------------------------------------------------ */ /* Remove empty buffers from the start of a multi buffer array */ - private synchronized ByteBuffer[] compact(ByteBuffer[] buffers) + private ByteBuffer[] compact(ByteBuffer[] buffers) { - if (buffers.length<2) + if (buffers.length < 2) return buffers; - int b=0; - while (b<buffers.length && BufferUtil.isEmpty(buffers[b])) + int b = 0; + while (b < buffers.length && BufferUtil.isEmpty(buffers[b])) b++; - if (b==0) + if (b == 0) return buffers; - if (b==buffers.length) + if (b == buffers.length) return NO_BUFFERS; - ByteBuffer[] compact=new ByteBuffer[buffers.length-b]; - System.arraycopy(buffers,b,compact,0,compact.length); + ByteBuffer[] compact = new ByteBuffer[buffers.length - b]; + System.arraycopy(buffers, b, compact, 0, compact.length); return compact; } - /* ------------------------------------------------------------ */ /** * Complete a write that has not completed and that called * {@link #onIncompleteFlushed()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} * is likely to be able to progress. */ - public synchronized void completeWrite() + public void completeWrite() { - if (!isWriting()) - return; // TODO throw? + State currentState = updateState(completingState); + if (currentState == null || currentState.getType() != StateType.PENDING) + return; try { - while(true) - { - _buffers=compact(_buffers); - _endp.flush(_buffers); + currentState.compactBuffers(); //TODO: do we need it? + _endp.flush(currentState.getBuffers()); - // Are we complete? - for (ByteBuffer b : _buffers) + // Are we complete? + for (ByteBuffer b : currentState.getBuffers()) + { + if (b.hasRemaining()) { - if (b.hasRemaining()) - { + if(updateState(currentState)==null) + currentState.fail(failure); + else onIncompleteFlushed(); - return; - } + return; } - break; } - // we are complete and ready - Callback<Object> callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); - callback.completed(context); + // If updateState didn't succeed, we don't care as our buffers have been written + updateState(idleState); + currentState.complete(); } catch (IOException e) { - Callback<Object> callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - if (!_writing.compareAndSet(true,false)) - throw new ConcurrentModificationException(); - callback.failed(context,e); + // If updateState didn't succeed, we don't care as writing our buffers failed + updateState(failedState); + currentState.fail(e); } - return; } - /* ------------------------------------------------------------ */ - /** - * Fail the write in progress and cause any calls to get to throw - * the cause wrapped as an execution exception. - * @return true if a write was in progress - */ - public synchronized boolean failed(Throwable cause) + public void failed(Throwable cause) { - if (!_writing.compareAndSet(true,false)) - return false; - Callback<Object> callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - callback.failed(context,cause); - return true; + failure = cause; + State currentState = _state.get(); + logger.debug("failed: s={} e={}", _state, cause); + updateState(failedState); + currentState.fail(cause); } - /* ------------------------------------------------------------ */ - /** - * Fail the write with a {@link ClosedChannelException}. This is similar - * to a call to {@link #failed(Throwable)}, except that the exception is - * not instantiated unless a write was in progress. - * @return true if a write was in progress - */ - public synchronized boolean close() + public void close() { - if (!_writing.compareAndSet(true,false)) - return false; - Callback<Object> callback=_callback; - Object context=_context; - _buffers=null; - _callback=null; - _context=null; - callback.failed(context,new ClosedChannelException()); - return true; + failed(new ClosedChannelException()); + } + + public boolean isWritePending() + { + return _state.get().getType() == StateType.PENDING; } - /* ------------------------------------------------------------ */ - public synchronized boolean isWriting() + //TODO: remove + State getState() { - return _writing.get(); + return _state.get(); } - /* ------------------------------------------------------------ */ @Override public String toString() { - return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context); + return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get()); } } |