Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Becker2012-08-03 11:57:17 +0000
committerThomas Becker2012-08-03 17:10:18 +0000
commit9f2d1586ca808dbb82bb5d8897e875e06af2ff48 (patch)
treea7a6cccfa3b5dc8f7fb697a2c81f5c03349c42d4
parent2d7b6c9c06f7e7ee73a2cb9ec8ae3b0e94adac2c (diff)
downloadorg.eclipse.jetty.project-9f2d1586ca808dbb82bb5d8897e875e06af2ff48.tar.gz
org.eclipse.jetty.project-9f2d1586ca808dbb82bb5d8897e875e06af2ff48.tar.xz
org.eclipse.jetty.project-9f2d1586ca808dbb82bb5d8897e875e06af2ff48.zip
jetty9 - WriteFlusher minor changes. Some ConcurrentTests for WriteFlusher added. Cleanup. Javadoc.
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java23
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java206
2 files changed, 115 insertions, 114 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
index 91f367c07a..9e51aa5dbb 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
@@ -62,11 +62,10 @@ abstract public class WriteFlusher
// IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE
//
// If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure.
+ // If a failure happens while in WRITING, but the the write has finished successfully or with an IOExceptions,
+ // the callback's complete or respectively failed methods will be called.
// If a failure happens in PENDING state, then the fail method calls the pending callback and moves to IDLE state
- // Otherwise if a fail happens, the state is set to FAIL, so that a subsequent attempt to move out of WRITING or COMPLETING
- // will discover the failure and call the callbacks before returning to IDLE
- // Thus the possible paths for a failure are:
- //
+ //
// IDLE--(fail)-->IDLE
// IDLE-->WRITING--(fail)-->FAILED-->IDLE
// IDLE-->WRITING-->PENDING--(fail)-->IDLE
@@ -94,7 +93,8 @@ abstract public class WriteFlusher
/**
* Tries to update the current state to the given new state.
- * @param nextState the desired new state
+ * @param previous the expected current state
+ * @param next the desired new state
* @return the previous state or null if the state transition failed
* @throws WritePendingException if currentState is WRITING and new state is WRITING (api usage error)
*/
@@ -135,10 +135,6 @@ abstract public class WriteFlusher
private boolean isTransitionAllowed(State currentState, State newState)
{
Set<StateType> allowedNewStateTypes = __stateTransitions.get(currentState.getType());
- if (currentState.getType() == StateType.WRITING && newState.getType() == StateType.WRITING)
- {
- throw new WritePendingException();
- }
if (!allowedNewStateTypes.contains(newState.getType()))
{
LOG.debug("StateType update: {} -> {} not allowed", currentState, newState);
@@ -288,7 +284,7 @@ abstract public class WriteFlusher
if (!updateState(__IDLE,__WRITING))
throw new WritePendingException();
-
+
try
{
_endPoint.flush(buffers);
@@ -333,12 +329,11 @@ abstract public class WriteFlusher
public void completeWrite()
{
State previous = _state.get();
- PendingState<?> pending=null;
-
+
if (previous.getType()!=StateType.PENDING)
return; // failure already handled.
- pending=(PendingState<?>)previous;
+ PendingState<?> pending = (PendingState<?>)previous;
if (!updateState(pending,__COMPLETING))
return; // failure already handled.
@@ -412,7 +407,7 @@ abstract public class WriteFlusher
onFail(new ClosedChannelException());
}
- public boolean isIdle()
+ boolean isIdle()
{
return _state.get().getType() == StateType.IDLE;
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
index cab02a6da9..b24022b305 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
@@ -1,6 +1,5 @@
package org.eclipse.jetty.io;
-import static junit.framework.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
@@ -21,7 +20,6 @@ import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -47,13 +45,14 @@ public class WriteFlusherTest
private WriteFlusher _flusher;
private final AtomicBoolean _flushIncomplete = new AtomicBoolean(false);
- private final String _context = new String("Context");
+ private final String _context = "Context";
+ private final ExecutorService executor = Executors.newFixedThreadPool(16);
private ByteArrayEndPoint _endp;
@Before
public void before()
{
- _endp = new ByteArrayEndPoint(new byte[]{},10);
+ _endp = new ByteArrayEndPoint(new byte[]{}, 10);
_flushIncomplete.set(false);
_flusher = new WriteFlusher(_endp)
{
@@ -72,7 +71,7 @@ public class WriteFlusherTest
FutureCallback<String> callback = new FutureCallback<>();
_flusher.onFail(new IOException("Ignored because no operation in progress"));
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
@@ -80,14 +79,14 @@ public class WriteFlusherTest
equalTo(_endp.takeOutputString()));
assertTrue(_flusher.isIdle());
}
-
+
@Test
public void testCompleteNoBlocking() throws Exception
{
_endp.setGrowOutput(true);
FutureCallback<String> callback = new FutureCallback<>();
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
assertThat("context and callback.get() are equal", _context, equalTo(callback.get()));
@@ -112,21 +111,21 @@ public class WriteFlusherTest
_endp.close();
FutureCallback<String> callback = new FutureCallback<>();
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
- assertEquals(_context,callback.get());
+ assertEquals(_context, callback.get());
Assert.fail();
}
- catch(ExecutionException e)
+ catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
- Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
+ Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
- assertEquals("",_endp.takeOutputString());
+ assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
@@ -135,14 +134,14 @@ public class WriteFlusherTest
public void testCompleteBlocking() throws Exception
{
FutureCallback<String> callback = new FutureCallback<>();
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
assertTrue(_flushIncomplete.get());
try
{
- assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
+ assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail();
}
catch (TimeoutException to)
@@ -150,11 +149,11 @@ public class WriteFlusherTest
_flushIncomplete.set(false);
}
- assertEquals("How now br",_endp.takeOutputString());
+ assertEquals("How now br", _endp.takeOutputString());
_flusher.completeWrite();
assertCallbackIsDone(callback);
- assertEquals(_context,callback.get());
- assertEquals("own cow!",_endp.takeOutputString());
+ assertEquals(_context, callback.get());
+ assertEquals("own cow!", _endp.takeOutputString());
assertFlushIsComplete();
assertTrue(_flusher.isIdle());
}
@@ -163,7 +162,7 @@ public class WriteFlusherTest
public void testCloseWhileBlocking() throws Exception
{
FutureCallback<String> callback = new FutureCallback<>();
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
@@ -171,7 +170,7 @@ public class WriteFlusherTest
assertTrue(_flushIncomplete.get());
try
{
- assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
+ assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail();
}
catch (TimeoutException to)
@@ -179,23 +178,23 @@ public class WriteFlusherTest
_flushIncomplete.set(false);
}
- assertEquals("How now br",_endp.takeOutputString());
+ assertEquals("How now br", _endp.takeOutputString());
_endp.close();
_flusher.completeWrite();
assertCallbackIsDone(callback);
assertFlushIsComplete();
try
{
- assertEquals(_context,callback.get());
+ assertEquals(_context, callback.get());
Assert.fail();
}
- catch(ExecutionException e)
+ catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
- Assert.assertThat(cause.getMessage(),Matchers.containsString("CLOSED"));
+ Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
- assertEquals("",_endp.takeOutputString());
+ assertEquals("", _endp.takeOutputString());
assertTrue(_flusher.isIdle());
}
@@ -203,7 +202,7 @@ public class WriteFlusherTest
public void testFailWhileBlocking() throws Exception
{
FutureCallback<String> callback = new FutureCallback<>();
- _flusher.write(_context,callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow!"));
+ _flusher.write(_context, callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
assertFalse(callback.isDone());
assertFalse(callback.isCancelled());
@@ -211,7 +210,7 @@ public class WriteFlusherTest
assertTrue(_flushIncomplete.get());
try
{
- assertEquals(_context,callback.get(10,TimeUnit.MILLISECONDS));
+ assertEquals(_context, callback.get(10, TimeUnit.MILLISECONDS));
Assert.fail();
}
catch (TimeoutException to)
@@ -226,17 +225,17 @@ public class WriteFlusherTest
assertFlushIsComplete();
try
{
- assertEquals(_context,callback.get());
+ assertEquals(_context, callback.get());
Assert.fail();
}
- catch(ExecutionException e)
+ catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
- Assert.assertThat(cause.getMessage(),Matchers.containsString("Failure"));
+ Assert.assertThat(cause.getMessage(), Matchers.containsString("Failure"));
}
assertEquals("", _endp.takeOutputString());
-
+
assertTrue(_flusher.isIdle());
}
@@ -245,29 +244,29 @@ public class WriteFlusherTest
final ByteArrayEndPoint _endp;
final SecureRandom _random;
final ScheduledThreadPoolExecutor _scheduler;
- final StringBuilder _content=new StringBuilder();
-
- ConcurrentFlusher(ByteArrayEndPoint endp,SecureRandom random, ScheduledThreadPoolExecutor scheduler)
+ final StringBuilder _content = new StringBuilder();
+
+ ConcurrentFlusher(ByteArrayEndPoint endp, SecureRandom random, ScheduledThreadPoolExecutor scheduler)
{
super(endp);
- _endp=endp;
- _random=random;
- _scheduler=scheduler;
+ _endp = endp;
+ _random = random;
+ _scheduler = scheduler;
}
-
+
@Override
protected void onIncompleteFlushed()
{
- _scheduler.schedule(this,1+_random.nextInt(9),TimeUnit.MILLISECONDS);
+ _scheduler.schedule(this, 1 + _random.nextInt(9), TimeUnit.MILLISECONDS);
}
-
+
@Override
public synchronized void run()
{
_content.append(_endp.takeOutputString());
completeWrite();
}
-
+
@Override
public synchronized String toString()
{
@@ -275,25 +274,25 @@ public class WriteFlusherTest
return _content.toString();
}
}
-
+
@Test
public void testConcurrent() throws Exception
{
final SecureRandom random = new SecureRandom();
final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
-
-
+
+
ConcurrentFlusher[] flushers = new ConcurrentFlusher[50000];
FutureCallback<?>[] futures = new FutureCallback<?>[flushers.length];
- for (int i=0;i<flushers.length;i++)
+ for (int i = 0; i < flushers.length; i++)
{
- int size=5+random.nextInt(15);
- ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{},size);
+ int size = 5 + random.nextInt(15);
+ ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[]{}, size);
- final ConcurrentFlusher flusher = new ConcurrentFlusher(endp,random,scheduler);
- flushers[i]=flusher;
+ final ConcurrentFlusher flusher = new ConcurrentFlusher(endp, random, scheduler);
+ flushers[i] = flusher;
final FutureCallback<String> callback = new FutureCallback<>();
- futures[i]=callback;
+ futures[i] = callback;
scheduler.schedule(new Runnable()
{
@Override
@@ -302,39 +301,37 @@ public class WriteFlusherTest
flusher.onFail(new Throwable("THE CAUSE"));
}
}
- ,random.nextInt(75)+1,TimeUnit.MILLISECONDS);
- flusher.write(_context,callback,BufferUtil.toBuffer("How Now Brown Cow."),BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
+ , random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
+ flusher.write(_context, callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
}
- int completed=0;
- int failed=0;
-
- for (int i=0;i<flushers.length;i++)
+ int completed = 0;
+ int failed = 0;
+
+ for (int i = 0; i < flushers.length; i++)
{
try
{
futures[i].get();
- assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!",flushers[i].toString());
+ assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].toString());
completed++;
}
catch (Exception e)
{
- assertThat(e.getMessage(),Matchers.containsString("THE CAUSE"));
+ assertThat(e.getMessage(), Matchers.containsString("THE CAUSE"));
failed++;
- }
+ }
}
-
- assertThat(completed,Matchers.greaterThan(0));
- assertThat(failed,Matchers.greaterThan(0));
-
+
+ assertThat(completed, Matchers.greaterThan(0));
+ assertThat(failed, Matchers.greaterThan(0));
+
scheduler.shutdown();
}
-
+
@Test
- @Ignore
public void testConcurrentAccessToWriteAndFailed() throws IOException, InterruptedException, ExecutionException
{
- ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCompleteLatch = new CountDownLatch(1);
@@ -400,11 +397,10 @@ public class WriteFlusherTest
}
}
- @Ignore("Intermittent failures.") //TODO: fixme
@Test(expected = WritePendingException.class)
public void testConcurrentAccessToWrite() throws Throwable
{
- ExecutorService executor = Executors.newFixedThreadPool(16);
+ final CountDownLatch flushCalledLatch = new CountDownLatch(1);
final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
{
@@ -420,13 +416,16 @@ public class WriteFlusherTest
@Override
public Object answer(InvocationOnMock invocation) throws Throwable
{
+ flushCalledLatch.countDown();
// make sure we stay here, so write is called twice at the same time
Thread.sleep(5000);
return null;
}
});
- executor.submit(new Writer(writeFlusher, new FutureCallback()));
+ executor.submit(new Writer(writeFlusher, new FutureCallback<String>()));
+ // make sure that we call .get() on the write that executed second by waiting on this latch
+ assertThat("Flush has been called once", flushCalledLatch.await(5, TimeUnit.SECONDS), is(true));
try
{
executor.submit(new Writer(writeFlusher, new FutureCallback())).get();
@@ -459,16 +458,14 @@ public class WriteFlusherTest
}
@Test
- @Ignore
- public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException
+ public void testConcurrentAccessToIncompleteWriteAndFailed() throws IOException, InterruptedException, ExecutionException, TimeoutException
{
- ExecutorService executor = Executors.newFixedThreadPool(16);
final CountDownLatch failedCalledLatch = new CountDownLatch(1);
final CountDownLatch onIncompleteFlushedCalledLatch = new CountDownLatch(1);
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
- final WriteFlusher writeFlusher = new WriteFlusher(_endPointMock)
+ final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch))
{
protected void onIncompleteFlushed()
{
@@ -478,8 +475,6 @@ public class WriteFlusherTest
}
};
- endPointFlushExpectationPendingWrite(writeCalledLatch, failedCalledLatch);
-
ExposingStateCallback callback = new ExposingStateCallback();
executor.submit(new Writer(writeFlusher, callback));
assertThat("Write has been called.", writeCalledLatch.await(5, TimeUnit.SECONDS), is(true));
@@ -487,44 +482,55 @@ public class WriteFlusherTest
assertThat("Failed has been called.", failedCalledLatch.await(5, TimeUnit.SECONDS), is(true));
writeFlusher.write(_context, new FutureCallback<String>(), BufferUtil.toBuffer("foobar"));
assertThat("completeWrite done", completeWrite.await(5, TimeUnit.SECONDS), is(true));
+ callback.get(5, TimeUnit.SECONDS);
+ assertThat("callback failed has not been called", callback.isFailed(), is(false));
+ assertThat("callback complete has been called", callback.isCompleted(), is(true));
}
-
- //TODO: combine with endPointFlushExpectation
- private void endPointFlushExpectationPendingWrite(final CountDownLatch writeCalledLatch, final CountDownLatch
- failedCalledLatch)
- throws
- IOException
+ private static class EndPointMock extends ByteArrayEndPoint
{
- when(_endPointMock.flush(any(ByteBuffer[].class))).thenAnswer(new Answer<Object>()
+ private final CountDownLatch writeCalledLatch;
+ private final CountDownLatch failedCalledLatch;
+
+ public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable
+ this.writeCalledLatch = writeCalledLatch;
+ this.failedCalledLatch = failedCalledLatch;
+ }
+
+ @Override
+ public int flush(ByteBuffer... buffers) throws IOException
+ {
+ writeCalledLatch.countDown();
+ ByteBuffer byteBuffer = buffers[0];
+ int oldPos = byteBuffer.position();
+ if (byteBuffer.remaining() == 2)
{
- writeCalledLatch.countDown();
- Object[] arguments = invocation.getArguments();
- ByteBuffer byteBuffer = (ByteBuffer)arguments[0];
- int oldPos = byteBuffer.position();
- if (byteBuffer.remaining() == 2)
+ // make sure failed is called before we go on
+ try
{
- // make sure failed is called before we go on
failedCalledLatch.await(5, TimeUnit.SECONDS);
- BufferUtil.flipToFill(byteBuffer);
- }
- else if (byteBuffer.remaining() == 3)
- {
- byteBuffer.position(1); // pretend writing one byte
- return 1;
}
- else
+ catch (InterruptedException e)
{
- byteBuffer.position(byteBuffer.limit());
+ e.printStackTrace();
}
- return byteBuffer.limit() - oldPos;
+ BufferUtil.flipToFill(byteBuffer);
}
- });
+ else if (byteBuffer.remaining() == 3)
+ {
+ byteBuffer.position(1); // pretend writing one byte
+ return 1;
+ }
+ else
+ {
+ byteBuffer.position(byteBuffer.limit());
+ }
+ return byteBuffer.limit() - oldPos;
+ }
}
+
private static class FailedCaller implements Callable
{
private final WriteFlusher writeFlusher;
@@ -550,7 +556,7 @@ public class WriteFlusherTest
private final WriteFlusher writeFlusher;
private FutureCallback<String> callback;
- public Writer(WriteFlusher writeFlusher, FutureCallback callback)
+ public Writer(WriteFlusher writeFlusher, FutureCallback<String> callback)
{
this.writeFlusher = writeFlusher;
this.callback = callback;

Back to the top