aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2012-08-02 08:38:54 (EDT)
committerThomas Becker2012-08-02 08:38:54 (EDT)
commit4e9460161944b5ccfa4bf1b5494d96ef395a0e76 (patch)
tree0d61e04770fbd4643a34f5cd328ee49a175a4df5
parenta480e2c94d8bea836a6fe190782cc3e52045c001 (diff)
downloadorg.eclipse.jetty.project-4e9460161944b5ccfa4bf1b5494d96ef395a0e76.zip
org.eclipse.jetty.project-4e9460161944b5ccfa4bf1b5494d96ef395a0e76.tar.gz
org.eclipse.jetty.project-4e9460161944b5ccfa4bf1b5494d96ef395a0e76.tar.bz2
jetty-9: Make WriteFlusher threadsafe.
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java2
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java10
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java362
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java18
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java12
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java246
-rw-r--r--jetty-io/src/test/resources/jetty-logging.properties3
-rw-r--r--jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties5
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java1
9 files changed, 473 insertions, 186 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java
index 52d4b92..b1fcd44 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncByteArrayEndPoint.java
@@ -85,7 +85,7 @@ public class AsyncByteArrayEndPoint extends ByteArrayEndPoint implements AsyncEn
long idleElapsed = System.currentTimeMillis() - idleTimestamp;
long idleLeft = idleTimeout - idleElapsed;
- if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
+ if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index f373163..683b950 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -170,7 +170,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
LOG.debug("{} idle timeout check, elapsed: {} ms, remaining: {} ms", this, idleElapsed, idleLeft);
- if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWriting())
+ if (isOutputShutdown() || _readInterest.isInterested() || _writeFlusher.isWritePending())
{
if (idleTimestamp != 0 && idleTimeout > 0)
{
@@ -178,13 +178,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
LOG.debug("{} idle timeout expired", this);
- if (isOutputShutdown())
- close();
- notIdle();
-
TimeoutException timeout = new TimeoutException("Idle timeout expired: " + idleElapsed + "/" + idleTimeout + " ms");
_readInterest.failed(timeout);
_writeFlusher.failed(timeout);
+
+ if (isOutputShutdown())
+ close();
+ notIdle();
}
}
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
index 868846c..72d9b7e 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
@@ -1,98 +1,239 @@
+// ========================================================================
+// Copyright (c) 2012-2012 Mort Bay Consulting Pty. Ltd.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritePendingException;
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.log.Log;
+import org.eclipse.jetty.util.log.Logger;
-/* ------------------------------------------------------------ */
/**
* A Utility class to help implement {@link AsyncEndPoint#write(Object, Callback, ByteBuffer...)}
* by calling {@link EndPoint#flush(ByteBuffer...)} until all content is written.
* The abstract method {@link #onIncompleteFlushed()} is called when not all content has been
* written after a call to flush and should organise for the {@link #completeWrite()}
* method to be called when a subsequent call to flush should be able to make more progress.
- *
- * TODO remove synchronisation
*/
abstract public class WriteFlusher
{
- private final static ByteBuffer[] NO_BUFFERS= new ByteBuffer[0];
- private final AtomicBoolean _writing = new AtomicBoolean(false);
+ private static final Logger logger = Log.getLogger(WriteFlusher.class);
+ private final static ByteBuffer[] NO_BUFFERS = new ByteBuffer[0];
private final EndPoint _endp;
-
- private ByteBuffer[] _buffers;
- private Object _context;
- private Callback<Object> _callback;
+ private final AtomicReference<State> _state = new AtomicReference<>();
+ private final EnumMap<StateType, Set<StateType>> __stateTransitions = new EnumMap<>(StateType.class); //TODO: static
+ private final State idleState = new IdleState(); //TODO: static all of them
+ private final State writingState = new WritingState();
+ private final State failedState = new FailedState();
+ private final State completingState = new CompletedState();
+ private volatile Throwable failure;
protected WriteFlusher(EndPoint endp)
{
- _endp=endp;
+ _state.set(idleState);
+ _endp = endp;
+
+ // fill the state machine
+ __stateTransitions.put(StateType.IDLE, new HashSet<StateType>());
+ __stateTransitions.put(StateType.WRITING, new HashSet<StateType>());
+ __stateTransitions.put(StateType.PENDING, new HashSet<StateType>());
+ __stateTransitions.put(StateType.COMPLETING, new HashSet<StateType>());
+ __stateTransitions.put(StateType.FAILED, new HashSet<StateType>());
+
+ __stateTransitions.get(StateType.IDLE).add(StateType.WRITING);
+ __stateTransitions.get(StateType.WRITING).add(StateType.IDLE);
+ __stateTransitions.get(StateType.WRITING).add(StateType.PENDING);
+ __stateTransitions.get(StateType.WRITING).add(StateType.FAILED);
+ __stateTransitions.get(StateType.PENDING).add(StateType.IDLE);
+ __stateTransitions.get(StateType.PENDING).add(StateType.COMPLETING);
+ __stateTransitions.get(StateType.PENDING).add(StateType.FAILED);
+ __stateTransitions.get(StateType.COMPLETING).add(StateType.IDLE);
+ __stateTransitions.get(StateType.COMPLETING).add(StateType.PENDING);
+ __stateTransitions.get(StateType.COMPLETING).add(StateType.FAILED);
+
+ __stateTransitions.get(StateType.IDLE).add(StateType.IDLE); // TODO: should never happen?! Probably remove this transition and just throw as this indicates a bug
}
- private enum State
+ private enum StateType
{
IDLE,
WRITING,
- CLOSED
+ PENDING,
+ COMPLETING,
+ FAILED
+ }
+
+ private State updateState(State newState)
+ {
+ State currentState = _state.get();
+ boolean updated = false;
+
+ while (!updated)
+ {
+ if(!isTransitionAllowed(newState, currentState))
+ return null; // return false + currentState
+
+ updated = _state.compareAndSet(currentState, newState);
+ logger.debug("StateType update: {} -> {} {}", currentState, newState, updated ? "" : "failed");
+ if (!updated)
+ currentState = _state.get();
+ }
+ // We need to return true and currentState
+ return currentState;
+ }
+
+ private boolean isTransitionAllowed(State newState, State currentState)
+ {
+ Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
+ if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
+ {
+ logger.debug("WRITE PENDING EXCEPTION"); //TODO: thomas remove, we don't log and throw
+ throw new WritePendingException();
+ }
+ if (!allowedNewStateTypes.contains(newState.getType()))
+ {
+ logger.debug("{} -> {} not allowed.", currentState.getType(), newState.getType()); //thomas remove
+ return false;
+ }
+ return true;
}
- private abstract class WriteFlusherState
+ private abstract class State
{
- private State _state;
- private ByteBuffer[] _buffers;
- private Object _context;
- private Callback<Object> _callback;
+ protected StateType _type;
+ protected ByteBuffer[] _buffers;
+ protected Object _context;
+ protected Callback<Object> _callback;
- private WriteFlusherState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback)
+ private State(StateType stateType, ByteBuffer[] buffers, Object context, Callback<Object> callback)
{
- _state = state;
+ _type = stateType;
_buffers = buffers;
_context = context;
_callback = callback;
}
+
+ /**
+ * In most States this is a noop. In others it needs to be overwritten.
+ *
+ * @param cause
+ */
+ protected void fail(Throwable cause)
+ {
+ }
+
+ /**
+ * In most States this is a noop. In others it needs to be overwritten.
+ */
+ protected void complete()
+ {
+ }
+
+ public StateType getType()
+ {
+ return _type;
+ }
+
+ public void compactBuffers()
+ {
+ this._buffers = compact(_buffers);
+ }
+
+ public ByteBuffer[] getBuffers()
+ {
+ return _buffers;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s", _type);
+ }
}
- private class WriteFlusherIdleState extends WriteFlusherState
+ private class IdleState extends State
{
- private WriteFlusherIdleState()
+ private IdleState()
{
- super(null,null,null,null);
+ super(StateType.IDLE, null, null, null);
}
}
- private class WriteFlusherWritingState extends WriteFlusherState
+ private class WritingState extends State
{
- private WriteFlusherWritingState(State state, ByteBuffer[] buffers, Object context, Callback<Object> callback)
+ private WritingState()
{
- super(state, buffers, context, callback);
+ super(StateType.WRITING, null, null, null);
}
}
- private class WriteFlusherClosingState extends WriteFlusherState
+ private class FailedState extends State
{
- private WriteFlusherClosingState()
+ private FailedState()
{
- super(null,null,null,null);
+ super(StateType.FAILED, null, null, null);
}
}
- /* ------------------------------------------------------------ */
- public synchronized <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
+ private class CompletedState extends State
{
- if (callback==null)
+ private CompletedState()
+ {
+ super(StateType.COMPLETING, null, null, null);
+ }
+ }
+
+ private class PendingState extends State
+ {
+ private PendingState(ByteBuffer[] buffers, Object context, Callback<Object> callback)
+ {
+ super(StateType.PENDING, buffers, context, callback);
+ }
+
+ @Override
+ protected void fail(Throwable cause)
+ {
+ _callback.failed(_context, cause);
+ }
+
+ @Override
+ protected void complete()
+ {
+ _callback.completed(_context);
+ }
+ }
+
+ public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers)
+ {
+ logger.debug("write: starting write. {}", _state); //thomas
+ if (callback == null)
throw new IllegalArgumentException();
- if (!_writing.compareAndSet(false,true))
- throw new WritePendingException();
+ if(updateState(writingState) == null)
+ {
+ callback.failed(context, failure);
+ return;
+ }
try
{
-
_endp.flush(buffers);
// Are we complete?
@@ -100,160 +241,123 @@ abstract public class WriteFlusher
{
if (b.hasRemaining())
{
- _buffers=buffers;
- _context=context;
- _callback=(Callback<Object>)callback;
- _writing.set(true); // Needed as memory barrier
- onIncompleteFlushed();
+ if(updateState(new PendingState(buffers, context, (Callback<Object>)callback)) == null)
+ callback.failed(context, failure);
+ else
+ onIncompleteFlushed();
return;
}
}
-
- if (!_writing.compareAndSet(true,false))
- throw new ConcurrentModificationException();
+ // If updateState didn't succeed, we don't care as our buffers have been written
+ updateState(idleState);
callback.completed(context);
}
catch (IOException e)
{
- if (!_writing.compareAndSet(true,false))
- throw new ConcurrentModificationException(e);
- callback.failed(context,e);
+ // If updateState didn't succeed, we don't care as writing our buffers failed
+ updateState(failedState);
+ callback.failed(context, e);
}
}
- /* ------------------------------------------------------------ */
/**
- * Abstract call to be implemented by specific WriteFlushers.
+ * Abstract call to be implemented by specific WriteFlushers.
* It should schedule a call to {@link #completeWrite()} or
* {@link #failed(Throwable)} when appropriate.
+ *
* @return true if a flush can proceed.
*/
abstract protected void onIncompleteFlushed();
- /* ------------------------------------------------------------ */
/* Remove empty buffers from the start of a multi buffer array
*/
- private synchronized ByteBuffer[] compact(ByteBuffer[] buffers)
+ private ByteBuffer[] compact(ByteBuffer[] buffers)
{
- if (buffers.length<2)
+ if (buffers.length < 2)
return buffers;
- int b=0;
- while (b<buffers.length && BufferUtil.isEmpty(buffers[b]))
+ int b = 0;
+ while (b < buffers.length && BufferUtil.isEmpty(buffers[b]))
b++;
- if (b==0)
+ if (b == 0)
return buffers;
- if (b==buffers.length)
+ if (b == buffers.length)
return NO_BUFFERS;
- ByteBuffer[] compact=new ByteBuffer[buffers.length-b];
- System.arraycopy(buffers,b,compact,0,compact.length);
+ ByteBuffer[] compact = new ByteBuffer[buffers.length - b];
+ System.arraycopy(buffers, b, compact, 0, compact.length);
return compact;
}
- /* ------------------------------------------------------------ */
/**
* Complete a write that has not completed and that called
* {@link #onIncompleteFlushed()} to request a call to this
* method when a call to {@link EndPoint#flush(ByteBuffer...)}
* is likely to be able to progress.
*/
- public synchronized void completeWrite()
+ public void completeWrite()
{
- if (!isWriting())
- return; // TODO throw?
+ State currentState = updateState(completingState);
+ if (currentState == null || currentState.getType() != StateType.PENDING)
+ return;
try
{
- while(true)
- {
- _buffers=compact(_buffers);
- _endp.flush(_buffers);
+ currentState.compactBuffers(); //TODO: do we need it?
+ _endp.flush(currentState.getBuffers());
- // Are we complete?
- for (ByteBuffer b : _buffers)
+ // Are we complete?
+ for (ByteBuffer b : currentState.getBuffers())
+ {
+ if (b.hasRemaining())
{
- if (b.hasRemaining())
- {
+ if(updateState(currentState)==null)
+ currentState.fail(failure);
+ else
onIncompleteFlushed();
- return;
- }
+ return;
}
- break;
}
- // we are complete and ready
- Callback<Object> callback=_callback;
- Object context=_context;
- _buffers=null;
- _callback=null;
- _context=null;
- if (!_writing.compareAndSet(true,false))
- throw new ConcurrentModificationException();
- callback.completed(context);
+ // If updateState didn't succeed, we don't care as our buffers have been written
+ updateState(idleState);
+ currentState.complete();
}
catch (IOException e)
{
- Callback<Object> callback=_callback;
- Object context=_context;
- _buffers=null;
- _callback=null;
- _context=null;
- if (!_writing.compareAndSet(true,false))
- throw new ConcurrentModificationException();
- callback.failed(context,e);
+ // If updateState didn't succeed, we don't care as writing our buffers failed
+ updateState(failedState);
+ currentState.fail(e);
}
- return;
}
- /* ------------------------------------------------------------ */
- /**
- * Fail the write in progress and cause any calls to get to throw
- * the cause wrapped as an execution exception.
- * @return true if a write was in progress
- */
- public synchronized boolean failed(Throwable cause)
+ public void failed(Throwable cause)
{
- if (!_writing.compareAndSet(true,false))
- return false;
- Callback<Object> callback=_callback;
- Object context=_context;
- _buffers=null;
- _callback=null;
- _context=null;
- callback.failed(context,cause);
- return true;
+ failure = cause;
+ State currentState = _state.get();
+ logger.debug("failed: s={} e={}", _state, cause);
+ updateState(failedState);
+ currentState.fail(cause);
}
- /* ------------------------------------------------------------ */
- /**
- * Fail the write with a {@link ClosedChannelException}. This is similar
- * to a call to {@link #failed(Throwable)}, except that the exception is
- * not instantiated unless a write was in progress.
- * @return true if a write was in progress
- */
- public synchronized boolean close()
+ public void close()
{
- if (!_writing.compareAndSet(true,false))
- return false;
- Callback<Object> callback=_callback;
- Object context=_context;
- _buffers=null;
- _callback=null;
- _context=null;
- callback.failed(context,new ClosedChannelException());
- return true;
+ failed(new ClosedChannelException());
+ }
+
+ public boolean isWritePending()
+ {
+ return _state.get().getType() == StateType.PENDING;
}
- /* ------------------------------------------------------------ */
- public synchronized boolean isWriting()
+ //TODO: remove
+ State getState()
{
- return _writing.get();
+ return _state.get();
}
- /* ------------------------------------------------------------ */
@Override
public String toString()
{
- return String.format("WriteFlusher@%x{%b,%s,%s}",hashCode(),isWriting(),_callback,_context);
+ return String.format("WriteFlusher@%x{%s}", hashCode(), _state.get());
}
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java
deleted file mode 100644
index e49693d..0000000
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusherTest.java
+++ /dev/null
@@ -1,18 +0,0 @@
-//========================================================================
-//Copyright 2012 Mort Bay Consulting Pty. Ltd.
-//------------------------------------------------------------------------
-//All rights reserved. This program and the accompanying materials
-//are made available under the terms of the Eclipse Public License v1.0
-//and Apache License v2.0 which accompanies this distribution.
-//The Eclipse Public License is available at
-//http://www.eclipse.org/legal/epl-v10.html
-//The Apache License v2.0 is available at
-//http://www.opensource.org/licenses/apache2.0.php
-//You may elect to redistribute this code under either of these licenses.
-//========================================================================
-package org.eclipse.jetty.io;
-
-@RunWith(Mockit)
-public class WriteFlusherTest
-{
-}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
index 025e7aa..a6e5618 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
@@ -109,7 +109,7 @@ public class SslConnection extends AbstractAsyncConnection
_appEndPoint._readInterest.readable();
// If we are handshaking, then wake up any waiting write as well as it may have been blocked on the read
- if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap)
+ if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap)
{
_appEndPoint._flushUnwrap = false;
_appEndPoint._writeFlusher.completeWrite();
@@ -125,7 +125,7 @@ public class SslConnection extends AbstractAsyncConnection
if (_appEndPoint._readInterest.isInterested())
_appEndPoint._readInterest.failed(cause);
- if (_appEndPoint._writeFlusher.isWriting() && _appEndPoint._flushUnwrap)
+ if (_appEndPoint._writeFlusher.isWritePending() && _appEndPoint._flushUnwrap)
{
_appEndPoint._flushUnwrap = false;
_appEndPoint._writeFlusher.failed(cause);
@@ -141,7 +141,7 @@ public class SslConnection extends AbstractAsyncConnection
hashCode(),
_sslEngine.getHandshakeStatus(),
_appEndPoint._readInterest.isInterested() ? "R" : "",
- _appEndPoint._writeFlusher.isWriting() ? "W" : "");
+ _appEndPoint._writeFlusher.isWritePending() ? "W" : "");
}
/* ------------------------------------------------------------ */
@@ -183,7 +183,7 @@ public class SslConnection extends AbstractAsyncConnection
_readInterest.readable();
}
- if (_writeFlusher.isWriting())
+ if (_writeFlusher.isWritePending())
_writeFlusher.completeWrite();
}
}
@@ -204,7 +204,7 @@ public class SslConnection extends AbstractAsyncConnection
_readInterest.failed(x);
}
- if (_writeFlusher.isWriting())
+ if (_writeFlusher.isWritePending())
_writeFlusher.failed(x);
// TODO release all buffers??? or may in onClose
@@ -640,7 +640,7 @@ public class SslConnection extends AbstractAsyncConnection
@Override
public String toString()
{
- return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWriting() ? "W" : "", _netWriting ? "w" : "");
+ return String.format("%s{%s%s%s}", super.toString(), _readInterest.isInterested() ? "R" : "", _writeFlusher.isWritePending() ? "W" : "", _netWriting ? "w" : "");
}
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
index 1a4702f..1bf3234 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
@@ -1,11 +1,13 @@
package org.eclipse.jetty.io;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.assertFalse;
-
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritePendingException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,25 +15,41 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
public class WriteFlusherTest
{
+ @Mock
+ EndPoint _endPointMock;
+
ByteArrayEndPoint _endp;
final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
WriteFlusher _flusher;
final String _context = new String("Context");
-
+
@Before
public void before()
{
_endp = new ByteArrayEndPoint(new byte[]{},10);
_flushIncomplete.set(false);
_flusher = new WriteFlusher(_endp)
- {
+ {
@Override
protected void onIncompleteFlushed()
{
@@ -39,18 +57,12 @@ public class WriteFlusherTest
}
};
}
-
- @After
- public void after()
- {
-
- }
-
+
@Test
public void testCompleteNoBlocking() throws Exception
{
_endp.setGrowOutput(true);
-
+
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
@@ -58,12 +70,12 @@ public class WriteFlusherTest
assertEquals(_context,callback.get());
assertEquals("How now brown cow!",_endp.takeOutputString());
}
-
+
@Test
public void testClosedNoBlocking() throws Exception
{
_endp.close();
-
+
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertTrue(callback.isDone());
@@ -81,16 +93,16 @@ public class WriteFlusherTest
}
assertEquals("",_endp.takeOutputString());
}
-
+
@Test
public void testCompleteBlocking() throws Exception
- {
+ {
FutureCallback<String> callback = new FutureCallback<>();
_flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
-
+
assertTrue(_flushIncomplete.get());
try
{
@@ -109,7 +121,7 @@ public class WriteFlusherTest
assertEquals("own cow!",_endp.takeOutputString());
assertFalse(_flushIncomplete.get());
}
-
+
@Test
public void testCloseWhileBlocking() throws Exception
{
@@ -118,7 +130,7 @@ public class WriteFlusherTest
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
-
+
assertTrue(_flushIncomplete.get());
try
{
@@ -157,7 +169,7 @@ public class WriteFlusherTest
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
-
+
assertTrue(_flushIncomplete.get());
try
{
@@ -169,7 +181,7 @@ public class WriteFlusherTest
_flushIncomplete.set(false);
}
- assertEquals("How now br",_endp.takeOutputString());
+ assertEquals("How now br", _endp.takeOutputString());
_flusher.failed(new IOException("Failure"));
_flusher.completeWrite();
assertTrue(callback.isDone());
@@ -185,7 +197,189 @@ public class WriteFlusherTest
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure"));
}
- assertEquals("",_endp.takeOutputString());
+ assertEquals("", _endp.takeOutputString());
+ }
+
+ @Test
+ public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(16);
+ final CountDownLatch failedCalledLatch = new CountDownLatch(1);
+ final CountDownLatch writeCalledLatch = new CountDownLatch(1);
+
+ final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
+ {
+ @Override
+ protected void onIncompleteFlushed()
+ {
+ }
+ };
+
+ endPointFlushExpectation(writeCalledLatch);
+
+ executor.submit(new Writer(writeFlusher, new FutureCallback()));
+ assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
+ executor.submit(new FailedCaller(writeFlusher, failedCalledLatch)).get();
+ }
+
+ @Test(expected = WritePendingException.class)
+ public void testConcurrentAccessToWrite() throws Throwable, InterruptedException, ExecutionException
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(16);
+ final CountDownLatch writeCalledLatch = new CountDownLatch(2);
+
+ final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
+ {
+ @Override
+ protected void onIncompleteFlushed()
+ {
+ }
+ };
+
+ endPointFlushExpectation(writeCalledLatch);
+
+ executor.submit(new Writer(writeFlusher, new FutureCallback()));
+ try
+ {
+ executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
+ }
+ catch (ExecutionException e)
+ {
+ throw e.getCause();
+ }
+ }
+
+ private void endPointFlushExpectation(final CountDownLatch writeCalledLatch) throws IOException
+ {
+ // add a small delay to make concurrent access more likely
+ when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ Object[] arguments = invocation.getArguments();
+ ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
+ BufferUtil.flipToFill(byteBuffer); // pretend everything has written
+ writeCalledLatch.countDown();
+ Thread.sleep(1000);
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService executor = Executors.newFixedThreadPool(16);
+ final CountDownLatch failedCalledLatch = new CountDownLatch(1);
+ final CountDownLatch writeCalledLatch = new CountDownLatch(1);
+ final CountDownLatch completeWrite = new CountDownLatch(1);
+
+ final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
+ {
+ protected void onIncompleteFlushed()
+ {
+ writeCalledLatch.countDown();
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " onIncompleteFlushed: calling completeWrite " + writeCalledLatch.getCount()); //thomas
+ try
+ {
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " going to sleep " + getState());
+ Thread.sleep(1000);
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " woken up");
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " completeWrite call");
+ completeWrite();
+ completeWrite.countDown();
+ }
+ };
+
+ endPointFlushExpectationPendingWrite();
+
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING WRITE");
+ executor.submit(new Writer(writeFlusher, new FutureCallback()));
+ assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " SUBMITTING FAILED " + writeFlusher.getState());
+ executor.submit(new FailedCaller(writeFlusher, failedCalledLatch));
+ assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling write again " + writeFlusher.getState());
+ writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
+ assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
+ }
+
+
+ //TODO: combine with endPointFlushExpectation
+ private void endPointFlushExpectationPendingWrite() throws IOException
+ {
+ when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
+ {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
+ {
+ Object[] arguments = invocation.getArguments();
+ ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
+ int oldPos = byteBuffer.position();
+ if (byteBuffer.remaining() == 2)
+ {
+ Thread.sleep(1000);
+ BufferUtil.flipToFill(byteBuffer);
+ }
+ else if (byteBuffer.remaining() == 3)
+ {
+ byteBuffer.position(1); // pretend writing one byte
+ return 1;
+ }
+ else
+ {
+ byteBuffer.position(byteBuffer.limit());
+ }
+ return byteBuffer.limit() - oldPos;
+ }
+ });
+ }
+
+ private static class FailedCaller implements Callable
+ {
+ private final WriteFlusher writeFlusher;
+ private CountDownLatch failedCalledLatch;
+
+ public FailedCaller(WriteFlusher writeFlusher, CountDownLatch failedCalledLatch)
+ {
+ this.writeFlusher = writeFlusher;
+ this.failedCalledLatch = failedCalledLatch;
+ }
+
+ @Override
+ public FutureCallback call()
+ {
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " Calling writeFlusher.failed()");
+ writeFlusher.failed(new IllegalStateException());
+ System.out.println(System.currentTimeMillis() + ":" + Thread.currentThread().getName() + " COUNTING FAILED DOWN");
+ failedCalledLatch.countDown();
+ return null;
+ }
+ }
+
+ private class Writer implements Callable
+ {
+ private final WriteFlusher writeFlusher;
+ private FutureCallback<String> callback;
+
+ public Writer(WriteFlusher writeFlusher, FutureCallback callback)
+ {
+ this.writeFlusher = writeFlusher;
+ this.callback = callback;
+ }
+
+ @Override
+ public FutureCallback call()
+ {
+ writeFlusher.write(_context, callback, BufferUtil.toBuffer("foo"));
+ return callback;
+ }
}
-
}
diff --git a/jetty-io/src/test/resources/jetty-logging.properties b/jetty-io/src/test/resources/jetty-logging.properties
new file mode 100644
index 0000000..5e19468
--- /dev/null
+++ b/jetty-io/src/test/resources/jetty-logging.properties
@@ -0,0 +1,3 @@
+org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
+org.eclipse.jetty.io.LEVEL=DEBUG
+#thomas
diff --git a/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties b/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties
index 5250a08..6e1c0df 100644
--- a/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties
+++ b/jetty-spdy/spdy-jetty/src/test/resources/jetty-logging.properties
@@ -1,2 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
-org.eclipse.jetty.spdy.LEVEL=WARN
+org.eclipse.jetty.spdy.LEVEL=DEBUG
+#org.eclipse.jetty.io.LEVEL=DEBUG
+
+# thomas
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java
index 8b5cabe..d425ce3 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java
@@ -10,6 +10,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+//TODO: Simplify, get rid of DOING. Probably replace states with AtomicBoolean
public class FutureCallback<C> implements Future<C>,Callback<C>
{
// TODO investigate use of a phasor