Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2014-09-18 05:31:59 +0000
committerGreg Wilkins2014-09-18 05:31:59 +0000
commit3b066ca2aef2ccbd27e4ac6ca216d8551a1ece3f (patch)
treec4bb6b3f77c343b3684921f48a2def9637c1ef66
parent88f4e5ab86168b8d6edbb5ed033cd18c6542ae9a (diff)
downloadorg.eclipse.jetty.project-3b066ca2aef2ccbd27e4ac6ca216d8551a1ece3f.tar.gz
org.eclipse.jetty.project-3b066ca2aef2ccbd27e4ac6ca216d8551a1ece3f.tar.xz
org.eclipse.jetty.project-3b066ca2aef2ccbd27e4ac6ca216d8551a1ece3f.zip
444415 iterative WriteFlusher
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java60
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java44
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java101
3 files changed, 175 insertions, 30 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
index b2daffb218..b2910dda39 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
@@ -334,13 +334,13 @@ abstract public class WriteFlusher
try
{
- boolean flushed=_endPoint.flush(buffers);
- if (DEBUG)
- LOG.debug("flushed {}", flushed);
+ buffers=flush(buffers);
// if we are incomplete?
- if (!flushed)
+ if (buffers!=null)
{
+ if (DEBUG)
+ LOG.debug("flushed incomplete");
PendingState pending=new PendingState(buffers, callback);
if (updateState(__WRITING,pending))
onIncompleteFlushed();
@@ -379,7 +379,7 @@ abstract public class WriteFlusher
* {@link #onFail(Throwable)} or {@link #onClose()}
*/
public void completeWrite()
- {
+ {
if (DEBUG)
LOG.debug("completeWrite: {}", this);
@@ -396,13 +396,15 @@ abstract public class WriteFlusher
{
ByteBuffer[] buffers = pending.getBuffers();
- boolean flushed=_endPoint.flush(buffers);
- if (DEBUG)
- LOG.debug("flushed {}", flushed);
+ buffers=flush(buffers);
// if we are incomplete?
- if (!flushed)
+ if (buffers!=null)
{
+ if (DEBUG)
+ LOG.debug("flushed incomplete {}",BufferUtil.toDetailString(buffers));
+ if (buffers!=pending.getBuffers())
+ pending=new PendingState(buffers, pending._callback);
if (updateState(__COMPLETING,pending))
onIncompleteFlushed();
else
@@ -427,6 +429,46 @@ abstract public class WriteFlusher
}
/* ------------------------------------------------------------ */
+ /** Flush the buffers iteratively until no progress is made
+ * @param buffers The buffers to flush
+ * @return The unflushed buffers, or null if all flushed
+ * @throws IOException
+ */
+ protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
+ {
+ // try the simple direct flush first, which also ensures that any null buffer
+ // flushes are passed through (for commits etc.)
+ if (_endPoint.flush(buffers))
+ return null;
+
+ // We were not fully flushed, so let's try again iteratively while we can make
+ // some progress
+ boolean progress=true;
+ while(true)
+ {
+ // Compact buffers array?
+ int not_empty=0;
+ while(not_empty<buffers.length && BufferUtil.isEmpty(buffers[not_empty]))
+ not_empty++;
+ if (not_empty==buffers.length)
+ return null;
+ if (not_empty>0)
+ buffers=Arrays.copyOfRange(buffers,not_empty,buffers.length);
+
+ if (!progress)
+ break;
+
+ // try to flush the remainder
+ int r=buffers[0].remaining();
+ if (_endPoint.flush(buffers))
+ return null;
+ progress=r!=buffers[0].remaining();
+ }
+
+ return buffers;
+ }
+
+ /* ------------------------------------------------------------ */
/** Notify the flusher of a failure
* @param cause The cause of the failure
* @return true if the flusher passed the failure to a {@link Callback} instance
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
index 7ac32a0ef3..4610b51525 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
@@ -346,12 +346,12 @@ public class SslConnection extends AbstractConnection
@Override
protected void onIncompleteFlush()
- {
+ {
// This means that the decrypted endpoint write method was called and not
// all data could be wrapped. So either we need to write some encrypted data,
// OR if we are handshaking we need to read some encrypted data OR
// if neither then we should just try the flush again.
- boolean flush = false;
+ boolean try_again = false;
synchronized (DecryptedEndPoint.this)
{
if (DEBUG)
@@ -372,10 +372,17 @@ public class SslConnection extends AbstractConnection
}
else
{
- flush = true;
+ // We can get here because the WriteFlusher might not see progress
+ // when it has just flushed the encrypted data, but not consumed anymore
+ // of the application buffers. This is mostly avoided by another iteration
+ // within DecryptedEndPoint flush(), but I cannot convince myself that
+ // this is never ever the case.
+ try_again = true;
}
}
- if (flush)
+
+
+ if (try_again)
{
// If the output is closed,
if (isOutputShutdown())
@@ -387,7 +394,9 @@ public class SslConnection extends AbstractConnection
else
{
// try to flush what is pending
- getWriteFlusher().completeWrite();
+ // because this is a special case (see above) we could probably
+ // avoid the dispatch, but best to be sure
+ getExecutor().execute(_runCompletWrite);
}
}
}
@@ -715,18 +724,12 @@ public class SslConnection extends AbstractConnection
BufferUtil.flipToFlush(_encryptedOutput, pos);
if (wrapResult.bytesConsumed()>0)
consumed+=wrapResult.bytesConsumed();
-
+ Status wrapResultStatus = wrapResult.getStatus();
+
boolean allConsumed=true;
- // clear empty buffers to prevent position creeping up the buffer
for (ByteBuffer b : appOuts)
- {
- if (BufferUtil.isEmpty(b))
- BufferUtil.clear(b);
- else
- allConsumed=false;
- }
-
- Status wrapResultStatus = wrapResult.getStatus();
+ if (BufferUtil.hasContent(b))
+ allConsumed=false;
// and deal with the results returned from the sslEngineWrap
switch (wrapResultStatus)
@@ -779,13 +782,20 @@ public class SslConnection extends AbstractConnection
// if we have net bytes, let's try to flush them
if (BufferUtil.hasContent(_encryptedOutput))
- getEndPoint().flush(_encryptedOutput);
+ if (!getEndPoint().flush(_encryptedOutput));
+ getEndPoint().flush(_encryptedOutput); // one retry
// But we also might have more to do for the handshaking state.
switch (handshakeStatus)
{
case NOT_HANDSHAKING:
- // Return with the number of bytes consumed (which may be 0)
+ // If we have not consumed all and had just finished handshaking, then we may
+ // have just flushed the last handshake in the encrypted buffers, so we should
+ // try again.
+ if (!allConsumed && wrapResult.getHandshakeStatus()==HandshakeStatus.FINISHED && BufferUtil.isEmpty(_encryptedOutput))
+ continue;
+
+ // Return true if we consumed all the bytes and encrypted are all flushed
return allConsumed && BufferUtil.isEmpty(_encryptedOutput);
case NEED_TASK:
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
index 20e08d1eb2..572ff79a6e 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java
@@ -30,10 +30,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
+import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,7 +43,9 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
@@ -509,7 +513,7 @@ public class WriteFlusherTest
BufferUtil.flipToFill(byteBuffer); // pretend everything has been written
writeCalledLatch.countDown();
failedCalledLatch.await(5, TimeUnit.SECONDS);
- return null;
+ return Boolean.TRUE;
}
});
}
@@ -522,7 +526,7 @@ public class WriteFlusherTest
final CountDownLatch writeCalledLatch = new CountDownLatch(1);
final CountDownLatch completeWrite = new CountDownLatch(1);
- final WriteFlusher writeFlusher = new WriteFlusher(new EndPointMock(writeCalledLatch, failedCalledLatch))
+ final WriteFlusher writeFlusher = new WriteFlusher(new EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(writeCalledLatch, failedCalledLatch))
{
@Override
protected void onIncompleteFlushed()
@@ -555,12 +559,13 @@ public class WriteFlusherTest
assertThat("callback complete has not been called", callback.isCompleted(), is(false));
}
- private static class EndPointMock extends ByteArrayEndPoint
+ private static class EndPointConcurrentAccessToIncompleteWriteAndOnFailMock extends ByteArrayEndPoint
{
private final CountDownLatch writeCalledLatch;
private final CountDownLatch failedCalledLatch;
+ private final AtomicBoolean stalled=new AtomicBoolean(false);
- public EndPointMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
+ public EndPointConcurrentAccessToIncompleteWriteAndOnFailMock(CountDownLatch writeCalledLatch, CountDownLatch failedCalledLatch)
{
this.writeCalledLatch = writeCalledLatch;
this.failedCalledLatch = failedCalledLatch;
@@ -574,6 +579,13 @@ public class WriteFlusherTest
int oldPos = byteBuffer.position();
if (byteBuffer.remaining() == 2)
{
+ // make sure we stall at least once
+ if (!stalled.get())
+ {
+ stalled.set(true);
+ return false;
+ }
+
// make sure failed is called before we go on
try
{
@@ -601,6 +613,87 @@ public class WriteFlusherTest
}
}
+ @Test
+ public void testIterationOnNonBlockedStall() throws Exception
+ {
+ final Exchanger<Integer> exchange = new Exchanger<>();
+ final AtomicInteger window = new AtomicInteger(10);
+ EndPointIterationOnNonBlockedStallMock endp=new EndPointIterationOnNonBlockedStallMock(window);
+ final WriteFlusher writeFlusher = new WriteFlusher(endp)
+ {
+ @Override
+ protected void onIncompleteFlushed()
+ {
+ executor.submit(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ while(window.get()==0)
+ window.addAndGet(exchange.exchange(0));
+ completeWrite();
+ }
+ catch(Throwable th)
+ {
+ th.printStackTrace();
+ }
+ }
+ });
+
+ }
+ };
+
+ BlockingCallback callback = new BlockingCallback();
+ writeFlusher.write(callback,BufferUtil.toBuffer("How "),BufferUtil.toBuffer("now "),BufferUtil.toBuffer("brown "),BufferUtil.toBuffer("cow."));
+ exchange.exchange(0);
+
+ Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("How now br"));
+
+ exchange.exchange(1);
+ exchange.exchange(0);
+
+ Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("o"));
+
+ exchange.exchange(8);
+ callback.block();
+
+ Assert.assertThat(endp.takeOutputString(StandardCharsets.US_ASCII),Matchers.equalTo("wn cow."));
+
+ }
+
+ private static class EndPointIterationOnNonBlockedStallMock extends ByteArrayEndPoint
+ {
+ final AtomicInteger _window;
+
+ public EndPointIterationOnNonBlockedStallMock(AtomicInteger window)
+ {
+ _window=window;
+ }
+
+ @Override
+ public boolean flush(ByteBuffer... buffers) throws IOException
+ {
+ ByteBuffer byteBuffer = buffers[0];
+
+ if (_window.get()>0 && byteBuffer.hasRemaining())
+ {
+ // consume 1 byte
+ byte one = byteBuffer.get(byteBuffer.position());
+ if (super.flush(ByteBuffer.wrap(new byte[]{one})))
+ {
+ _window.decrementAndGet();
+ byteBuffer.position(byteBuffer.position()+1);
+ }
+ }
+ for (ByteBuffer b: buffers)
+ if (BufferUtil.hasContent(b))
+ return false;
+ return true;
+ }
+ }
+
+
private static class FailedCaller implements Callable<FutureCallback>
{
private final WriteFlusher writeFlusher;

Back to the top