From 91bfa30e6cbc74b9d9b338bf29c797fcad07cc2f Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 12 May 2010 12:20:49 +0000 Subject: 312243 Optimized timeout handling git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1764 7e9141cc-0065-0410-87d8-b60c137991c4 --- VERSION.txt | 1 + .../org/eclipse/jetty/ajp/Ajp13Connection.java | 2 - .../java/org/eclipse/jetty/ajp/Ajp13Generator.java | 2 - .../eclipse/jetty/ajp/Ajp13SocketConnector.java | 10 +- .../org/eclipse/jetty/ajp/Ajp13ConnectionTest.java | 29 ++-- .../jetty/http/ssl/SslSelectChannelEndPoint.java | 29 ---- .../org/eclipse/jetty/io/ByteArrayEndPoint.java | 34 ++++- .../main/java/org/eclipse/jetty/io/EndPoint.java | 23 ++- .../org/eclipse/jetty/io/bio/SocketEndPoint.java | 26 ++++ .../org/eclipse/jetty/io/bio/StreamEndPoint.java | 13 ++ .../org/eclipse/jetty/io/nio/ChannelEndPoint.java | 34 ++++- .../jetty/io/nio/SelectChannelEndPoint.java | 62 ++++---- .../org/eclipse/jetty/io/nio/SelectorManager.java | 168 ++++++++++----------- .../eclipse/jetty/server/AbstractConnector.java | 2 - .../eclipse/jetty/server/bio/SocketConnector.java | 19 +-- .../jetty/server/nio/BlockingChannelConnector.java | 12 +- .../java/org/eclipse/jetty/server/StressTest.java | 2 +- .../jetty/websocket/WebSocketConnection.java | 19 ++- .../eclipse/jetty/websocket/WebSocketFactory.java | 4 +- 19 files changed, 273 insertions(+), 218 deletions(-) diff --git a/VERSION.txt b/VERSION.txt index 1a6e020008..fd8c885770 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -16,6 +16,7 @@ jetty-7.1.0 5 May 2010 + 306353 fixed cross context dispatch to root context. + 311154 Added deprecated StringBuffer API for backwards compatibility + 311554 Protect shutdown thread from Server#doStop + + 312243 Optimized timeout handling jetty-7.1.0.RC1 5 May 2010 + 286889 Allow System and Server classes to be set on Server instance and when applied to all webapps diff --git a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Connection.java b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Connection.java index 8f81643f98..807cc6fd98 100644 --- a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Connection.java +++ b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Connection.java @@ -34,8 +34,6 @@ import org.eclipse.jetty.server.Server; * Connection implementation of the Ajp13 protocol.

