Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2012-09-03 05:24:36 +0000
committerGreg Wilkins2012-09-03 05:24:36 +0000
commitc3776764f4ab0820c7fd252d49181664000aa277 (patch)
tree01af27b02dd22988d5a3f51695ce9bcf205e85a8
parentbb3dcc1c5b075cdedcdda6cf37e781e0c743d49a (diff)
downloadorg.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.tar.gz
org.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.tar.xz
org.eclipse.jetty.project-c3776764f4ab0820c7fd252d49181664000aa277.zip
jetty-9 ConcurrentScheduler refinements
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java4
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java4
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java4
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java4
-rw-r--r--jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java2
-rw-r--r--jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java4
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java6
-rw-r--r--jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java4
-rw-r--r--jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java4
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java114
-rw-r--r--jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java (renamed from jetty-util/src/main/java/org/eclipse/jetty/util/thread/SimpleScheduler.java)6
-rw-r--r--jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java72
-rw-r--r--jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java6
-rw-r--r--jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java4
15 files changed, 152 insertions, 90 deletions
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java
index 5a329cd2a8..a1028d0b9e 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ByteArrayEndPointTest.java
@@ -37,7 +37,7 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +51,7 @@ public class ByteArrayEndPointTest
@Before
public void before() throws Exception
{
- _scheduler = new SimpleScheduler();
+ _scheduler = new TimerScheduler();
_scheduler.start();
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java
index f8f30e90cd..3667fe75ed 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java
@@ -37,7 +37,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -54,7 +54,7 @@ public class SelectChannelEndPointInterestsTest
threadPool = new QueuedThreadPool();
threadPool.start();
- scheduler = new SimpleScheduler();
+ scheduler = new TimerScheduler();
scheduler.start();
connector = ServerSocketChannel.open();
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
index 01f22ca4d0..3d092ba4de 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java
@@ -44,7 +44,7 @@ import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -56,7 +56,7 @@ public class SelectChannelEndPointTest
protected volatile EndPoint _lastEndPoint;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
- protected Scheduler _scheduler = new SimpleScheduler();
+ protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager()
{
@Override
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java
index d271061d50..2613994358 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java
@@ -43,7 +43,7 @@ import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -60,7 +60,7 @@ public class SslConnectionTest
private volatile FutureCallback<Void> _writeCallback;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
- protected Scheduler _scheduler = new SimpleScheduler();
+ protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager()
{
@Override
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
index bfc8bdfb58..1cdd2c0bd7 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java
@@ -39,7 +39,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
/**
* <p>Partial implementation of {@link Connector}</p>
@@ -79,7 +79,7 @@ public abstract class AbstractConnector extends AggregateLifeCycle implements Co
{
_server=server;
_executor=executor!=null?executor:_server.getThreadPool();
- _scheduler=scheduler!=null?scheduler:new SimpleScheduler();
+ _scheduler=scheduler!=null?scheduler:new TimerScheduler();
_byteBufferPool = pool!=null?pool:new MappedByteBufferPool();
_sslContextFactory = sslContextFactory;
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
index 9069c5cff0..25c48069f3 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java
@@ -233,7 +233,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{
try
{
- _request.setHandled(false); // TODO: is this right here ?
+ _request.setHandled(false);
_response.getHttpOutput().reopen();
if (_state.isInitial())
diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java
index 54cedc7df2..fc7907e20f 100644
--- a/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java
+++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ResponseTest.java
@@ -48,7 +48,7 @@ import org.eclipse.jetty.server.session.HashSessionIdManager;
import org.eclipse.jetty.server.session.HashSessionManager;
import org.eclipse.jetty.server.session.HashedSession;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
@@ -65,7 +65,7 @@ public class ResponseTest
public void init() throws Exception
{
_server = new Server();
- _scheduler = new SimpleScheduler();
+ _scheduler = new TimerScheduler();
LocalConnector connector = new LocalConnector(_server, null, _scheduler, null, null, 1);
_server.addConnector(connector);
_server.setHandler(new DumpHandler());
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
index 60d60ac917..42965962ef 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java
@@ -38,7 +38,7 @@ import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -55,7 +55,7 @@ public class AsyncTimeoutTest
ByteBufferPool bufferPool = new MappedByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool();
- Scheduler scheduler = new SimpleScheduler();
+ Scheduler scheduler = new TimerScheduler();
scheduler.start(); // TODO need to use jetty lifecycles better here
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
@@ -102,7 +102,7 @@ public class AsyncTimeoutTest
ByteBufferPool bufferPool = new MappedByteBufferPool();
Executor threadPool = Executors.newCachedThreadPool();
- Scheduler scheduler = new SimpleScheduler();
+ Scheduler scheduler = new TimerScheduler();
scheduler.start();
Generator generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, bufferPool, threadPool, scheduler, new TestController(), null, 1, null, generator, new FlowControlStrategy.None())
diff --git a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
index 45ee882874..e795facd08 100644
--- a/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
+++ b/jetty-spdy/spdy-core/src/test/java/org/eclipse/jetty/spdy/StandardSessionTest.java
@@ -49,7 +49,7 @@ import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -87,7 +87,7 @@ public class StandardSessionTest
{
bufferPool = new MappedByteBufferPool();
threadPool = Executors.newCachedThreadPool();
- scheduler = new SimpleScheduler();
+ scheduler = new TimerScheduler();
scheduler.start();
generator = new Generator(bufferPool, new StandardCompressionFactory.StandardCompressor());
session = new StandardSession(SPDY.V2,bufferPool,threadPool,scheduler,controller,null,1,null,generator,new FlowControlStrategy.None());
diff --git a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
index 11e3b13b0c..635d370f5d 100644
--- a/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
+++ b/jetty-spdy/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java
@@ -51,7 +51,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
public class SPDYClient
{
@@ -167,7 +167,7 @@ public class SPDYClient
{
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
- private final Scheduler scheduler = new SimpleScheduler();
+ private final Scheduler scheduler = new TimerScheduler();
private final Executor executor;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java
index c09fc1551e..29fd950ce9 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ConcurrentScheduler.java
@@ -59,15 +59,26 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
});
private final Queue _delayQ;
+ public ConcurrentScheduler()
+ {
+ this(null,8192);
+ }
+
public ConcurrentScheduler(Executor executor)
{
this(executor,8192);
}
+ public ConcurrentScheduler(int delayQms)
+ {
+ this(null,delayQms);
+ }
+
public ConcurrentScheduler(Executor executor,int delayQms)
{
_executor = executor;
- addBean(_executor,false);
+ if (_executor!=null)
+ addBean(_executor,false);
_delayQ=new Queue(delayQms);
}
@@ -75,7 +86,10 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
protected void doStart() throws Exception
{
super.doStart();
- _executor.execute(this);
+ if (_executor==null)
+ new Thread(this).start();
+ else
+ _executor.execute(this);
}
@@ -110,10 +124,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
long interval=event._executeAt-now;
// Should we execute this event?
- if (interval<=0 && event._state.compareAndSet(State.NEW,State.DONE))
- {
- _executor.execute(event._task);
- }
+ if (interval<=0 && event.compareAndSet(State.NEW,State.DONE))
+ event.execute();
+
// Should we delay this event
else if (_delayQ._delay>0 && interval>_delayQ._delay)
{
@@ -121,7 +134,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
_delayQ.add(event,dequeue_at);
}
// else we schedule the event
- else if (event._state.compareAndSet(State.NEW,State.SCHEDULED))
+ else if (event.compareAndSet(State.NEW,State.SCHEDULED))
{
_timerQ.add(event);
if (interval<=MAX_SLEEP)
@@ -160,7 +173,15 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
{
Event event=next.dequeue();
if (event!=null)
- _timerQ.add(event);
+ {
+ if (event._executeAt<=now)
+ {
+ if (event.compareAndSet(State.SCHEDULED,State.DONE))
+ event.execute();
+ }
+ else
+ _timerQ.add(event);
+ }
}
else
{
@@ -185,10 +206,9 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
else if (event._executeAt<=now)
{
i.remove();
- if (event._state.compareAndSet(State.SCHEDULED,State.DONE))
- {
- _executor.execute(event._task);
- }
+ event.execute();
+ if (event.compareAndSet(State.SCHEDULED,State.DONE))
+ event.execute();
}
// else how long do we need to wait?
else
@@ -218,27 +238,33 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
}
}
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s@%x{%d,%s}",this.getClass().getSimpleName(),hashCode(),_delayQ._delay,_executor);
+ }
enum State { NEW, DELAYED, SCHEDULED, CANCELLED, DONE };
- private class Event implements Scheduler.Task
+ private class Event extends AtomicReference<State> implements Scheduler.Task
{
+ /* extends AtomicReference as a minor optimisation rather than holding a _state field */
final Runnable _task;
final long _executeAt;
- final AtomicReference<State> _state=new AtomicReference<>(State.NEW);
volatile QNode _node;
public Event(Runnable task, long executeAt)
{
- super();
+ super(State.NEW);
_task = task;
_executeAt = executeAt;
}
public boolean isScheduled()
{
- return _state.get()==State.SCHEDULED;
+ return get()==State.SCHEDULED;
}
@Override
@@ -246,7 +272,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
{
while(true)
{
- switch(_state.get())
+ switch(get())
{
case NEW:
throw new IllegalStateException();
@@ -255,14 +281,14 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
case CANCELLED:
return false;
case DELAYED:
- if (_state.compareAndSet(State.DELAYED,State.CANCELLED))
+ if (compareAndSet(State.DELAYED,State.CANCELLED))
{
_node.cancel();
return true;
}
break;
case SCHEDULED:
- if (_state.compareAndSet(State.SCHEDULED,State.CANCELLED))
+ if (compareAndSet(State.SCHEDULED,State.CANCELLED))
{
_timerQ.remove(this);
return true;
@@ -272,10 +298,18 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
}
}
+ public void execute()
+ {
+ if (_executor==null)
+ _task.run();
+ else
+ _executor.execute(_task);
+ }
+
@Override
public String toString()
{
- return String.format("Event@%x{%s,%d,%s}",hashCode(),_state,_executeAt,_task);
+ return String.format("Event@%x{%s,%d,%s}",hashCode(),get(),_executeAt,_task);
}
}
@@ -304,19 +338,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
Queue(int delay)
{
_delay=delay;
- _head._next.set(_tail);
+ _head.set(_tail);
_tail._prev=_head;
}
void clear()
{
- _head._next.set(_tail);
+ _head.set(_tail);
_tail._prev=_head;
}
void add(Event event, long dequeue_at)
{
- if (event._state.compareAndSet(State.NEW,State.DELAYED))
+ if (event.compareAndSet(State.NEW,State.DELAYED))
{
while (true)
{
@@ -325,7 +359,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
if (prev!=null)
{
QNode node = new QNode(event,dequeue_at,prev,_tail);
- if (prev._next.compareAndSet(_tail,node))
+ if (prev.compareAndSet(_tail,node))
{
_tail._prev=node;
event._node=node;
@@ -368,19 +402,19 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
* Roughly based on public domain lock free queue algorithm from:
* http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm
*/
- private static class QNode
+ private static class QNode extends AtomicReference<QNode>
{
+ /* extends AtomicReference as a minor optimisation rather than holding a _next field */
final Event _event;
final long _dequeueAt;
- final AtomicReference<QNode> _next=new AtomicReference<>();
volatile QNode _prev;
QNode(Event event, long dequeue_at, QNode prev, QNode next)
{
+ super(next);
_event=event;
_dequeueAt=dequeue_at;
_prev=prev;
- _next.set(next);
}
/**
@@ -400,7 +434,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
return event.scanForPrevOf(this);
// If the previous next is this (still linked normally)
- QNode prev_next = prev._next.get();
+ QNode prev_next = prev.get();
if (prev_next==this)
return prev;
@@ -448,7 +482,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
*/
QNode next()
{
- QNode next = _next.get();
+ QNode next = get();
while (true)
{
if (next == null)
@@ -459,21 +493,21 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
next._prev=this;
return next;
}
- QNode next_next = next._next.get();
- _next.compareAndSet(next, next_next);
+ QNode next_next = next.get();
+ compareAndSet(next, next_next);
next = next_next;
}
}
public boolean cancel()
{
- if (_event._state.compareAndSet(State.DELAYED,State.CANCELLED))
+ if (_event.compareAndSet(State.DELAYED,State.CANCELLED))
{
QNode prev = _prev;
- QNode next = _next.get();
+ QNode next = get();
if (prev != null && next != null && next.isDelayed())
{
- if (prev._next.compareAndSet(this, next))
+ if (prev.compareAndSet(this, next))
next._prev=prev;
}
return true;
@@ -483,13 +517,13 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public Event dequeue()
{
- if (_event._state.compareAndSet(State.DELAYED,State.SCHEDULED))
+ if (_event.compareAndSet(State.DELAYED,State.SCHEDULED))
{
QNode prev = _prev;
- QNode next = _next.get();
+ QNode next = get();
if (prev != null && next != null && next.isDelayed())
{
- if (prev._next.compareAndSet(this, next))
+ if (prev.compareAndSet(this, next))
next._prev=prev;
}
return _event;
@@ -499,12 +533,12 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public boolean isDelayed()
{
- return _event!=null && _event._state.get()==State.DELAYED;
+ return _event!=null && _event.get()==State.DELAYED;
}
public boolean isTail()
{
- return _event==null && _next.get()==null;
+ return _event==null && get()==null;
}
@@ -512,7 +546,7 @@ public class ConcurrentScheduler extends AggregateLifeCycle implements Runnable,
public String toString()
{
QNode p=_prev;
- QNode n=_next.get();
+ QNode n=get();
return String.format("QNode@%x{%x<-%s->%x}",hashCode(),p==null?0:p.hashCode(),_event,n==null?0:n.hashCode());
}
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SimpleScheduler.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java
index 76bc09b604..9ced2ed04d 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/SimpleScheduler.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TimerScheduler.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
-public class SimpleScheduler extends AbstractLifeCycle implements Scheduler
+public class TimerScheduler extends AbstractLifeCycle implements Scheduler
{
/* this class uses the Timer class rather than an ScheduledExecutionService because
* it uses the same algorithm internally and the signature is cheaper to use as there are no
@@ -37,12 +37,12 @@ public class SimpleScheduler extends AbstractLifeCycle implements Scheduler
Timer _timer;
final String _name;
- public SimpleScheduler()
+ public TimerScheduler()
{
this(null);
}
- public SimpleScheduler(String name)
+ public TimerScheduler(String name)
{
_name=name;
}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java
index 4dd42bc82a..5a0559b763 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/SchedulerTest.java
@@ -28,11 +28,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
+import org.eclipse.jetty.util.BenchmarkHelper;
import org.eclipse.jetty.util.statistic.SampleStatistic;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -41,15 +43,18 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class SchedulerTest
{
- public static Executor executor = Executors.newFixedThreadPool(256);
+ private static final BenchmarkHelper benchmark = new BenchmarkHelper();
+ private static final Executor executor = Executors.newFixedThreadPool(256);
@Parameterized.Parameters
public static Collection<Object[]> data()
{
Object[][] data = new Object[][]{
- {new SimpleScheduler()},
- {new ConcurrentScheduler(executor,0)},
- {new ConcurrentScheduler(executor,2000)}
+ {new TimerScheduler()},
+ {new ScheduledExecutionServiceScheduler()},
+ {new ConcurrentScheduler(0)},
+ {new ConcurrentScheduler(1500)},
+ {new ConcurrentScheduler(executor,1500)}
};
return Arrays.asList(data);
}
@@ -132,14 +137,30 @@ public class SchedulerTest
Thread.sleep(1500);
Assert.assertEquals(0,executed.get());
}
-
-
+
@Test
@Slow
public void testManySchedulesAndCancels() throws Exception
{
- final Random random = new Random();
- Thread[] test = new Thread[500];
+ schedule(500,5000,2000,50);
+ }
+
+ @Test
+ @Slow
+ @Ignore
+ public void testBenchmark() throws Exception
+ {
+ schedule(2000,10000,2000,50);
+ benchmark.startStatistics();
+ System.err.println(_scheduler);
+ schedule(2000,30000,2000,50);
+ benchmark.stopStatistics();
+ }
+
+ private void schedule(int threads,final int duration, final int delay, final int interval) throws Exception
+ {
+ final Random random = new Random(1);
+ Thread[] test = new Thread[threads];
final AtomicInteger schedules = new AtomicInteger();
final SampleStatistic executions = new SampleStatistic();
@@ -156,18 +177,18 @@ public class SchedulerTest
{
long now = System.currentTimeMillis();
long start=now;
- long end=start+5000;
+ long end=start+duration;
- while (now<end)
+ while (now+interval<end)
{
- final int delay=random.nextInt((int)(end-now));
- final long expected = now+delay;
-
- int cancel=random.nextInt(200);
+ final long expected=now+delay;
+ int cancel=random.nextInt(interval);
+ boolean expected_to_execute=false;
if (cancel==0)
- cancel=(int)(end-now)+1000;
- else
- cancel=cancel/4;
+ {
+ expected_to_execute=true;
+ cancel=delay+1000;
+ }
schedules.incrementAndGet();
Scheduler.Task task=_scheduler.schedule(new Runnable()
@@ -184,10 +205,17 @@ public class SchedulerTest
now = System.currentTimeMillis();
if (task.cancel())
{
- long lateness=now-expected;
- cancellations.set(lateness);
+ if (expected_to_execute)
+ cancellations.set(now-expected);
+ else
+ cancellations.set(0);
+ }
+ else if (!expected_to_execute)
+ {
+ cancellations.set(9999); // flags failure
}
+ Thread.yield();
}
}
catch (InterruptedException e)
@@ -205,9 +233,9 @@ public class SchedulerTest
for (Thread thread : test)
thread.join();
- //System.err.println(schedules);
- //System.err.println(executions);
- //System.err.println(cancellations);
+ // System.err.println(schedules);
+ // System.err.println(executions);
+ // System.err.println(cancellations);
// there were some executions and cancellations
Assert.assertThat(executions.getCount(),Matchers.greaterThan(0L));
diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java
index 00e93929f9..45bcfe048a 100644
--- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java
+++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClientFactory.java
@@ -35,7 +35,7 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@@ -69,7 +69,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(Executor threadPool)
{
- this(threadPool,new SimpleScheduler());
+ this(threadPool,new TimerScheduler());
}
public WebSocketClientFactory(Executor threadPool, Scheduler scheduler)
@@ -110,7 +110,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory(SslContextFactory sslContextFactory)
{
- this(new QueuedThreadPool(),new SimpleScheduler(),sslContextFactory);
+ this(new QueuedThreadPool(),new TimerScheduler(),sslContextFactory);
}
@Override
diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
index 4e28360bef..b6cca0c292 100644
--- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
+++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java
@@ -43,7 +43,7 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
-import org.eclipse.jetty.util.thread.SimpleScheduler;
+import org.eclipse.jetty.util.thread.TimerScheduler;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
@@ -78,7 +78,7 @@ public class WebSocketServerFactory extends AggregateLifeCycle implements WebSoc
/**
* Have the factory maintain 1 and only 1 scheduler. All connections share this scheduler.
*/
- private final Scheduler scheduler = new SimpleScheduler();
+ private final Scheduler scheduler = new TimerScheduler();
private final String supportedVersions;
private final WebSocketPolicy basePolicy;
private final EventMethodsCache methodsCache;

Back to the top