diff options
Diffstat (limited to 'jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java')
-rw-r--r-- | jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java | 260 |
1 files changed, 235 insertions, 25 deletions
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 0d00a17a1c..be1b1f2c3b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -19,9 +19,9 @@ package org.eclipse.jetty.io; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -31,10 +31,10 @@ import org.eclipse.jetty.util.thread.Scheduler; public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint { + enum State {OPEN, ISHUTTING, ISHUT, OSHUTTING, OSHUT, CLOSED}; private static final Logger LOG = Log.getLogger(AbstractEndPoint.class); + private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN); private final long _created=System.currentTimeMillis(); - private final InetSocketAddress _local; - private final InetSocketAddress _remote; private volatile Connection _connection; private final FillInterest _fillInterest = new FillInterest() @@ -55,29 +55,237 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } }; - protected AbstractEndPoint(Scheduler scheduler,InetSocketAddress local,InetSocketAddress remote) + protected AbstractEndPoint(Scheduler scheduler) { super(scheduler); - _local=local; - _remote=remote; + } + + + protected final void shutdownInput() + { + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + if (!_state.compareAndSet(s,State.ISHUTTING)) + continue; + try + { + doShutdownInput(); + } + finally + { + if(!_state.compareAndSet(State.ISHUTTING,State.ISHUT)) + { + // If somebody else switched to CLOSED while we were ishutting, + // then we do the close for them + if (_state.get()==State.CLOSED) + doOnClose(); + else + throw new IllegalStateException(); + } + } + return; + + case ISHUTTING: // Somebody else ishutting + case ISHUT: // Already ishut + return; + + case OSHUTTING: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the OSHUT will close + return; + + case OSHUT: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // Already OSHUT so we close + doOnClose(); + return; + + case CLOSED: // already closed + return; + } + } } @Override - public long getCreatedTimeStamp() + public final void shutdownOutput() { - return _created; + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + if (!_state.compareAndSet(s,State.OSHUTTING)) + continue; + try + { + doShutdownOutput(); + } + finally + { + if(!_state.compareAndSet(State.OSHUTTING,State.OSHUT)) + { + // If somebody else switched to CLOSED while we were oshutting, + // then we do the close for them + if (_state.get()==State.CLOSED) + doOnClose(); + else + throw new IllegalStateException(); + } + } + return; + + case ISHUTTING: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the ISHUT will close + return; + + case ISHUT: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // Already ISHUT so we close + doOnClose(); + return; + + case OSHUTTING: // Somebody else oshutting + case OSHUT: // Already oshut + return; + + case CLOSED: // already closed + return; + } + } + } + + @Override + public final void close() + { + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + case ISHUT: // Already ishut + case OSHUT: // Already oshut + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + doOnClose(); + return; + + case ISHUTTING: // Somebody else ishutting + case OSHUTTING: // Somebody else oshutting + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the IO SHUT will call doOnClose + return; + + case CLOSED: // already closed + return; + } + } + } + + protected void doShutdownInput() + {} + + protected void doShutdownOutput() + {} + + protected void doClose() + {} + + private void doOnClose() + { + try + { + doClose(); + } + finally + { + onClose(); + } + } + + + @Override + public boolean isOutputShutdown() + { + switch(_state.get()) + { + case CLOSED: + case OSHUT: + case OSHUTTING: + return true; + default: + return false; + } + } + @Override + public boolean isInputShutdown() + { + switch(_state.get()) + { + case CLOSED: + case ISHUT: + case ISHUTTING: + return true; + default: + return false; + } } @Override - public InetSocketAddress getLocalAddress() + public boolean isOpen() + { + switch(_state.get()) + { + case CLOSED: + return false; + default: + return true; + } + } + + public void checkFlush() throws IOException { - return _local; + State s=_state.get(); + switch(s) + { + case OSHUT: + case OSHUTTING: + case CLOSED: + throw new IOException(s.toString()); + default: + break; + } + } + + public void checkFill() throws IOException + { + State s=_state.get(); + switch(s) + { + case ISHUT: + case ISHUTTING: + case CLOSED: + throw new IOException(s.toString()); + default: + break; + } } @Override - public InetSocketAddress getRemoteAddress() + public long getCreatedTimeStamp() { - return _remote; + return _created; } @Override @@ -98,12 +306,22 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint return false; } + + + protected void reset() + { + _state.set(State.OPEN); + _writeFlusher.onClose(); + _fillInterest.onClose(); + } + @Override public void onOpen() { if (LOG.isDebugEnabled()) LOG.debug("onOpen {}",this); - super.onOpen(); + if (_state.get()!=State.OPEN) + throw new IllegalStateException(); } @Override @@ -117,12 +335,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } @Override - public void close() - { - onClose(); - } - - @Override public void fillInterested(Callback callback) throws IllegalStateException { notIdle(); @@ -211,15 +423,13 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint c=c.getSuperclass(); name=c.getSimpleName(); } - - return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}", + + return String.format("%s@%x{%s<->%s,%s,%s|%s,%d/%d,%s}", name, hashCode(), getRemoteAddress(), - getLocalAddress().getPort(), - isOpen()?"Open":"CLOSED", - isInputShutdown()?"ISHUT":"in", - isOutputShutdown()?"OSHUT":"out", + getLocalAddress(), + _state.get(), _fillInterest.toStateString(), _writeFlusher.toStateString(), getIdleFor(), |