XXX Refactor to remove * duplication of HttpConnection * - * - * */ public class Ajp13Connection extends HttpConnection { diff --git a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Generator.java b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Generator.java index b147e448b2..ce2efa1bc8 100644 --- a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Generator.java +++ b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13Generator.java @@ -602,8 +602,6 @@ public class Ajp13Generator extends AbstractGenerator total += len; } - - return total; } catch (IOException e) diff --git a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13SocketConnector.java b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13SocketConnector.java index 6acaeccbe0..0aabac5611 100644 --- a/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13SocketConnector.java +++ b/jetty-ajp/src/main/java/org/eclipse/jetty/ajp/Ajp13SocketConnector.java @@ -33,12 +33,13 @@ public class Ajp13SocketConnector extends SocketConnector static boolean __allowShutdown = false; public Ajp13SocketConnector() { - super.setHeaderBufferSize(Ajp13Packet.MAX_DATA_SIZE); + super.setRequestHeaderSize(Ajp13Packet.MAX_DATA_SIZE); + super.setResponseHeaderSize(Ajp13Packet.MAX_DATA_SIZE); super.setRequestBufferSize(Ajp13Packet.MAX_DATA_SIZE); super.setResponseBufferSize(Ajp13Packet.MAX_DATA_SIZE); // IN AJP protocol the socket stay open, so - // by default the time out is set to 900 seconds - super.setMaxIdleTime(900000); + // by default the time out is set to 0 seconds + super.setMaxIdleTime(0); } @Override @@ -60,6 +61,9 @@ public class Ajp13SocketConnector extends SocketConnector super.customize(endpoint,request); if (request.isSecure()) request.setScheme(HttpSchemes.HTTPS); + + System.err.println("Customize "+endpoint+" "+request); + } /* ------------------------------------------------------------ */ diff --git a/jetty-ajp/src/test/java/org/eclipse/jetty/ajp/Ajp13ConnectionTest.java b/jetty-ajp/src/test/java/org/eclipse/jetty/ajp/Ajp13ConnectionTest.java index d9861a909b..1bf169ac8f 100644 --- a/jetty-ajp/src/test/java/org/eclipse/jetty/ajp/Ajp13ConnectionTest.java +++ b/jetty-ajp/src/test/java/org/eclipse/jetty/ajp/Ajp13ConnectionTest.java @@ -14,10 +14,13 @@ package org.eclipse.jetty.ajp; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.Socket; +import java.net.SocketTimeoutException; + import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -26,7 +29,9 @@ import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.log.Log; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -48,7 +53,6 @@ public class Ajp13ConnectionTest _connector=new Ajp13SocketConnector(); _connector.setPort(0); - _connector.setMaxIdleTime(100); _server.setConnectors(new Connector[] { _connector }); _server.setHandler(new Handler()); _server.start(); @@ -65,6 +69,7 @@ public class Ajp13ConnectionTest public void openSocket() throws Exception { _client=new Socket("localhost",_connector.getLocalPort()); + _client.setSoTimeout(500); } @After @@ -292,29 +297,17 @@ public class Ajp13ConnectionTest // TODO: char array instead of string? private String readResponse(Socket _client) throws IOException { - BufferedReader br=null; + ByteArrayOutputStream bout = new ByteArrayOutputStream(); try { - br=new BufferedReader(new InputStreamReader(_client.getInputStream())); - - StringBuffer sb=new StringBuffer(); - String line; - while ((line=br.readLine()) != null) - { - sb.append(line); - sb.append('\n'); - } - - return sb.toString(); + IO.copy(_client.getInputStream(),bout); } - finally + catch(SocketTimeoutException e) { - if (br != null) - { - br.close(); - } + Log.ignore(e); } + return bout.toString("utf-8"); } public static class Handler extends AbstractHandler diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/ssl/SslSelectChannelEndPoint.java b/jetty-http/src/main/java/org/eclipse/jetty/http/ssl/SslSelectChannelEndPoint.java index 848c0fc5bb..12af314109 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/ssl/SslSelectChannelEndPoint.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/ssl/SslSelectChannelEndPoint.java @@ -119,35 +119,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint { Log.info(""+_result); } - - /* ------------------------------------------------------------ */ - /* (non-Javadoc) - * @see org.eclipse.io.nio.SelectChannelEndPoint#idleExpired() - */ - @Override - protected void idleExpired() - { - try - { - getSelectManager().dispatch(new Runnable() - { - public void run() - { - doIdleExpired(); - } - }); - } - catch(Exception e) - { - Log.ignore(e); - } - } - - /* ------------------------------------------------------------ */ - protected void doIdleExpired() - { - super.idleExpired(); - } /* ------------------------------------------------------------ */ @Override diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index 484e3660b5..befe4f7be0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -24,13 +24,15 @@ import java.io.IOException; */ public class ByteArrayEndPoint implements ConnectedEndPoint { - byte[] _inBytes; - ByteArrayBuffer _in; - ByteArrayBuffer _out; - boolean _closed; - boolean _nonBlocking; - boolean _growOutput; - Connection _connection; + protected byte[] _inBytes; + protected ByteArrayBuffer _in; + protected ByteArrayBuffer _out; + protected boolean _closed; + protected boolean _nonBlocking; + protected boolean _growOutput; + protected Connection _connection; + protected int _maxIdleTime; + /* ------------------------------------------------------------ */ /** @@ -353,5 +355,23 @@ public class ByteArrayEndPoint implements ConnectedEndPoint _growOutput=growOutput; } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.EndPoint#getMaxIdleTime() + */ + public int getMaxIdleTime() + { + return _maxIdleTime; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.EndPoint#setMaxIdleTime(int) + */ + public void setMaxIdleTime(int timeMs) throws IOException + { + _maxIdleTime=timeMs; + } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 24e7fb9fc6..4a329da7ba 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -16,14 +16,12 @@ package org.eclipse.jetty.io; import java.io.IOException; - /** * * A transport EndPoint */ public interface EndPoint { - /** * Close any backing stream associated with the buffer */ @@ -149,4 +147,25 @@ public interface EndPoint */ public void flush() throws IOException; + + /* ------------------------------------------------------------ */ + /** Get the max idle time in ms. + *

The max idle time is the time the endpoint can be idle before + * extraordinary handling takes place. This loosely corresponds to + * the {@link java.net.Socket#getSoTimeout()} for blocking connections, + * but {@link AsyncEndPoint} implementations must use other mechanisms + * to implement the max idle time. + * @return the max idle time in ms. + */ + public int getMaxIdleTime(); + + /* ------------------------------------------------------------ */ + /** Set the max idle time. + * @param timeMs the max idle time in MS. + * @throws IOException if the timeout cannot be set. + */ + public void setMaxIdleTime(int timeMs) throws IOException; + + + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/SocketEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/SocketEndPoint.java index 35dc953312..b53ddb50f5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/SocketEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/SocketEndPoint.java @@ -41,6 +41,18 @@ public class SocketEndPoint extends StreamEndPoint { super(socket.getInputStream(),socket.getOutputStream()); _socket=socket; + super.setMaxIdleTime(_socket.getSoTimeout()); + } + + /** + * + */ + protected SocketEndPoint(Socket socket, int maxIdleTime) + throws IOException + { + super(socket.getInputStream(),socket.getOutputStream()); + _socket=socket; + super.setMaxIdleTime(maxIdleTime); } /* (non-Javadoc) @@ -177,4 +189,18 @@ public class SocketEndPoint extends StreamEndPoint { return _socket; } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int) + */ + @Override + public void setMaxIdleTime(int timeMs) throws IOException + { + if (timeMs!=getMaxIdleTime()) + _socket.setSoTimeout(timeMs>0?timeMs:0); + super.setMaxIdleTime(timeMs); + } + + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java index 355ac3bd81..eee564a06b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/bio/StreamEndPoint.java @@ -31,6 +31,7 @@ public class StreamEndPoint implements EndPoint { InputStream _in; OutputStream _out; + int _maxIdleTime; /** * @@ -280,5 +281,17 @@ public class StreamEndPoint implements EndPoint { return false; } + + /* ------------------------------------------------------------ */ + public int getMaxIdleTime() + { + return _maxIdleTime; + } + + /* ------------------------------------------------------------ */ + public void setMaxIdleTime(int timeMs) throws IOException + { + _maxIdleTime=timeMs; + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java index 762e9380df..d89161cd05 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/ChannelEndPoint.java @@ -41,15 +41,30 @@ public class ChannelEndPoint implements EndPoint protected final Socket _socket; protected InetSocketAddress _local; protected InetSocketAddress _remote; + protected int _maxIdleTime; /** * */ - public ChannelEndPoint(ByteChannel channel) + public ChannelEndPoint(ByteChannel channel) throws IOException { super(); this._channel = channel; _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null; + if (_socket!=null) + _maxIdleTime=_socket.getSoTimeout(); + } + + /** + * + */ + protected ChannelEndPoint(ByteChannel channel, int maxIdleTime) throws IOException + { + this._channel = channel; + _maxIdleTime=maxIdleTime; + _socket=(channel instanceof SocketChannel)?((SocketChannel)channel).socket():null; + if (_socket!=null) + _socket.setSoTimeout(_maxIdleTime); } public boolean isBlocking() @@ -442,4 +457,21 @@ public class ChannelEndPoint implements EndPoint { return false; } + + /* ------------------------------------------------------------ */ + public int getMaxIdleTime() + { + return _maxIdleTime; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.bio.StreamEndPoint#setMaxIdleTime(int) + */ + public void setMaxIdleTime(int timeMs) throws IOException + { + if (_socket!=null && timeMs!=_maxIdleTime) + _socket.setSoTimeout(timeMs>0?timeMs:0); + _maxIdleTime=timeMs; + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java index e479a8bc76..2449ebea57 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @@ -46,10 +46,11 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, private boolean _readBlocked; private boolean _writeBlocked; private boolean _open; - private final Timeout.Task _idleTask = new IdleTask(); + private volatile long _idleTimestamp; /* ------------------------------------------------------------ */ public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) + throws IOException { super(channel); @@ -61,7 +62,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, _key = key; _connection = _manager.newConnection(channel,this); - _manager.endPointOpened(this); scheduleIdle(); } @@ -198,13 +198,23 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, /* ------------------------------------------------------------ */ public void scheduleIdle() { - _selectSet.scheduleIdle(_idleTask); + _idleTimestamp=System.currentTimeMillis(); } /* ------------------------------------------------------------ */ public void cancelIdle() { - _selectSet.cancelIdle(_idleTask); + _idleTimestamp=0; + } + + /* ------------------------------------------------------------ */ + public void checkIdleTimestamp(long now) + { + if (_idleTimestamp!=0 && _maxIdleTime!=0 && now>(_idleTimestamp+_maxIdleTime)) + { + System.err.println("EXPIRED "+now+">("+_idleTimestamp+"+"+_maxIdleTime+")"); + idleExpired(); + } } /* ------------------------------------------------------------ */ @@ -336,7 +346,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, finally { _writeBlocked=false; - if (_idleTask.isScheduled()) + if (_idleTimestamp!=-1) scheduleIdle(); } } @@ -426,7 +436,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, cancelIdle(); if (_open) - _manager.endPointClosed(this); + { + _selectSet.destroyEndPoint(this); + } _open=false; _key = null; } @@ -452,7 +464,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, cancelIdle(); if (_open) - _manager.endPointClosed(this); + { + _selectSet.destroyEndPoint(this); + } _open=false; _key = null; } @@ -554,12 +568,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } } - /* ------------------------------------------------------------ */ - public Timeout.Task getTimeoutTask() - { - return _idleTask; - } - /* ------------------------------------------------------------ */ public SelectSet getSelectSet() { @@ -567,24 +575,16 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, } /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - private class IdleTask extends Timeout.Task + /** + * Don't set the SoTimeout + * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int) + */ + @Override + public void setMaxIdleTime(int timeMs) throws IOException { - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.thread.Timeout.Task#expire() - */ - @Override - public void expired() - { - idleExpired(); - } - - @Override - public String toString() - { - return "TimeoutTask:" + SelectChannelEndPoint.this.toString(); - } + _maxIdleTime=timeMs; } + + + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index abd91be175..497d605c3f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -21,7 +21,11 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; -import java.util.List; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; @@ -49,9 +53,9 @@ public abstract class SelectorManager extends AbstractLifeCycle private static final int __BUSY_PAUSE=Integer.getInteger("org.mortbay.io.nio.BUSY_PAUSE",50).intValue(); private static final int __BUSY_KEY=Integer.getInteger("org.mortbay.io.nio.BUSY_KEY",-1).intValue(); - private long _maxIdleTime; + private int _maxIdleTime; + private int _lowResourcesMaxIdleTime; private long _lowResourcesConnections; - private long _lowResourcesMaxIdleTime; private transient SelectSet[] _selectSet; private int _selectSets=1; private volatile int _set; @@ -63,7 +67,7 @@ public abstract class SelectorManager extends AbstractLifeCycle */ public void setMaxIdleTime(long maxIdleTime) { - _maxIdleTime=maxIdleTime; + _maxIdleTime=(int)maxIdleTime; } /* ------------------------------------------------------------ */ @@ -167,7 +171,7 @@ public abstract class SelectorManager extends AbstractLifeCycle */ public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime) { - _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime; + _lowResourcesMaxIdleTime=(int)lowResourcesMaxIdleTime; } /* ------------------------------------------------------------ */ @@ -296,13 +300,13 @@ public abstract class SelectorManager extends AbstractLifeCycle public class SelectSet { private final int _setID; - private final Timeout _idleTimeout; private final Timeout _timeout; - private final List[] _changes; - - private int _change; - private int _nextSet; + + private final ConcurrentLinkedQueue _changes = new ConcurrentLinkedQueue(); + private Selector _selector; + + private int _nextSet; private volatile Thread _selecting; private int _jvmBug; private int _selects; @@ -316,21 +320,20 @@ public abstract class SelectorManager extends AbstractLifeCycle private int _jvmFix0; private int _jvmFix1; private int _jvmFix2; + private volatile long _idleTick; + private ConcurrentMap _endPoints = new ConcurrentHashMap(); /* ------------------------------------------------------------ */ SelectSet(int acceptorID) throws Exception { _setID=acceptorID; - _idleTimeout = new Timeout(this); - _idleTimeout.setDuration(getMaxIdleTime()); + _idleTick = System.currentTimeMillis(); _timeout = new Timeout(this); _timeout.setDuration(0L); - _changes = new List[] {new ArrayList(),new ArrayList()}; // create a selector; _selector = Selector.open(); - _change=0; _monitorStart=System.currentTimeMillis(); _monitorNext=_monitorStart+__MONITOR_PERIOD; _log=_monitorStart+60000; @@ -339,10 +342,7 @@ public abstract class SelectorManager extends AbstractLifeCycle /* ------------------------------------------------------------ */ public void addChange(Object point) { - synchronized (_changes) - { - _changes[_change].add(point); - } + _changes.add(point); } /* ------------------------------------------------------------ */ @@ -356,12 +356,6 @@ public abstract class SelectorManager extends AbstractLifeCycle addChange(new ChangeSelectableChannel(channel,att)); } - /* ------------------------------------------------------------ */ - public void cancelIdle(Timeout.Task task) - { - task.cancel(); - } - /* ------------------------------------------------------------ */ /** * Select and dispatch tasks found from changes and the selector. @@ -373,44 +367,36 @@ public abstract class SelectorManager extends AbstractLifeCycle try { _selecting=Thread.currentThread(); - List changes; - final Selector selector; - synchronized (_changes) - { - changes=_changes[_change]; - _change=_change==0?1:0; - selector=_selector; - } + final Selector selector=_selector; // Make any key changes required - final int size=changes.size(); - for (int i = 0; i < size; i++) + Object change; + int changes=_changes.size(); + while (changes-->0 && (change=_changes.poll())!=null) { try { - Object o = changes.get(i); - - if (o instanceof EndPoint) + if (change instanceof EndPoint) { // Update the operations for a key. - SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o; + SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; endpoint.doUpdateKey(); } - else if (o instanceof Runnable) + else if (change instanceof Runnable) { - dispatch((Runnable)o); + dispatch((Runnable)change); } - else if (o instanceof ChangeSelectableChannel) + else if (change instanceof ChangeSelectableChannel) { // finish accepting/connecting this connection - final ChangeSelectableChannel asc = (ChangeSelectableChannel)o; + final ChangeSelectableChannel asc = (ChangeSelectableChannel)change; final SelectableChannel channel=asc._channel; final Object att = asc._attachment; if ((channel instanceof SocketChannel) && ((SocketChannel)channel).isConnected()) { SelectionKey key = channel.register(selector,SelectionKey.OP_READ,att); - SelectChannelEndPoint endpoint = newEndPoint((SocketChannel)channel,this,key); + SelectChannelEndPoint endpoint = createEndPoint((SocketChannel)channel,key); key.attach(endpoint); endpoint.schedule(); } @@ -419,14 +405,14 @@ public abstract class SelectorManager extends AbstractLifeCycle channel.register(selector,SelectionKey.OP_CONNECT,att); } } - else if (o instanceof SocketChannel) + else if (change instanceof SocketChannel) { - final SocketChannel channel=(SocketChannel)o; + final SocketChannel channel=(SocketChannel)change; if (channel.isConnected()) { SelectionKey key = channel.register(selector,SelectionKey.OP_READ,null); - SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); endpoint.schedule(); } @@ -435,17 +421,17 @@ public abstract class SelectorManager extends AbstractLifeCycle channel.register(selector,SelectionKey.OP_CONNECT,null); } } - else if (o instanceof ServerSocketChannel) + else if (change instanceof ServerSocketChannel) { - ServerSocketChannel channel = (ServerSocketChannel)o; + ServerSocketChannel channel = (ServerSocketChannel)change; channel.register(getSelector(),SelectionKey.OP_ACCEPT); } - else if (o instanceof ChangeTask) + else if (change instanceof ChangeTask) { - ((ChangeTask)o).run(); + ((ChangeTask)change).run(); } else - throw new IllegalArgumentException(o.toString()); + throw new IllegalArgumentException(change.toString()); } catch (Exception e) { @@ -455,28 +441,15 @@ public abstract class SelectorManager extends AbstractLifeCycle Log.debug(e); } } - changes.clear(); - long idle_next; long retry_next; long now=System.currentTimeMillis(); - synchronized (this) - { - _idleTimeout.setNow(now); - _timeout.setNow(now); - - if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections) - _idleTimeout.setDuration(_lowResourcesMaxIdleTime); - else - _idleTimeout.setDuration(_maxIdleTime); - idle_next=_idleTimeout.getTimeToNext(); - retry_next=_timeout.getTimeToNext(); - } + _timeout.setNow(now); + + retry_next=_timeout.getTimeToNext(); // workout how low to wait in select - long wait = 1000L; // not getMaxIdleTime() as the now value of the idle timers needs to be updated. - if (idle_next >= 0 && wait > idle_next) - wait = idle_next; + long wait = 1000L; if (wait > 0 && retry_next >= 0 && wait > retry_next) wait = retry_next; @@ -499,7 +472,6 @@ public abstract class SelectorManager extends AbstractLifeCycle long before=now; int selected=selector.select(wait); now = System.currentTimeMillis(); - _idleTimeout.setNow(now); _timeout.setNow(now); _selects++; @@ -680,7 +652,8 @@ public abstract class SelectorManager extends AbstractLifeCycle { // bind connections to this select set. SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ); - SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey); + + SelectChannelEndPoint endpoint=_selectSet[_nextSet].createEndPoint(channel,cKey); cKey.attach(endpoint); if (endpoint != null) endpoint.schedule(); @@ -710,7 +683,7 @@ public abstract class SelectorManager extends AbstractLifeCycle if (connected) { key.interestOps(SelectionKey.OP_READ); - SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); endpoint.schedule(); } @@ -724,7 +697,7 @@ public abstract class SelectorManager extends AbstractLifeCycle { // Wrap readable registered channel in an endpoint SocketChannel channel = (SocketChannel)key.channel(); - SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); + SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); if (key.isReadable()) endpoint.schedule(); @@ -750,9 +723,6 @@ public abstract class SelectorManager extends AbstractLifeCycle // Everything always handled selector.selectedKeys().clear(); - // tick over the timers - _idleTimeout.tick(now); - _timeout.setNow(now); Task task = _timeout.expired(); while (task!=null) @@ -764,6 +734,27 @@ public abstract class SelectorManager extends AbstractLifeCycle task = _timeout.expired(); } + + // Idle tick + if (now-_idleTick>1000) + { + _idleTick=now; + + final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) + ?(now+_maxIdleTime-_lowResourcesMaxIdleTime) + :now; + + dispatch(new Runnable() + { + public void run() + { + for (SelectChannelEndPoint endp:_endPoints.keySet()) + { + endp.checkIdleTimestamp(idle_now); + } + } + }); + } } catch (CancelledKeyException e) { @@ -784,15 +775,7 @@ public abstract class SelectorManager extends AbstractLifeCycle /* ------------------------------------------------------------ */ public long getNow() { - return _idleTimeout.getNow(); - } - - /* ------------------------------------------------------------ */ - public void scheduleIdle(Timeout.Task task) - { - if (_idleTimeout.getDuration() <= 0) - return; - _idleTimeout.schedule(task); + return _timeout.getNow(); } /* ------------------------------------------------------------ */ @@ -820,6 +803,22 @@ public abstract class SelectorManager extends AbstractLifeCycle if (selector!=null) selector.wakeup(); } + + /* ------------------------------------------------------------ */ + private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException + { + SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); + endPointOpened(endp); + _endPoints.put(endp,this); + return endp; + } + + /* ------------------------------------------------------------ */ + public void destroyEndPoint(SelectChannelEndPoint endp) + { + _endPoints.remove(endp); + endPointClosed(endp); + } /* ------------------------------------------------------------ */ Selector getSelector() @@ -865,7 +864,6 @@ public abstract class SelectorManager extends AbstractLifeCycle selecting=_selecting!=null; } - _idleTimeout.cancelAll(); _timeout.cancelAll(); try { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 44ab147910..cc8877dcac 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -490,8 +490,6 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector try { socket.setTcpNoDelay(true); - if (_maxIdleTime >= 0) - socket.setSoTimeout(_maxIdleTime); if (_soLingerTime >= 0) socket.setSoLinger(true,_soLingerTime / 1000); else diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java index 99b8f0a598..e3104dbb8d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java @@ -127,11 +127,7 @@ public class SocketConnector extends AbstractConnector { ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; - if (connection._sotimeout!=lrmit) - { - connection._sotimeout=lrmit; - ((Socket)endpoint.getTransport()).setSoTimeout(lrmit); - } + connection.setMaxIdleTime(lrmit); super.customize(endpoint, request); } @@ -177,14 +173,12 @@ public class SocketConnector extends AbstractConnector { boolean _dispatched=false; volatile Connection _connection; - int _sotimeout; protected final Socket _socket; public ConnectorEndPoint(Socket socket) throws IOException { - super(socket); + super(socket,_maxIdleTime); _connection = newConnection(this); - _sotimeout=socket.getSoTimeout(); _socket=socket; } @@ -241,14 +235,7 @@ public class SocketConnector extends AbstractConnector if (_connection.isIdle()) { if (isLowResources()) - { - int lrmit = getLowResourcesMaxIdleTime(); - if (lrmit>=0 && _sotimeout!= lrmit) - { - _sotimeout=lrmit; - _socket.setSoTimeout(_sotimeout); - } - } + setMaxIdleTime(getLowResourcesMaxIdleTime()); } _connection=_connection.handle(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java index 1973e0a4d0..3927cb1caf 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java @@ -100,14 +100,8 @@ public class BlockingChannelConnector extends AbstractNIOConnector public void customize(EndPoint endpoint, Request request) throws IOException { - ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; - if (connection._sotimeout!=_maxIdleTime) - { - connection._sotimeout=_maxIdleTime; - ((SocketChannel)endpoint.getTransport()).socket().setSoTimeout(_maxIdleTime); - } - super.customize(endpoint, request); + endpoint.setMaxIdleTime(_maxIdleTime); configure(((SocketChannel)endpoint.getTransport()).socket()); } @@ -130,12 +124,12 @@ public class BlockingChannelConnector extends AbstractNIOConnector int _sotimeout; ConnectorEndPoint(ByteChannel channel) + throws IOException { - super(channel); + super(channel,BlockingChannelConnector.this._maxIdleTime); _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer()); } - /* ------------------------------------------------------------ */ /** Get the connection. * @return the connection diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java index eecf2d80e5..a525af9dea 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/StressTest.java @@ -403,7 +403,7 @@ public class StressTest extends TestCase if (_stress) { System.err.println("STRESS!"); - doThreads(200,400,true); + doThreads(200,100,true); } else doThreads(20,40,true); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java index 02c5fa35fb..0935b740e6 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java @@ -8,6 +8,7 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.thread.Timeout; public class WebSocketConnection implements Connection, WebSocket.Outbound { @@ -17,24 +18,26 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound final WebSocketGenerator _generator; final long _timestamp; final WebSocket _websocket; - final int _maxIdleTimeMs; public WebSocketConnection(WebSocket websocket, EndPoint endpoint) + throws IOException { this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000); } - public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, long maxIdleTime) + public WebSocketConnection(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime) + throws IOException { // TODO - can we use the endpoint idle mechanism? if (endpoint instanceof AsyncEndPoint) ((AsyncEndPoint)endpoint).cancelIdle(); _endp = endpoint; + _endp.setMaxIdleTime(maxIdleTime); + _timestamp = timestamp; _websocket = websocket; _generator = new WebSocketGenerator(buffers, _endp); - _maxIdleTimeMs=(int)maxIdleTime; _parser = new WebSocketParser(buffers, endpoint, new WebSocketParser.EventHandler() { public void onFrame(byte frame, String data) @@ -81,10 +84,10 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound { public void access(EndPoint endp) { - scep.getSelectSet().scheduleTimeout(scep.getTimeoutTask(),_maxIdleTimeMs); + scep.scheduleIdle(); } }; - scep.getSelectSet().scheduleTimeout(scep.getTimeoutTask(),_maxIdleTimeMs); + scep.scheduleIdle(); } else { @@ -162,7 +165,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound public void sendMessage(byte frame, String content) throws IOException { - _generator.addFrame(frame,content,_maxIdleTimeMs); + _generator.addFrame(frame,content,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); @@ -175,7 +178,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException { - _generator.addFrame(frame,content,offset,length,_maxIdleTimeMs); + _generator.addFrame(frame,content,offset,length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); @@ -185,7 +188,7 @@ public class WebSocketConnection implements Connection, WebSocket.Outbound { try { - _generator.flush(_maxIdleTimeMs); + _generator.flush(_endp.getMaxIdleTime()); _endp.close(); } catch(IOException e) diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index cd8eb94748..6ebd208765 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -16,7 +16,7 @@ import org.eclipse.jetty.server.HttpConnection; public class WebSocketFactory { private WebSocketBuffers _buffers; - private long _maxIdleTime=300000; + private int _maxIdleTime=300000; /* ------------------------------------------------------------ */ public WebSocketFactory() @@ -43,7 +43,7 @@ public class WebSocketFactory /** Set the maxIdleTime. * @param maxIdleTime the maxIdleTime to set */ - public void setMaxIdleTime(long maxIdleTime) + public void setMaxIdleTime(int maxIdleTime) { _maxIdleTime = maxIdleTime; } -- cgit v1.2.3