diff options
author | Greg Wilkins | 2014-09-03 00:46:18 +0000 |
---|---|---|
committer | Greg Wilkins | 2014-09-03 00:46:18 +0000 |
commit | 0a2aeb54a1c2ad221cee2edb9b6f0f6960a10e6e (patch) | |
tree | e2ee661c0f22ba1eaab3d40bad16ebaed58f1e6f | |
parent | 7be9f0d7a46423015add430daea4069e9b10875b (diff) | |
download | org.eclipse.jetty.project-0a2aeb54a1c2ad221cee2edb9b6f0f6960a10e6e.tar.gz org.eclipse.jetty.project-0a2aeb54a1c2ad221cee2edb9b6f0f6960a10e6e.tar.xz org.eclipse.jetty.project-0a2aeb54a1c2ad221cee2edb9b6f0f6960a10e6e.zip |
435322 Added a idleTimeout to the SharedBlockerCallback
-rw-r--r-- | jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java | 52 | ||||
-rw-r--r-- | jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java | 88 |
2 files changed, 116 insertions, 24 deletions
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index 602e48463c..168a19d8b8 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -48,7 +48,7 @@ import org.eclipse.jetty.util.thread.NonBlockingThread; */ public class SharedBlockingCallback { - private static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); + static final Logger LOG = Log.getLogger(SharedBlockingCallback.class); final ReentrantLock _lock = new ReentrantLock(); final Condition _idle = _lock.newCondition(); @@ -128,6 +128,12 @@ public class SharedBlockingCallback return _blocker; } + protected void notComplete(Blocker blocker) + { + LOG.warn("Blocker not complete {}",blocker); + if (LOG.isDebugEnabled()) + LOG.debug(new Throwable()); + } /* ------------------------------------------------------------ */ /** A Closeable Callback. @@ -152,8 +158,8 @@ public class SharedBlockingCallback _state = SUCCEEDED; _complete.signalAll(); } - else if (_state == IDLE) - throw new IllegalStateException("IDLE"); + else + throw new IllegalStateException(_state); } finally { @@ -169,11 +175,17 @@ public class SharedBlockingCallback { if (_state == null) { - _state = cause==null?FAILED:cause; + if (cause==null) + _state=FAILED; + else if (cause instanceof BlockerTimeoutException) + // Not this blockers timeout + _state=new IOException(cause); + else + _state=cause; _complete.signalAll(); } - else if (_state == IDLE) - throw new IllegalStateException("IDLE",cause); + else + throw new IllegalStateException(_state); } finally { @@ -202,15 +214,9 @@ public class SharedBlockingCallback if (idle>0) { if (!_complete.await(idle,TimeUnit.MILLISECONDS)) - { // The callback has not arrived in sufficient time. - // We will synthesize a TimeoutException and then - // create a new Blocker, so that any late arriving callback - // does not cause a problem with the next cycle. - _state=new TimeoutException("No Blocker CB"); - LOG.warn(_state); - _blocker=new Blocker(); - } + // We will synthesize a TimeoutException + _state=new BlockerTimeoutException(); } else _complete.await(); @@ -260,17 +266,19 @@ public class SharedBlockingCallback if (_state == IDLE) throw new IllegalStateException("IDLE"); if (_state == null) - { - LOG.warn("Blocker not complete {}",this); - if (LOG.isDebugEnabled()) - LOG.debug(new Throwable()); - } + notComplete(this); } finally { try { - _state = IDLE; + // If the blocker timed itself out, remember the state + if (_state instanceof BlockerTimeoutException) + // and create a new Blocker + _blocker=new Blocker(); + else + // else reuse Blocker + _state = IDLE; _idle.signalAll(); _complete.signalAll(); } @@ -295,4 +303,8 @@ public class SharedBlockingCallback } } } + + private class BlockerTimeoutException extends TimeoutException + { + } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java index 4ee93a6b19..c06e5070d1 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/SharedBlockingCallbackTest.java @@ -18,9 +18,17 @@ package org.eclipse.jetty.util; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.hamcrest.Matchers; @@ -29,7 +37,23 @@ import org.junit.Test; public class SharedBlockingCallbackTest { - final SharedBlockingCallback sbcb= new SharedBlockingCallback(); + final AtomicInteger notComplete = new AtomicInteger(); + final SharedBlockingCallback sbcb= new SharedBlockingCallback() + { + @Override + protected long getIdleTimeout() + { + return 150; + } + + @Override + protected void notComplete(Blocker blocker) + { + super.notComplete(blocker); + notComplete.incrementAndGet(); + } + + }; public SharedBlockingCallbackTest() { @@ -46,7 +70,8 @@ public class SharedBlockingCallbackTest start=System.currentTimeMillis(); blocker.block(); } - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(500L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -74,6 +99,7 @@ public class SharedBlockingCallbackTest } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -95,7 +121,8 @@ public class SharedBlockingCallbackTest start=System.currentTimeMillis(); Assert.assertEquals(ex,ee.getCause()); } - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(100L)); + Assert.assertEquals(0,notComplete.get()); } @Test @@ -133,6 +160,7 @@ public class SharedBlockingCallbackTest } Assert.assertThat(System.currentTimeMillis()-start,Matchers.greaterThan(10L)); Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(1000L)); + Assert.assertEquals(0,notComplete.get()); } @@ -174,6 +202,58 @@ public class SharedBlockingCallbackTest blocker.succeeded(); blocker.block(); }; - Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); + Assert.assertThat(System.currentTimeMillis()-start,Matchers.lessThan(600L)); + Assert.assertEquals(0,notComplete.get()); + } + + @Test + public void testBlockerClose() throws Exception + { + try (Blocker blocker=sbcb.acquire()) + { + SharedBlockingCallback.LOG.info("Blocker not complete "+blocker+" warning is expected..."); + } + + Assert.assertEquals(1,notComplete.get()); + } + + @Test + public void testBlockerTimeout() throws Exception + { + Blocker b0=null; + try + { + try (Blocker blocker=sbcb.acquire()) + { + b0=blocker; + Thread.sleep(400); + blocker.block(); + } + fail(); + } + catch(IOException e) + { + Throwable cause = e.getCause(); + assertThat(cause,instanceOf(TimeoutException.class)); + } + + Assert.assertEquals(0,notComplete.get()); + + + try (Blocker blocker=sbcb.acquire()) + { + assertThat(blocker,not(equalTo(b0))); + try + { + b0.succeeded(); + fail(); + } + catch(Exception e) + { + assertThat(e,instanceOf(IllegalStateException.class)); + assertThat(e.getCause(),instanceOf(TimeoutException.class)); + } + blocker.succeeded(); + } } } |