Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2012-05-10 14:53:56 +0000
committerGreg Wilkins2012-05-10 14:53:56 +0000
commit6d70fa1c2aad214d756da26f1dae1769345e1356 (patch)
treebbcfa803d507604b504422a4c1bb7c51be3268a2
parente348f2d4f5c8c970cd73af0b241d95acf47767ac (diff)
downloadorg.eclipse.jetty.project-6d70fa1c2aad214d756da26f1dae1769345e1356.tar.gz
org.eclipse.jetty.project-6d70fa1c2aad214d756da26f1dae1769345e1356.tar.xz
org.eclipse.jetty.project-6d70fa1c2aad214d756da26f1dae1769345e1356.zip
jetty-9 factored out WriteFlusher and ReadInterest
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java5
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java3
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java84
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java188
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java1
-rw-r--r--jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java139
-rw-r--r--jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java1
7 files changed, 275 insertions, 146 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
index d96a69a900..49f1c95a9a 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java
@@ -64,6 +64,11 @@ public abstract class AbstractEndPoint implements EndPoint
}
/* ------------------------------------------------------------ */
+ public void onClose()
+ {
+ }
+
+ /* ------------------------------------------------------------ */
@Override
public String toString()
{
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java
index 57964bb7df..2627459468 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java
@@ -59,4 +59,7 @@ public interface AsyncEndPoint extends EndPoint
AsyncConnection getAsyncConnection();
void setAsyncConnection(AsyncConnection connection);
+
+ void onClose();
+
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java
new file mode 100644
index 0000000000..cf113d45e4
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ReadInterest.java
@@ -0,0 +1,84 @@
+package org.eclipse.jetty.io;
+
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadPendingException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.util.Callback;
+
+public class ReadInterest
+{
+ private final AtomicBoolean _interested = new AtomicBoolean(false);
+ private volatile Callback _readCallback;
+ private Object _readContext;
+
+ /* ------------------------------------------------------------ */
+ protected ReadInterest()
+ {
+ }
+
+ /* ------------------------------------------------------------ */
+ public void readable(Object context, Callback callback) throws ReadPendingException
+ {
+ if (!_interested.compareAndSet(false,true))
+ throw new ReadPendingException();
+ _readContext=context;
+ _readCallback=callback;
+ if (makeInterested())
+ completed();
+ }
+
+
+ /* ------------------------------------------------------------ */
+ public void completed()
+ {
+ if (_interested.compareAndSet(true,false))
+ {
+ Callback callback=_readCallback;
+ Object context=_readContext;
+ _readCallback=null;
+ _readContext=null;
+ callback.completed(context);
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ public boolean isInterested()
+ {
+ return _interested.get();
+ }
+
+ /* ------------------------------------------------------------ */
+ public void failed(Throwable cause)
+ {
+ if (_interested.compareAndSet(true,false))
+ {
+ Callback callback=_readCallback;
+ Object context=_readContext;
+ _readCallback=null;
+ _readContext=null;
+ callback.failed(context,cause);
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ public void close()
+ {
+ if (_interested.compareAndSet(true,false))
+ {
+ Callback callback=_readCallback;
+ Object context=_readContext;
+ _readCallback=null;
+ _readContext=null;
+ callback.failed(context,new ClosedChannelException());
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ protected boolean makeInterested()
+ {
+ throw new IllegalStateException("Unimplemented");
+ }
+
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
index e2a9350056..f649a81218 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java
@@ -15,6 +15,7 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
@@ -41,11 +42,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
- private Callback _readCallback;
- private Object _readContext;
- private Callback _writeCallback;
- private Object _writeContext;
-
private SelectionKey _key;
private boolean _selected;
@@ -60,7 +56,26 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
private volatile boolean _idlecheck;
private volatile AsyncConnection _connection;
- private ByteBuffer[] _writeBuffers;
+ private final ReadInterest _readInterest = new ReadInterest()
+ {
+ @Override
+ protected boolean makeInterested()
+ {
+ _interestOps=_interestOps | SelectionKey.OP_READ;
+ updateKey();
+ return false;
+ }
+ };
+
+ private final WriteFlusher _writeFlusher = new WriteFlusher(this)
+ {
+ @Override
+ protected void scheduleCompleteWrite()
+ {
+ _interestOps = _interestOps | SelectionKey.OP_WRITE;
+ updateKey();
+ }
+ };
/* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) throws IOException
@@ -138,9 +153,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
}
}
if (can_read)
- readCompleted();
+ _readInterest.completed();
if (can_write)
- completeWrite();
+ _writeFlusher.completeWrite();
}
/* ------------------------------------------------------------ */
@@ -175,7 +190,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
{
synchronized (_lock)
{
- if (_idlecheck || _readCallback!=null || _writeCallback!=null)
+ if (_idlecheck || _readInterest.isInterested() || _writeFlusher.isWriting())
{
long idleTimestamp = getIdleTimestamp();
long max_idle_time = getMaxIdleTime();
@@ -192,66 +207,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
_connection.onIdleExpired(idleForMs);
TimeoutException timeout = new TimeoutException();
- readFailed(timeout);
- writeFailed(timeout);
+ _readInterest.failed(timeout);
+ _writeFlusher.failWrite(timeout);
}
}
}
}
}
- /* ------------------------------------------------------------ */
- private void readCompleted()
- {
- if (_readCallback!=null)
- {
- Callback cb=_readCallback;
- Object ctx=_readContext;
- _readCallback=null;
- _readContext=null;
- cb.completed(ctx);
- }
- }
-
- /* ------------------------------------------------------------ */
- private void writeCompleted()
- {
- if (_writeCallback!=null)
- {
- Callback cb=_writeCallback;
- Object ctx=_writeContext;
- _writeCallback=null;
- _writeContext=null;
- _writeBuffers=null;
- cb.completed(ctx);
- }
- }
-
- /* ------------------------------------------------------------ */
- private void readFailed(Throwable cause)
- {
- if (_readCallback!=null)
- {
- Callback cb=_readCallback;
- Object ctx=_readContext;
- _readCallback=null;
- _readContext=null;
- cb.failed(ctx,cause);
- }
- }
-
- /* ------------------------------------------------------------ */
- private void writeFailed(Throwable cause)
- {
- if (_writeCallback!=null)
- {
- Callback cb=_writeCallback;
- Object ctx=_writeContext;
- _writeCallback=null;
- _writeContext=null;
- cb.failed(ctx,cause);
- }
- }
/* ------------------------------------------------------------ */
@Override
@@ -267,80 +230,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
public <C> void readable(C context, Callback<C> callback) throws IllegalStateException
{
- synchronized (_lock)
- {
- if (_readCallback != null)
- throw new IllegalStateException("previous read not complete");
- _readContext=context;
- _readCallback=callback;
- _interestOps=_interestOps | SelectionKey.OP_READ;
- updateKey();
- }
+ _readInterest.readable(context,callback);
}
-
+
/* ------------------------------------------------------------ */
@Override
public <C> void write(C context, Callback<C> callback, ByteBuffer... buffers) throws IllegalStateException
{
- synchronized (_lock)
- {
- try
- {
- if (_writeCallback!=null)
- throw new IllegalStateException("previous write not complete");
-
- flush(buffers);
-
- // Are we complete?
- for (ByteBuffer b : buffers)
- {
- if (b.hasRemaining())
- {
- _writeBuffers=buffers;
- _writeContext=context;
- _writeCallback=callback;
- _interestOps = _interestOps | SelectionKey.OP_WRITE;
- updateKey();
- return;
- }
- }
-
- callback.completed(context);
- }
- catch (IOException e)
- {
- callback.failed(context,e);
- }
- }
- }
-
- /* ------------------------------------------------------------ */
- private void completeWrite()
- {
- if (_writeBuffers==null)
- return;
-
- try
- {
- flush(_writeBuffers);
-
- // Are we complete?
- for (ByteBuffer b : _writeBuffers)
- {
- if (b.hasRemaining())
- {
- _interestOps = _interestOps | SelectionKey.OP_WRITE;
- return;
- }
- }
- // we are complete and ready
- writeCompleted();
- }
- catch (IOException e)
- {
- writeFailed(e);
- }
-
+ _writeFlusher.write(context,callback,buffers);
}
/* ------------------------------------------------------------ */
@@ -359,7 +256,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
*/
private void updateKey()
{
- synchronized (this)
+ synchronized (_lock)
{
if (!_selected)
{
@@ -466,19 +363,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
@Override
public void close()
{
- synchronized (_lock)
- {
- try
- {
- super.close();
- }
- finally
- {
- updateKey();
- }
- }
+ super.close();
+ updateKey();
}
-
+
+ /* ------------------------------------------------------------ */
+ @Override
+ public void onClose()
+ {
+ _writeFlusher.close();
+ _readInterest.close();
+ }
+
/* ------------------------------------------------------------ */
@Override
public String toString()
@@ -508,7 +404,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements SelectorMa
}
return String.format("SCEP@%x{l(%s)<->r(%s),open=%b,ishut=%b,oshut=%b,i=%d%s,r=%s,w=%s}-{%s}",hashCode(),getRemoteAddress(),getLocalAddress(),isOpen(),
- isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readCallback,_writeCallback,getAsyncConnection());
+ isInputShutdown(),isOutputShutdown(),_interestOps,keyString,_readInterest,_writeFlusher,getAsyncConnection());
}
/* ------------------------------------------------------------ */
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
index 1a6fb34968..c4ffef3619 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java
@@ -845,6 +845,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
LOG.debug("destroyEndPoint {}",endp);
_endPoints.remove(endp);
AsyncConnection connection=endp.getAsyncConnection();
+ endp.onClose();
if (connection!=null)
connection.onClose();
endPointClosed(endp);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
new file mode 100644
index 0000000000..a25f057095
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
@@ -0,0 +1,139 @@
+package org.eclipse.jetty.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritePendingException;
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.jetty.util.Callback;
+
+public class WriteFlusher
+{
+ private final AtomicBoolean _writing = new AtomicBoolean(false);
+ private final EndPoint _endp;
+
+ private ByteBuffer[] _writeBuffers;
+ private Object _writeContext;
+ private Callback _writeCallback;
+
+ protected WriteFlusher(EndPoint endp)
+ {
+ _endp=endp;
+ }
+
+ /* ------------------------------------------------------------ */
+ public void write(Object context, Callback callback, ByteBuffer... buffers)
+ {
+ if (!_writing.compareAndSet(false,true))
+ throw new WritePendingException();
+ try
+ {
+ _endp.flush(buffers);
+
+ // Are we complete?
+ for (ByteBuffer b : buffers)
+ {
+ if (b.hasRemaining())
+ {
+ _writeBuffers=buffers;
+ _writeContext=context;
+ _writeCallback=callback;
+ scheduleCompleteWrite();
+ _writing.set(true);
+ return;
+ }
+ }
+
+ if (!_writing.compareAndSet(true,false))
+ throw new ConcurrentModificationException();
+ callback.completed(context);
+ }
+ catch (IOException e)
+ {
+ if (!_writing.compareAndSet(true,false))
+ throw new ConcurrentModificationException();
+ callback.failed(context,e);
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ protected void scheduleCompleteWrite()
+ {
+ // _interestOps = _interestOps | SelectionKey.OP_WRITE;
+ // updateKey();
+ }
+
+ /* ------------------------------------------------------------ */
+ public void completeWrite()
+ {
+ if (!_writing.get())
+ return;
+
+ try
+ {
+ _endp.flush(_writeBuffers);
+
+ // Are we complete?
+ for (ByteBuffer b : _writeBuffers)
+ {
+ if (b.hasRemaining())
+ {
+ scheduleCompleteWrite();
+ return;
+ }
+ }
+
+ // we are complete and ready
+ Callback callback=_writeCallback;
+ Object context=_writeContext;
+ _writeBuffers=null;
+ _writeCallback=null;
+ if (!_writing.compareAndSet(true,false))
+ throw new ConcurrentModificationException();
+ callback.completed(context);
+ }
+ catch (IOException e)
+ {
+ Callback callback=_writeCallback;
+ Object context=_writeContext;
+ _writeBuffers=null;
+ _writeCallback=null;
+ if (!_writing.compareAndSet(true,false))
+ throw new ConcurrentModificationException();
+ callback.failed(context,e);
+ }
+ }
+
+ /* ------------------------------------------------------------ */
+ public void failWrite(Throwable th)
+ {
+ if (!_writing.compareAndSet(true,false))
+ return;
+ Callback callback=_writeCallback;
+ Object context=_writeContext;
+ _writeBuffers=null;
+ _writeCallback=null;
+ callback.failed(context,th);
+ }
+
+ /* ------------------------------------------------------------ */
+ public void close()
+ {
+ if (!_writing.compareAndSet(true,false))
+ return;
+ Callback callback=_writeCallback;
+ Object context=_writeContext;
+ _writeBuffers=null;
+ _writeCallback=null;
+ callback.failed(context,new ClosedChannelException());
+ }
+
+ /* ------------------------------------------------------------ */
+ public boolean isWriting()
+ {
+ return _writing.get();
+ }
+
+} \ No newline at end of file
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
index 4d36d86196..ea5477354d 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java
@@ -19,6 +19,7 @@ import org.junit.Ignore;
import org.junit.Test;
+@Ignore
public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
{
static SslContextFactory __sslCtxFactory=new SslContextFactory();

Back to the top