diff options
author | Greg Wilkins | 2012-09-03 05:24:36 +0000 |
---|---|---|
committer | Greg Wilkins | 2012-09-03 05:24:36 +0000 |
commit | c3776764f4ab0820c7fd252d49181664000aa277 (patch) | |
tree | 01af27b02dd22988d5a3f51695ce9bcf205e85a8 | |
parent | bb3dcc1c5b075cdedcdda6cf37e781e0c743d49a (diff) | |
download | org.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.tar.gz org.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.tar.xz org.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.zip |
jetty-9 ConcurrentScheduler refinements
15 files changed, 152 insertions, 90 deletions
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java index 5a329cd2a8..a1028d0b9e 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,7 +51,7 @@ public class ByteArrayEndPointTest @Before public void before() throws Exception { - _scheduler = new SimpleScheduler(); + _scheduler = new TimerScheduler(); _scheduler.start(); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java index f8f30e90cd..3667fe75ed 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -54,7 +54,7 @@ public class SelectChannelEndPointInterestsTest threadPool = new QueuedThreadPool(); threadPool.start(); - scheduler = new SimpleScheduler(); + scheduler = new TimerScheduler(); scheduler.start(); connector = ServerSocketChannel.open(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 01f22ca4d0..3d092ba4de 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -44,7 +44,7 @@ import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -56,7 +56,7 @@ public class SelectChannelEndPointTest protected volatile EndPoint _lastEndPoint; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); - protected Scheduler _scheduler = new SimpleScheduler(); + protected Scheduler _scheduler = new TimerScheduler(); protected SelectorManager _manager = new SelectorManager() { @Override diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index d271061d50..2613994358 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -43,7 +43,7 @@ import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -60,7 +60,7 @@ public class SslConnectionTest private volatile FutureCallback<Void> _writeCallback; protected ServerSocketChannel _connector; protected QueuedThreadPool _threadPool = new QueuedThreadPool(); - protected Scheduler _scheduler = new SimpleScheduler(); + protected Scheduler _scheduler = new TimerScheduler(); protected SelectorManager _manager = new SelectorManager() { @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index bfc8bdfb58..1cdd2c0bd7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; /** * <p>Partial implementation of {@link Connector}</p> @@ -79,7 +79,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co { _server=server; _executor=executor!=null?executor:_server.getThreadPool(); - _scheduler=scheduler!=null?scheduler:new SimpleScheduler(); + _scheduler=scheduler!=null?scheduler:new TimerScheduler(); _byteBufferPool = pool!=null?pool:new MappedByteBufferPool(); _sslContextFactory = sslContextFactory; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9069c5cff0..25c48069f3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -233,7 +233,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable { try { - _request.setHandled(false); // TODO: is this right here ? + _request.setHandled(false); _response.getHttpOutput().reopen(); if (_state.isInitial()) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java index 54cedc7df2..fc7907e20f 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java @@ -48,7 +48,7 @@ import org.eclipse.jetty.server.session.HashSessionIdManager; import org.eclipse.jetty.server.session.HashSessionManager; import org.eclipse.jetty.server.session.HashedSession; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; @@ -65,7 +65,7 @@ public class ResponseTest public void init() throws Exception { _server = new Server(); - _scheduler = new SimpleScheduler(); + _scheduler = new TimerScheduler(); LocalConnector connector = new LocalConnector(_server, null, _scheduler, null, null, 1); _server.addConnector(connector); _server.setHandler(new DumpHandler()); diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java index 60d60ac917..42965962ef 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java @@ -38,7 +38,7 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -55,7 +55,7 @@ public class AsyncTimeoutTest ByteBufferPool bufferPool = new MappedByteBufferPool(); Executor threadPool = Executors.newCachedThreadPool(); - Scheduler scheduler = new SimpleScheduler(); + Scheduler scheduler = new TimerScheduler(); scheduler.start(); // TODO need to use jetty lifecycles better here Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None()) @@ -102,7 +102,7 @@ public class AsyncTimeoutTest ByteBufferPool bufferPool = new MappedByteBufferPool(); Executor threadPool = Executors.newCachedThreadPool(); - Scheduler scheduler = new SimpleScheduler(); + Scheduler scheduler = new TimerScheduler(); scheduler.start(); Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None()) diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java index 45ee882874..e795facd08 100644 --- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java +++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java @@ -49,7 +49,7 @@ import org.eclipse.jetty.spdy.frames.SynStreamFrame; import org.eclipse.jetty.spdy.generator.Generator; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -87,7 +87,7 @@ public class StandardSessionTest { bufferPool = new MappedByteBufferPool(); threadPool = Executors.newCachedThreadPool(); - scheduler = new SimpleScheduler(); + scheduler = new TimerScheduler(); scheduler.start(); generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor()); session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None()); diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 11e3b13b0c..635d370f5d 100644 --- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -51,7 +51,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; public class SPDYClient { @@ -167,7 +167,7 @@ public class SPDYClient { private final Queue<Session> sessions = new ConcurrentLinkedQueue<>(); private final ByteBufferPool bufferPool = new MappedByteBufferPool(); - private final Scheduler scheduler = new SimpleScheduler(); + private final Scheduler scheduler = new TimerScheduler(); private final Executor executor; private final SslContextFactory sslContextFactory; private final SelectorManager selector; diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java index c09fc1551e..29fd950ce9 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java @@ -59,15 +59,26 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, }); private final Queue _delayQ; + public ConcurrentScheduler() + { + this(null,8192); + } + public ConcurrentScheduler(Executor executor) { this(executor,8192); } + public ConcurrentScheduler(int delayQms) + { + this(null,delayQms); + } + public ConcurrentScheduler(Executor executor,int delayQms) { _executor = executor; - addBean(_executor,false); + if (_executor!=null) + addBean(_executor,false); _delayQ=new Queue(delayQms); } @@ -75,7 +86,10 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, protected void doStart() throws Exception { super.doStart(); - _executor.execute(this); + if (_executor==null) + new Thread(this).start(); + else + _executor.execute(this); } @@ -110,10 +124,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, long interval=event._executeAt-now; // Should we execute this event? - if (interval<=0 && event._state.compareAndSet(State.NEW,State.DONE)) - { - _executor.execute(event._task); - } + if (interval<=0 && event.compareAndSet(State.NEW,State.DONE)) + event.execute(); + // Should we delay this event else if (_delayQ._delay>0 && interval>_delayQ._delay) { @@ -121,7 +134,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, _delayQ.add(event,dequeue_at); } // else we schedule the event - else if (event._state.compareAndSet(State.NEW,State.SCHEDULED)) + else if (event.compareAndSet(State.NEW,State.SCHEDULED)) { _timerQ.add(event); if (interval<=MAX_SLEEP) @@ -160,7 +173,15 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, { Event event=next.dequeue(); if (event!=null) - _timerQ.add(event); + { + if (event._executeAt<=now) + { + if (event.compareAndSet(State.SCHEDULED,State.DONE)) + event.execute(); + } + else + _timerQ.add(event); + } } else { @@ -185,10 +206,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, else if (event._executeAt<=now) { i.remove(); - if (event._state.compareAndSet(State.SCHEDULED,State.DONE)) - { - _executor.execute(event._task); - } + event.execute(); + if (event.compareAndSet(State.SCHEDULED,State.DONE)) + event.execute(); } // else how long do we need to wait? else @@ -218,27 +238,33 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, } } + + @Override + public String toString() + { + return String.format("%s@%x{%d,%s}",this.getClass().getSimpleName(),hashCode(),_delayQ._delay,_executor); + } enum State { NEW, DELAYED, SCHEDULED, CANCELLED, DONE }; - private class Event implements Scheduler.Task + private class Event extends AtomicReference<State> implements Scheduler.Task { + /* extends AtomicReference as a minor optimisation rather than holding a _state field */ final Runnable _task; final long _executeAt; - final AtomicReference<State> _state=new AtomicReference<>(State.NEW); volatile QNode _node; public Event(Runnable task, long executeAt) { - super(); + super(State.NEW); _task = task; _executeAt = executeAt; } public boolean isScheduled() { - return _state.get()==State.SCHEDULED; + return get()==State.SCHEDULED; } @Override @@ -246,7 +272,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, { while(true) { - switch(_state.get()) + switch(get()) { case NEW: throw new IllegalStateException(); @@ -255,14 +281,14 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, case CANCELLED: return false; case DELAYED: - if (_state.compareAndSet(State.DELAYED,State.CANCELLED)) + if (compareAndSet(State.DELAYED,State.CANCELLED)) { _node.cancel(); return true; } break; case SCHEDULED: - if (_state.compareAndSet(State.SCHEDULED,State.CANCELLED)) + if (compareAndSet(State.SCHEDULED,State.CANCELLED)) { _timerQ.remove(this); return true; @@ -272,10 +298,18 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, } } + public void execute() + { + if (_executor==null) + _task.run(); + else + _executor.execute(_task); + } + @Override public String toString() { - return String.format("Event@%x{%s,%d,%s}",hashCode(),_state,_executeAt,_task); + return String.format("Event@%x{%s,%d,%s}",hashCode(),get(),_executeAt,_task); } } @@ -304,19 +338,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, Queue(int delay) { _delay=delay; - _head._next.set(_tail); + _head.set(_tail); _tail._prev=_head; } void clear() { - _head._next.set(_tail); + _head.set(_tail); _tail._prev=_head; } void add(Event event, long dequeue_at) { - if (event._state.compareAndSet(State.NEW,State.DELAYED)) + if (event.compareAndSet(State.NEW,State.DELAYED)) { while (true) { @@ -325,7 +359,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, if (prev!=null) { QNode node = new QNode(event,dequeue_at,prev,_tail); - if (prev._next.compareAndSet(_tail,node)) + if (prev.compareAndSet(_tail,node)) { _tail._prev=node; event._node=node; @@ -368,19 +402,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, * Roughly based on public domain lock free queue algorithm from: * http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm */ - private static class QNode + private static class QNode extends AtomicReference<QNode> { + /* extends AtomicReference as a minor optimisation rather than holding a _next field */ final Event _event; final long _dequeueAt; - final AtomicReference<QNode> _next=new AtomicReference<>(); volatile QNode _prev; QNode(Event event, long dequeue_at, QNode prev, QNode next) { + super(next); _event=event; _dequeueAt=dequeue_at; _prev=prev; - _next.set(next); } /** @@ -400,7 +434,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, return event.scanForPrevOf(this); // If the previous next is this (still linked normally) - QNode prev_next = prev._next.get(); + QNode prev_next = prev.get(); if (prev_next==this) return prev; @@ -448,7 +482,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, */ QNode next() { - QNode next = _next.get(); + QNode next = get(); while (true) { if (next == null) @@ -459,21 +493,21 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, next._prev=this; return next; } - QNode next_next = next._next.get(); - _next.compareAndSet(next, next_next); + QNode next_next = next.get(); + compareAndSet(next, next_next); next = next_next; } } public boolean cancel() { - if (_event._state.compareAndSet(State.DELAYED,State.CANCELLED)) + if (_event.compareAndSet(State.DELAYED,State.CANCELLED)) { QNode prev = _prev; - QNode next = _next.get(); + QNode next = get(); if (prev != null && next != null && next.isDelayed()) { - if (prev._next.compareAndSet(this, next)) + if (prev.compareAndSet(this, next)) next._prev=prev; } return true; @@ -483,13 +517,13 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, public Event dequeue() { - if (_event._state.compareAndSet(State.DELAYED,State.SCHEDULED)) + if (_event.compareAndSet(State.DELAYED,State.SCHEDULED)) { QNode prev = _prev; - QNode next = _next.get(); + QNode next = get(); if (prev != null && next != null && next.isDelayed()) { - if (prev._next.compareAndSet(this, next)) + if (prev.compareAndSet(this, next)) next._prev=prev; } return _event; @@ -499,12 +533,12 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, public boolean isDelayed() { - return _event!=null && _event._state.get()==State.DELAYED; + return _event!=null && _event.get()==State.DELAYED; } public boolean isTail() { - return _event==null && _next.get()==null; + return _event==null && get()==null; } @@ -512,7 +546,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable, public String toString() { QNode p=_prev; - QNode n=_next.get(); + QNode n=get(); return String.format("QNode@%x{%x<-%s->%x}",hashCode(),p==null?0:p.hashCode(),_event,n==null?0:n.hashCode()); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SimpleScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java index 76bc09b604..9ced2ed04d 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SimpleScheduler.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java @@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.util.component.AbstractLifeCycle; -public class SimpleScheduler extends AbstractLifeCycle implements Scheduler +public class TimerScheduler extends AbstractLifeCycle implements Scheduler { /* this class uses the Timer class rather than an ScheduledExecutionService because * it uses the same algorithm internally and the signature is cheaper to use as there are no @@ -37,12 +37,12 @@ public class SimpleScheduler extends AbstractLifeCycle implements Scheduler Timer _timer; final String _name; - public SimpleScheduler() + public TimerScheduler() { this(null); } - public SimpleScheduler(String name) + public TimerScheduler(String name) { _name=name; } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java index 4dd42bc82a..5a0559b763 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java @@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.eclipse.jetty.util.BenchmarkHelper; import org.eclipse.jetty.util.statistic.SampleStatistic; import org.hamcrest.Matchers; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -41,15 +43,18 @@ import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class SchedulerTest { - public static Executor executor = Executors.newFixedThreadPool(256); + private static final BenchmarkHelper benchmark = new BenchmarkHelper(); + private static final Executor executor = Executors.newFixedThreadPool(256); @Parameterized.Parameters public static Collection<Object[]> data() { Object[][] data = new Object[][]{ - {new SimpleScheduler()}, - {new ConcurrentScheduler(executor,0)}, - {new ConcurrentScheduler(executor,2000)} + {new TimerScheduler()}, + {new ScheduledExecutionServiceScheduler()}, + {new ConcurrentScheduler(0)}, + {new ConcurrentScheduler(1500)}, + {new ConcurrentScheduler(executor,1500)} }; return Arrays.asList(data); } @@ -132,14 +137,30 @@ public class SchedulerTest Thread.sleep(1500); Assert.assertEquals(0,executed.get()); } - - + @Test @Slow public void testManySchedulesAndCancels() throws Exception { - final Random random = new Random(); - Thread[] test = new Thread[500]; + schedule(500,5000,2000,50); + } + + @Test + @Slow + @Ignore + public void testBenchmark() throws Exception + { + schedule(2000,10000,2000,50); + benchmark.startStatistics(); + System.err.println(_scheduler); + schedule(2000,30000,2000,50); + benchmark.stopStatistics(); + } + + private void schedule(int threads,final int duration, final int delay, final int interval) throws Exception + { + final Random random = new Random(1); + Thread[] test = new Thread[threads]; final AtomicInteger schedules = new AtomicInteger(); final SampleStatistic executions = new SampleStatistic(); @@ -156,18 +177,18 @@ public class SchedulerTest { long now = System.currentTimeMillis(); long start=now; - long end=start+5000; + long end=start+duration; - while (now<end) + while (now+interval<end) { - final int delay=random.nextInt((int)(end-now)); - final long expected = now+delay; - - int cancel=random.nextInt(200); + final long expected=now+delay; + int cancel=random.nextInt(interval); + boolean expected_to_execute=false; if (cancel==0) - cancel=(int)(end-now)+1000; - else - cancel=cancel/4; + { + expected_to_execute=true; + cancel=delay+1000; + } schedules.incrementAndGet(); Scheduler.Task task=_scheduler.schedule(new Runnable() @@ -184,10 +205,17 @@ public class SchedulerTest now = System.currentTimeMillis(); if (task.cancel()) { - long lateness=now-expected; - cancellations.set(lateness); + if (expected_to_execute) + cancellations.set(now-expected); + else + cancellations.set(0); + } + else if (!expected_to_execute) + { + cancellations.set(9999); // flags failure } + Thread.yield(); } } catch (InterruptedException e) @@ -205,9 +233,9 @@ public class SchedulerTest for (Thread thread : test) thread.join(); - //System.err.println(schedules); - //System.err.println(executions); - //System.err.println(cancellations); + // System.err.println(schedules); + // System.err.println(executions); + // System.err.println(cancellations); // there were some executions and cancellations Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L)); diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java index 00e93929f9..45bcfe048a 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java @@ -35,7 +35,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.eclipse.jetty.websocket.api.Extension; import org.eclipse.jetty.websocket.api.ExtensionRegistry; import org.eclipse.jetty.websocket.api.WebSocketPolicy; @@ -69,7 +69,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle public WebSocketClientFactory(Executor threadPool) { - this(threadPool,new SimpleScheduler()); + this(threadPool,new TimerScheduler()); } public WebSocketClientFactory(Executor threadPool, Scheduler scheduler) @@ -110,7 +110,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle public WebSocketClientFactory(SslContextFactory sslContextFactory) { - this(new QueuedThreadPool(),new SimpleScheduler(),sslContextFactory); + this(new QueuedThreadPool(),new TimerScheduler(),sslContextFactory); } @Override diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 4e28360bef..b6cca0c292 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -43,7 +43,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.SimpleScheduler; +import org.eclipse.jetty.util.thread.TimerScheduler; import org.eclipse.jetty.websocket.annotations.WebSocket; import org.eclipse.jetty.websocket.api.Extension; import org.eclipse.jetty.websocket.api.ExtensionRegistry; @@ -78,7 +78,7 @@ public class WebSocketServerFactory extends AggregateLifeCycle implements WebSoc /** * Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler. */ - private final Scheduler scheduler = new SimpleScheduler(); + private final Scheduler scheduler = new TimerScheduler(); private final String supportedVersions; private final WebSocketPolicy basePolicy; private final EventMethodsCache methodsCache; |