diff options
69 files changed, 1813 insertions, 693 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java index f4e9d90f7e..81959031f3 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpClientTransport.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Map; @@ -31,6 +32,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -173,13 +175,15 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp } @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key) + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) { - return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout()); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); + endp.setIdleTimeout(client.getIdleTimeout()); + return endp; } @Override - public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException + public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException { @SuppressWarnings("unchecked") Map<String, Object> context = (Map<String, Object>)attachment; @@ -188,7 +192,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp } @Override - protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment) + protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment) { @SuppressWarnings("unchecked") Map<String, Object> context = (Map<String, Object>)attachment; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java index 8dd287f492..535f7ecd89 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ssl/SslBytesServerTest.java @@ -54,10 +54,10 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.ssl.SslBytesTest.TLSRecord.Type; import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; -import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConnection; @@ -173,9 +173,9 @@ public class SslBytesServerTest extends SslBytesTest ServerConnector connector = new ServerConnector(server, null,null,null,1,1,sslFactory, httpFactory) { @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { - SelectChannelEndPoint endp = super.newEndPoint(channel,selectSet,key); + ChannelEndPoint endp = super.newEndPoint(channel,selectSet,key); serverEndPoint.set(endp); return endp; } @@ -367,11 +367,19 @@ public class SslBytesServerTest extends SslBytesTest System.arraycopy(doneBytes, 0, chunk, recordBytes.length, doneBytes.length); System.arraycopy(closeRecordBytes, 0, chunk, recordBytes.length + doneBytes.length, closeRecordBytes.length); proxy.flushToServer(0, chunk); + // Close the raw socket proxy.flushToServer(null); // Expect the server to send a FIN as well record = proxy.readFromServer(); + if (record!=null) + { + // Close alert snuck out // TODO check if this is acceptable + Assert.assertEquals(Type.ALERT,record.getType()); + record = proxy.readFromServer(); + } + Assert.assertNull(record); // Check that we did not spin diff --git a/jetty-client/src/test/resources/jetty-logging.properties b/jetty-client/src/test/resources/jetty-logging.properties index 1c19e5331e..5f8794e83f 100644 --- a/jetty-client/src/test/resources/jetty-logging.properties +++ b/jetty-client/src/test/resources/jetty-logging.properties @@ -1,3 +1,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG +#org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG
\ No newline at end of file diff --git a/jetty-distribution/pom.xml b/jetty-distribution/pom.xml index 476e8a6fc9..9878feb1ce 100644 --- a/jetty-distribution/pom.xml +++ b/jetty-distribution/pom.xml @@ -711,6 +711,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-unixsocket</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.eclipse.jetty.fcgi</groupId> <artifactId>fcgi-server</artifactId> <version>${project.version}</version> diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java index 6256a6d2c4..142d10bf4b 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2Client.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.client; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; @@ -36,8 +37,8 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.MappedByteBufferPool; -import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -318,13 +319,15 @@ public class HTTP2Client extends ContainerLifeCycle } @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { - return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout()); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); + endp.setIdleTimeout(getIdleTimeout()); + return endp; } @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException { @SuppressWarnings("unchecked") Map<String, Object> context = (Map<String, Object>)attachment; @@ -335,7 +338,7 @@ public class HTTP2Client extends ContainerLifeCycle } @Override - protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment) + protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment) { @SuppressWarnings("unchecked") Map<String, Object> context = (Map<String, Object>)attachment; diff --git a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java index 554a5b4717..c78bf9f6e6 100644 --- a/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java +++ b/jetty-http2/http2-server/src/test/java/org/eclipse/jetty/http2/server/HTTP2ServerTest.java @@ -57,6 +57,7 @@ import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.generator.Generator; import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.server.HttpChannel; @@ -329,7 +330,7 @@ public class HTTP2ServerTest extends AbstractServerTest ServerConnector connector2 = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration())) { @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout()) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index 5b49c8e44c..ceab75bd02 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -19,9 +19,9 @@ package org.eclipse.jetty.io; import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -31,10 +31,10 @@ import org.eclipse.jetty.util.thread.Scheduler; public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint { + enum State {OPEN, ISHUTTING, ISHUT, OSHUTTING, OSHUT, CLOSED}; private static final Logger LOG = Log.getLogger(AbstractEndPoint.class); + private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN); private final long _created=System.currentTimeMillis(); - private final InetSocketAddress _local; - private final InetSocketAddress _remote; private volatile Connection _connection; private final FillInterest _fillInterest = new FillInterest() @@ -55,29 +55,237 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } }; - protected AbstractEndPoint(Scheduler scheduler,InetSocketAddress local,InetSocketAddress remote) + protected AbstractEndPoint(Scheduler scheduler) { super(scheduler); - _local=local; - _remote=remote; + } + + + protected final void shutdownInput() + { + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + if (!_state.compareAndSet(s,State.ISHUTTING)) + continue; + try + { + doShutdownInput(); + } + finally + { + if(!_state.compareAndSet(State.ISHUTTING,State.ISHUT)) + { + // If somebody else switched to CLOSED while we were ishutting, + // then we do the close for them + if (_state.get()==State.CLOSED) + doOnClose(); + else + throw new IllegalStateException(); + } + } + return; + + case ISHUTTING: // Somebody else ishutting + case ISHUT: // Already ishut + return; + + case OSHUTTING: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the OSHUT will close + return; + + case OSHUT: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // Already OSHUT so we close + doOnClose(); + return; + + case CLOSED: // already closed + return; + } + } } @Override - public long getCreatedTimeStamp() + public final void shutdownOutput() { - return _created; + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + if (!_state.compareAndSet(s,State.OSHUTTING)) + continue; + try + { + doShutdownOutput(); + } + finally + { + if(!_state.compareAndSet(State.OSHUTTING,State.OSHUT)) + { + // If somebody else switched to CLOSED while we were oshutting, + // then we do the close for them + if (_state.get()==State.CLOSED) + doOnClose(); + else + throw new IllegalStateException(); + } + } + return; + + case ISHUTTING: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the ISHUT will close + return; + + case ISHUT: + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // Already ISHUT so we close + doOnClose(); + return; + + case OSHUTTING: // Somebody else oshutting + case OSHUT: // Already oshut + return; + + case CLOSED: // already closed + return; + } + } + } + + @Override + public final void close() + { + while(true) + { + State s = _state.get(); + switch(s) + { + case OPEN: + case ISHUT: // Already ishut + case OSHUT: // Already oshut + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + doOnClose(); + return; + + case ISHUTTING: // Somebody else ishutting + case OSHUTTING: // Somebody else oshutting + if (!_state.compareAndSet(s,State.CLOSED)) + continue; + // The thread doing the IO SHUT will call doOnClose + return; + + case CLOSED: // already closed + return; + } + } + } + + protected void doShutdownInput() + {} + + protected void doShutdownOutput() + {} + + protected void doClose() + {} + + private void doOnClose() + { + try + { + doClose(); + } + finally + { + onClose(); + } + } + + + @Override + public boolean isOutputShutdown() + { + switch(_state.get()) + { + case CLOSED: + case OSHUT: + case OSHUTTING: + return true; + default: + return false; + } + } + @Override + public boolean isInputShutdown() + { + switch(_state.get()) + { + case CLOSED: + case ISHUT: + case ISHUTTING: + return true; + default: + return false; + } } @Override - public InetSocketAddress getLocalAddress() + public boolean isOpen() + { + switch(_state.get()) + { + case CLOSED: + return false; + default: + return true; + } + } + + public void checkFlush() throws IOException { - return _local; + State s=_state.get(); + switch(s) + { + case OSHUT: + case OSHUTTING: + case CLOSED: + throw new IOException(s.toString()); + default: + break; + } + } + + public void checkFill() throws IOException + { + State s=_state.get(); + switch(s) + { + case ISHUT: + case ISHUTTING: + case CLOSED: + throw new IOException(s.toString()); + default: + break; + } } @Override - public InetSocketAddress getRemoteAddress() + public long getCreatedTimeStamp() { - return _remote; + return _created; } @Override @@ -98,12 +306,22 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint return false; } + + + protected void reset() + { + _state.set(State.OPEN); + _writeFlusher.onClose(); + _fillInterest.onClose(); + } + @Override public void onOpen() { if (LOG.isDebugEnabled()) LOG.debug("onOpen {}",this); - super.onOpen(); + if (_state.get()!=State.OPEN) + throw new IllegalStateException(); } @Override @@ -117,12 +335,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint } @Override - public void close() - { - onClose(); - } - - @Override public void fillInterested(Callback callback) throws IllegalStateException { notIdle(); @@ -207,15 +419,13 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint c=c.getSuperclass(); name=c.getSimpleName(); } - - return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}", + + return String.format("%s@%x{%s<->%s,%s,%s|%s,%d/%d,%s}", name, hashCode(), getRemoteAddress(), - getLocalAddress().getPort(), - isOpen()?"Open":"CLOSED", - isInputShutdown()?"ISHUT":"in", - isOutputShutdown()?"OSHUT":"out", + getLocalAddress(), + _state.get(), _fillInterest.toStateString(), _writeFlusher.toStateString(), getIdleFor(), diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index cbaebf78db..954d0e5742 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -20,7 +20,10 @@ package org.eclipse.jetty.io; import java.io.EOFException; import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; @@ -42,7 +45,28 @@ import org.eclipse.jetty.util.thread.Scheduler; public class ByteArrayEndPoint extends AbstractEndPoint { static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class); - public final static InetSocketAddress NOIP=new InetSocketAddress(0); + static final InetAddress NOIP; + static final InetSocketAddress NOIPPORT; + + static + { + InetAddress noip=null; + try + { + noip = Inet4Address.getByName("0.0.0.0"); + } + catch (UnknownHostException e) + { + LOG.warn(e); + } + finally + { + NOIP=noip; + NOIPPORT=new InetSocketAddress(NOIP,0); + } + } + + private static final ByteBuffer EOF = BufferUtil.allocate(0); private final Runnable _runFillable = new Runnable() @@ -57,9 +81,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint private final Locker _locker = new Locker(); private final Queue<ByteBuffer> _inQ = new ArrayQueue<>(); private ByteBuffer _out; - private boolean _ishut; - private boolean _oshut; - private boolean _closed; private boolean _growOutput; /* ------------------------------------------------------------ */ @@ -112,11 +133,26 @@ public class ByteArrayEndPoint extends AbstractEndPoint /* ------------------------------------------------------------ */ public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output) { - super(timer,NOIP,NOIP); + super(timer); if (BufferUtil.hasContent(input)) addInput(input); _out=output==null?BufferUtil.allocate(1024):output; setIdleTimeout(idleTimeoutMs); + onOpen(); + } + + /* ------------------------------------------------------------ */ + @Override + public InetSocketAddress getLocalAddress() + { + return NOIPPORT; + } + + /* ------------------------------------------------------------ */ + @Override + public InetSocketAddress getRemoteAddress() + { + return NOIPPORT; } /* ------------------------------------------------------------ */ @@ -138,7 +174,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint { try(Locker.Lock lock = _locker.lock()) { - if (_closed) + if (!isOpen()) throw new ClosedChannelException(); ByteBuffer in = _inQ.peek(); @@ -288,92 +324,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint } /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.EndPoint#isOpen() - */ - @Override - public boolean isOpen() - { - try(Locker.Lock lock = _locker.lock()) - { - return !_closed; - } - } - - /* ------------------------------------------------------------ */ - /* - */ - @Override - public boolean isInputShutdown() - { - try(Locker.Lock lock = _locker.lock()) - { - return _ishut||_closed; - } - } - - /* ------------------------------------------------------------ */ - /* - */ - @Override - public boolean isOutputShutdown() - { - try(Locker.Lock lock = _locker.lock()) - { - return _oshut||_closed; - } - } - - /* ------------------------------------------------------------ */ - public void shutdownInput() - { - boolean close=false; - try(Locker.Lock lock = _locker.lock()) - { - _ishut=true; - if (_oshut && !_closed) - close=_closed=true; - } - if (close) - super.close(); - } - - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.EndPoint#shutdownOutput() - */ - @Override - public void shutdownOutput() - { - boolean close=false; - try(Locker.Lock lock = _locker.lock()) - { - _oshut=true; - if (_ishut && !_closed) - close=_closed=true; - } - if (close) - super.close(); - } - - /* ------------------------------------------------------------ */ - /* - * @see org.eclipse.io.EndPoint#close() - */ - @Override - public void close() - { - boolean close=false; - try(Locker.Lock lock = _locker.lock()) - { - if (!_closed) - close=_closed=_ishut=_oshut=true; - } - if (close) - super.close(); - } - - /* ------------------------------------------------------------ */ /** * @return <code>true</code> if there are bytes remaining to be read from the encoded input */ @@ -390,15 +340,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint public int fill(ByteBuffer buffer) throws IOException { int filled=0; - boolean close=false; try(Locker.Lock lock = _locker.lock()) { while(true) { - if (_closed) + if (!isOpen()) throw new EofException("CLOSED"); - if (_ishut) + if (isInputShutdown()) return -1; if (_inQ.isEmpty()) @@ -407,9 +356,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint ByteBuffer in= _inQ.peek(); if (in==EOF) { - _ishut=true; - if (_oshut) - close=_closed=true; filled=-1; break; } @@ -425,10 +371,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint } } - if (close) - super.close(); if (filled>0) notIdle(); + else if (filled<0) + shutdownInput(); return filled; } @@ -439,9 +385,9 @@ public class ByteArrayEndPoint extends AbstractEndPoint @Override public boolean flush(ByteBuffer... buffers) throws IOException { - if (_closed) + if (!isOpen()) throw new IOException("CLOSED"); - if (_oshut) + if (isOutputShutdown()) throw new IOException("OSHUT"); boolean flushed=true; @@ -483,13 +429,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint */ public void reset() { - getFillInterest().onClose(); - getWriteFlusher().onClose(); - _ishut=false; - _oshut=false; - _closed=false; - _inQ.clear(); + try(Locker.Lock lock = _locker.lock()) + { + _inQ.clear(); + } BufferUtil.clear(_out); + super.reset(); } /* ------------------------------------------------------------ */ diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 306e74fb0e..c33ab5a3f6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -19,37 +19,104 @@ package org.eclipse.jetty.io; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; -import java.nio.channels.SocketChannel; +import java.nio.channels.CancelledKeyException; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.SelectionKey; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; /** * Channel End Point. * <p>Holds the channel and socket for an NIO endpoint. */ -public class ChannelEndPoint extends AbstractEndPoint +public abstract class ChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable { private static final Logger LOG = Log.getLogger(ChannelEndPoint.class); - private final SocketChannel _channel; - private final Socket _socket; - private volatile boolean _ishut; - private volatile boolean _oshut; + private final Locker _locker = new Locker(); + private final ByteChannel _channel; + private final GatheringByteChannel _gather; + protected final ManagedSelector _selector; + protected final SelectionKey _key; - public ChannelEndPoint(Scheduler scheduler,SocketChannel channel) + private boolean _updatePending; + + /** + * The current value for {@link SelectionKey#interestOps()}. + */ + protected int _currentInterestOps; + + /** + * The desired value for {@link SelectionKey#interestOps()}. + */ + protected int _desiredInterestOps; + + + private abstract class RunnableTask implements Runnable + { + final String _operation; + RunnableTask(String op) + { + _operation=op; + } + + @Override + public String toString() + { + return ChannelEndPoint.this.toString()+":"+_operation; + } + } + + private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey") + { + @Override + public void run() + { + updateKey(); + } + }; + + private final Runnable _runFillable = new RunnableTask("runFillable") + { + @Override + public void run() + { + getFillInterest().fillable(); + } + }; + + private final Runnable _runCompleteWrite = new RunnableTask("runCompleteWrite") + { + @Override + public void run() + { + getWriteFlusher().completeWrite(); + } + }; + + private final Runnable _runFillableCompleteWrite = new RunnableTask("runFillableCompleteWrite") + { + @Override + public void run() + { + getFillInterest().fillable(); + getWriteFlusher().completeWrite(); + } + }; + + public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { - super(scheduler, - (InetSocketAddress)channel.socket().getLocalSocketAddress(), - (InetSocketAddress)channel.socket().getRemoteSocketAddress()); + super(scheduler); _channel=channel; - _socket=channel.socket(); + _selector=selector; + _key=key; + _gather=(channel instanceof GatheringByteChannel)?(GatheringByteChannel)channel:null; } @Override @@ -64,27 +131,16 @@ public class ChannelEndPoint extends AbstractEndPoint return _channel.isOpen(); } - protected void shutdownInput() - { - if (LOG.isDebugEnabled()) - LOG.debug("ishut {}", this); - _ishut=true; - if (_oshut) - close(); - } - @Override - public void shutdownOutput() + public void doClose() { if (LOG.isDebugEnabled()) - LOG.debug("oshut {}", this); - _oshut = true; - if (_channel.isOpen()) + LOG.debug("doClose {}", this); + try { try { - if (!_socket.isOutputShutdown()) - _socket.shutdownOutput(); + _channel.close(); } catch (IOException e) { @@ -92,51 +148,20 @@ public class ChannelEndPoint extends AbstractEndPoint } finally { - if (_ishut) - { - close(); - } + super.doClose(); } } - } - - @Override - public boolean isOutputShutdown() - { - return _oshut || !_channel.isOpen() || _socket.isOutputShutdown(); - } - - @Override - public boolean isInputShutdown() - { - return _ishut || !_channel.isOpen() || _socket.isInputShutdown(); - } - - @Override - public void close() - { - super.close(); - if (LOG.isDebugEnabled()) - LOG.debug("close {}", this); - try - { - _channel.close(); - } - catch (IOException e) - { - LOG.debug(e); - } finally { - _ishut=true; - _oshut=true; + if (_selector!=null) + _selector.onClose(this); } } @Override public int fill(ByteBuffer buffer) throws IOException { - if (_ishut) + if (isInputShutdown()) return -1; int pos=BufferUtil.flipToFill(buffer); @@ -173,8 +198,8 @@ public class ChannelEndPoint extends AbstractEndPoint { if (buffers.length==1) flushed=_channel.write(buffers[0]); - else if (buffers.length>1) - flushed=_channel.write(buffers,0,buffers.length); + else if (_gather!=null && buffers.length>1) + flushed=_gather.write(buffers,0,buffers.length); else { for (ByteBuffer b : buffers) @@ -218,20 +243,160 @@ public class ChannelEndPoint extends AbstractEndPoint return _channel; } - public Socket getSocket() + + @Override + protected void needsFillInterest() { - return _socket; + changeInterests(SelectionKey.OP_READ); } @Override protected void onIncompleteFlush() { - throw new UnsupportedOperationException(); + changeInterests(SelectionKey.OP_WRITE); } @Override - protected void needsFillInterest() throws IOException + public Runnable onSelected() { - throw new UnsupportedOperationException(); + /** + * This method may run concurrently with {@link #changeInterests(int)}. + */ + + int readyOps = _key.readyOps(); + int oldInterestOps; + int newInterestOps; + try (Locker.Lock lock = _locker.lock()) + { + _updatePending = true; + // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps & ~readyOps; + _desiredInterestOps = newInterestOps; + } + + + boolean readable = (readyOps & SelectionKey.OP_READ) != 0; + boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0; + + + if (LOG.isDebugEnabled()) + LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this); + + // Run non-blocking code immediately. + // This producer knows that this non-blocking code is special + // and that it must be run in this thread and not fed to the + // ExecutionStrategy, which could not have any thread to run these + // tasks (or it may starve forever just after having run them). + if (readable && getFillInterest().isCallbackNonBlocking()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Direct readable run {}",this); + _runFillable.run(); + readable = false; + } + if (writable && getWriteFlusher().isCallbackNonBlocking()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Direct writable run {}",this); + _runCompleteWrite.run(); + writable = false; + } + + // return task to complete the job + Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable) + : (writable ? _runCompleteWrite : null); + + if (LOG.isDebugEnabled()) + LOG.debug("task {}",task); + return task; + } + + @Override + public void updateKey() + { + /** + * This method may run concurrently with {@link #changeInterests(int)}. + */ + + try + { + int oldInterestOps; + int newInterestOps; + try (Locker.Lock lock = _locker.lock()) + { + _updatePending = false; + oldInterestOps = _currentInterestOps; + newInterestOps = _desiredInterestOps; + if (oldInterestOps != newInterestOps) + { + _currentInterestOps = newInterestOps; + _key.interestOps(newInterestOps); + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); + } + catch (CancelledKeyException x) + { + LOG.debug("Ignoring key update for concurrently closed channel {}", this); + close(); + } + catch (Throwable x) + { + LOG.warn("Ignoring key update for " + this, x); + close(); + } + } + + private void changeInterests(int operation) + { + /** + * This method may run concurrently with + * {@link #updateKey()} and {@link #onSelected()}. + */ + + int oldInterestOps; + int newInterestOps; + boolean pending; + try (Locker.Lock lock = _locker.lock()) + { + pending = _updatePending; + oldInterestOps = _desiredInterestOps; + newInterestOps = oldInterestOps | operation; + if (newInterestOps != oldInterestOps) + _desiredInterestOps = newInterestOps; + } + + if (LOG.isDebugEnabled()) + LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); + + if (!pending && _selector!=null) + _selector.submit(_runUpdateKey); + } + + + @Override + public String toString() + { + // We do a best effort to print the right toString() and that's it. + try + { + boolean valid = _key != null && _key.isValid(); + int keyInterests = valid ? _key.interestOps() : -1; + int keyReadiness = valid ? _key.readyOps() : -1; + return String.format("%s{io=%d/%d,kio=%d,kro=%d}", + super.toString(), + _currentInterestOps, + _desiredInterestOps, + keyInterests, + keyReadiness); + } + catch (Throwable x) + { + return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps); + } } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index cb1cda8083..73309e7a61 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -94,7 +94,7 @@ import org.eclipse.jetty.util.IteratingCallback; * </pre></blockquote> */ public interface EndPoint extends Closeable -{ +{ /* ------------------------------------------------------------ */ /** * @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code> diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index d2a33bff57..30d670c91a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -23,10 +23,9 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketTimeoutException; import java.nio.channels.CancelledKeyException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -77,12 +76,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump protected void doStart() throws Exception { super.doStart(); - _selector = newSelector(); - } - - protected Selector newSelector() throws IOException - { - return Selector.open(); + _selector = _selectorManager.newSelector(); } public int size() @@ -137,10 +131,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } /** - * A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be + * A {@link Selectable} is an {@link EndPoint} that wish to be * notified of non-blocking events by the {@link ManagedSelector}. */ - public interface SelectableEndPoint extends EndPoint + public interface Selectable { /** * Callback method invoked when a read or write events has been @@ -264,12 +258,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump if (key.isValid()) { Object attachment = key.attachment(); + if (LOG.isDebugEnabled()) + LOG.debug("selected {} {} ",key,attachment); try { - if (attachment instanceof SelectableEndPoint) + if (attachment instanceof Selectable) { // Try to produce a task - Runnable task = ((SelectableEndPoint)attachment).onSelected(); + Runnable task = ((Selectable)attachment).onSelected(); if (task != null) return task; } @@ -323,8 +319,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump private void updateKey(SelectionKey key) { Object attachment = key.attachment(); - if (attachment instanceof SelectableEndPoint) - ((SelectableEndPoint)attachment).updateKey(); + if (attachment instanceof Selectable) + ((Selectable)attachment).updateKey(); } } @@ -334,11 +330,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump private Runnable processConnect(SelectionKey key, final Connect connect) { - SocketChannel channel = (SocketChannel)key.channel(); + SelectableChannel channel = (SelectableChannel)key.channel(); try { key.attach(connect.attachment); - boolean connected = _selectorManager.finishConnect(channel); + boolean connected = _selectorManager.doFinishConnect(channel); if (LOG.isDebugEnabled()) LOG.debug("Connected {} {}", connected, channel); if (connected) @@ -375,14 +371,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump private void processAccept(SelectionKey key) { - ServerSocketChannel server = (ServerSocketChannel)key.channel(); - SocketChannel channel = null; + SelectableChannel server = key.channel(); + SelectableChannel channel = null; try { - while ((channel = server.accept()) != null) - { + channel = _selectorManager.doAccept(server); + if (channel!=null) _selectorManager.accepted(channel); - } } catch (Throwable x) { @@ -404,7 +399,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump } } - private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException + private EndPoint createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException { EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); _selectorManager.endPointOpened(endPoint); @@ -417,7 +412,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump return endPoint; } - public void destroyEndPoint(final EndPoint endPoint) + public void onClose(final EndPoint endPoint) { final Connection connection = endPoint.getConnection(); submit(new Product() @@ -517,9 +512,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump class Acceptor implements Runnable { - private final ServerSocketChannel _channel; + private final SelectableChannel _channel; - public Acceptor(ServerSocketChannel channel) + public Acceptor(SelectableChannel channel) { this._channel = channel; } @@ -543,10 +538,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump class Accept implements Runnable { - private final SocketChannel channel; + private final SelectableChannel channel; private final Object attachment; - Accept(SocketChannel channel, Object attachment) + Accept(SelectableChannel channel, Object attachment) { this.channel = channel; this.attachment = attachment; @@ -570,10 +565,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump private class CreateEndPoint implements Product { - private final SocketChannel channel; + private final SelectableChannel channel; private final SelectionKey key; - public CreateEndPoint(SocketChannel channel, SelectionKey key) + public CreateEndPoint(SelectableChannel channel, SelectionKey key) { this.channel = channel; this.key = key; @@ -603,11 +598,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump class Connect implements Runnable { private final AtomicBoolean failed = new AtomicBoolean(); - private final SocketChannel channel; + private final SelectableChannel channel; private final Object attachment; private final Scheduler.Task timeout; - Connect(SocketChannel channel, Object attachment) + Connect(SelectableChannel channel, Object attachment) { this.channel = channel; this.attachment = attachment; @@ -650,8 +645,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump @Override public void run() { - SocketChannel channel = connect.channel; - if (channel.isConnectionPending()) + SelectableChannel channel = connect.channel; + if (_selectorManager.isConnectionPending(channel)) { if (LOG.isDebugEnabled()) LOG.debug("Channel {} timed out while connecting, closing it", channel); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java index b45c39b80e..52473d7aef 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectChannelEndPoint.java @@ -18,285 +18,24 @@ package org.eclipse.jetty.io; -import java.nio.channels.CancelledKeyException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; /** * An ChannelEndpoint that can be scheduled by {@link SelectorManager}. */ -public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSelector.SelectableEndPoint +@Deprecated +public class SelectChannelEndPoint extends SocketChannelEndPoint implements ManagedSelector.Selectable { public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class); - private final Locker _locker = new Locker(); - private boolean _updatePending; - - /** - * true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called - */ - private final AtomicBoolean _open = new AtomicBoolean(); - private final ManagedSelector _selector; - private final SelectionKey _key; - /** - * The current value for {@link SelectionKey#interestOps()}. - */ - private int _currentInterestOps; - /** - * The desired value for {@link SelectionKey#interestOps()}. - */ - private int _desiredInterestOps; - - private final Runnable _runUpdateKey = new Runnable() - { - @Override - public void run() - { - updateKey(); - } - - @Override - public String toString() - { - return SelectChannelEndPoint.this.toString()+":runUpdateKey"; - } - }; - private final Runnable _runFillable = new Runnable() - { - @Override - public void run() - { - getFillInterest().fillable(); - } - - @Override - public String toString() - { - return SelectChannelEndPoint.this.toString()+":runFillable"; - } - }; - private final Runnable _runCompleteWrite = new Runnable() - { - @Override - public void run() - { - getWriteFlusher().completeWrite(); - } - - @Override - public String toString() - { - return SelectChannelEndPoint.this.toString()+":runCompleteWrite"; - } - }; - private final Runnable _runFillableCompleteWrite = new Runnable() - { - @Override - public void run() - { - getFillInterest().fillable(); - getWriteFlusher().completeWrite(); - } - - @Override - public String toString() - { - return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite"; - } - }; - public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) { - super(scheduler, channel); - _selector = selector; - _key = key; + super(channel,selector,key,scheduler); setIdleTimeout(idleTimeout); } - - @Override - protected void needsFillInterest() - { - changeInterests(SelectionKey.OP_READ); - } - - @Override - protected void onIncompleteFlush() - { - changeInterests(SelectionKey.OP_WRITE); - } - - @Override - public Runnable onSelected() - { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ - - int readyOps = _key.readyOps(); - int oldInterestOps; - int newInterestOps; - try (Locker.Lock lock = _locker.lock()) - { - _updatePending = true; - // Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both). - oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps & ~readyOps; - _desiredInterestOps = newInterestOps; - } - - - boolean readable = (readyOps & SelectionKey.OP_READ) != 0; - boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0; - - - if (LOG.isDebugEnabled()) - LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this); - - // Run non-blocking code immediately. - // This producer knows that this non-blocking code is special - // and that it must be run in this thread and not fed to the - // ExecutionStrategy, which could not have any thread to run these - // tasks (or it may starve forever just after having run them). - if (readable && getFillInterest().isCallbackNonBlocking()) - { - if (LOG.isDebugEnabled()) - LOG.debug("Direct readable run {}",this); - _runFillable.run(); - readable = false; - } - if (writable && getWriteFlusher().isCallbackNonBlocking()) - { - if (LOG.isDebugEnabled()) - LOG.debug("Direct writable run {}",this); - _runCompleteWrite.run(); - writable = false; - } - - // return task to complete the job - Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable) - : (writable ? _runCompleteWrite : null); - - if (LOG.isDebugEnabled()) - LOG.debug("task {}",task); - return task; - } - - @Override - public void updateKey() - { - /** - * This method may run concurrently with {@link #changeInterests(int)}. - */ - - try - { - int oldInterestOps; - int newInterestOps; - try (Locker.Lock lock = _locker.lock()) - { - _updatePending = false; - oldInterestOps = _currentInterestOps; - newInterestOps = _desiredInterestOps; - if (oldInterestOps != newInterestOps) - { - _currentInterestOps = newInterestOps; - _key.interestOps(newInterestOps); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this); - } - catch (CancelledKeyException x) - { - LOG.debug("Ignoring key update for concurrently closed channel {}", this); - close(); - } - catch (Throwable x) - { - LOG.warn("Ignoring key update for " + this, x); - close(); - } - } - - private void changeInterests(int operation) - { - /** - * This method may run concurrently with - * {@link #updateKey()} and {@link #onSelected()}. - */ - - int oldInterestOps; - int newInterestOps; - boolean pending; - try (Locker.Lock lock = _locker.lock()) - { - pending = _updatePending; - oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps | operation; - if (newInterestOps != oldInterestOps) - _desiredInterestOps = newInterestOps; - } - - if (LOG.isDebugEnabled()) - LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); - - if (!pending) - _selector.submit(_runUpdateKey); - } - - - @Override - public void close() - { - if (_open.compareAndSet(true, false)) - { - super.close(); - _selector.destroyEndPoint(this); - } - } - - @Override - public boolean isOpen() - { - // We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen(): - // a thread may call close(), which flips the boolean but has not yet called super.close(), and - // another thread calls isOpen() which would return true - wrong - if based on super.isOpen(). - return _open.get(); - } - - @Override - public void onOpen() - { - if (_open.compareAndSet(false, true)) - super.onOpen(); - } - - @Override - public String toString() - { - // We do a best effort to print the right toString() and that's it. - try - { - boolean valid = _key != null && _key.isValid(); - int keyInterests = valid ? _key.interestOps() : -1; - int keyReadiness = valid ? _key.readyOps() : -1; - return String.format("%s{io=%d/%d,kio=%d,kro=%d}", - super.toString(), - _currentInterestOps, - _desiredInterestOps, - keyInterests, - keyReadiness); - } - catch (Throwable x) - { - return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps); - } - } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 9dfe8db4d5..452643d4c7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; @@ -133,7 +135,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa return _selectors.length; } - private ManagedSelector chooseSelector(SocketChannel channel) + private ManagedSelector chooseSelector(SelectableChannel channel) { // Ideally we would like to have all connections from the same client end // up on the same selector (to try to avoid smearing the data from a single @@ -145,14 +147,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa { try { - SocketAddress remote = channel.getRemoteAddress(); - if (remote instanceof InetSocketAddress) + if (channel instanceof SocketChannel) { - byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress(); - if (addr != null) + SocketAddress remote = ((SocketChannel)channel).getRemoteAddress(); + if (remote instanceof InetSocketAddress) { - int s = addr[addr.length - 1] & 0xFF; - candidate1 = _selectors[s % getSelectorCount()]; + byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress(); + if (addr != null) + { + int s = addr[addr.length - 1] & 0xFF; + candidate1 = _selectors[s % getSelectorCount()]; + } } } } @@ -184,7 +189,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param attachment the attachment object * @see #accept(SocketChannel, Object) */ - public void connect(SocketChannel channel, Object attachment) + public void connect(SelectableChannel channel, Object attachment) { ManagedSelector set = chooseSelector(channel); set.submit(set.new Connect(channel, attachment)); @@ -194,7 +199,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param channel the channel to accept * @see #accept(SocketChannel, Object) */ - public void accept(SocketChannel channel) + public void accept(SelectableChannel channel) { accept(channel, null); } @@ -209,7 +214,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param channel the channel to register * @param attachment the attachment object */ - public void accept(SocketChannel channel, Object attachment) + public void accept(SelectableChannel channel, Object attachment) { final ManagedSelector selector = chooseSelector(channel); selector.submit(selector.new Accept(channel, attachment)); @@ -223,7 +228,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * * @param server the server channel to register */ - public void acceptor(ServerSocketChannel server) + public void acceptor(SelectableChannel server) { final ManagedSelector selector = chooseSelector(null); selector.submit(selector.new Acceptor(server)); @@ -238,7 +243,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param channel the * @throws IOException if unable to accept channel */ - protected void accepted(SocketChannel channel) throws IOException + protected void accepted(SelectableChannel channel) throws IOException { throw new UnsupportedOperationException(); } @@ -332,11 +337,22 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa } } - protected boolean finishConnect(SocketChannel channel) throws IOException + protected boolean doFinishConnect(SelectableChannel channel) throws IOException { - return channel.finishConnect(); + return ((SocketChannel)channel).finishConnect(); + } + + protected boolean isConnectionPending(SelectableChannel channel) + { + return ((SocketChannel)channel).isConnectionPending(); + } + + protected SelectableChannel doAccept(SelectableChannel server) throws IOException + { + return ((ServerSocketChannel)server).accept(); } + /** * <p>Callback method invoked when a non-blocking connect cannot be completed.</p> * <p>By default it just logs with level warning.</p> @@ -345,11 +361,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @param ex the exception that caused the connect to fail * @param attachment the attachment object associated at registration */ - protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) { LOG.warn(String.format("%s - %s", channel, attachment), ex); } + protected Selector newSelector() throws IOException + { + return Selector.open(); + } + /** * <p>Factory method to create {@link EndPoint}.</p> * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)} @@ -362,7 +383,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @throws IOException if the endPoint cannot be created * @see #newConnection(SocketChannel, EndPoint, Object) */ - protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException; + protected abstract EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException; /** * <p>Factory method to create {@link Connection}.</p> @@ -374,7 +395,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa * @throws IOException if unable to create new connection * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey) */ - public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException; + public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException; @Override public String dump() @@ -388,4 +409,5 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa ContainerLifeCycle.dumpObject(out, this); ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors)); } + } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java new file mode 100644 index 0000000000..0824d54899 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java @@ -0,0 +1,81 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; + +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +public class SocketChannelEndPoint extends ChannelEndPoint +{ + private static final Logger LOG = Log.getLogger(SocketChannelEndPoint.class); + private final Socket _socket; + private final InetSocketAddress _local; + private final InetSocketAddress _remote; + + public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + { + this((SocketChannel)channel,selector,key,scheduler); + } + + public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + { + super(channel,selector,key,scheduler); + + _socket=channel.socket(); + _local=(InetSocketAddress)_socket.getLocalSocketAddress(); + _remote=(InetSocketAddress)_socket.getRemoteSocketAddress(); + } + + public Socket getSocket() + { + return _socket; + } + + public InetSocketAddress getLocalAddress() + { + return _local; + } + + public InetSocketAddress getRemoteAddress() + { + return _remote; + } + + @Override + protected void doShutdownOutput() + { + try + { + if (!_socket.isOutputShutdown()) + _socket.shutdownOutput(); + } + catch (IOException e) + { + LOG.debug(e); + } + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index ca32841446..014b2c3c33 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.io.ssl; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.concurrent.Executor; @@ -328,10 +329,28 @@ public class SslConnection extends AbstractConnection public DecryptedEndPoint() { - super(null,getEndPoint().getLocalAddress(), getEndPoint().getRemoteAddress()); + super(((AbstractEndPoint)getEndPoint()).getScheduler()); setIdleTimeout(getEndPoint().getIdleTimeout()); } + + + @Override + public InetSocketAddress getLocalAddress() + { + return getEndPoint().getLocalAddress(); + } + + + + @Override + public InetSocketAddress getRemoteAddress() + { + return getEndPoint().getRemoteAddress(); + } + + + @Override public void setIdleTimeout(long idleTimeout) { @@ -868,12 +887,11 @@ public class SslConnection extends AbstractConnection } @Override - public void shutdownOutput() + public void doShutdownOutput() { boolean ishut = isInputShutdown(); - boolean oshut = isOutputShutdown(); if (DEBUG) - LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut); + LOG.debug("{} shutdownOutput: ishut={}", SslConnection.this, ishut); if (ishut) { // Aggressively close, since inbound close alert has already been processed @@ -882,7 +900,7 @@ public class SslConnection extends AbstractConnection // reply. If a TLS close reply is sent, most implementations send a RST. getEndPoint().close(); } - else if (!oshut) + else { try { @@ -914,12 +932,27 @@ public class SslConnection extends AbstractConnection } @Override - public void close() + public void doClose() { // First send the TLS Close Alert, then the FIN - shutdownOutput(); + if (!_sslEngine.isOutboundDone()) + { + try + { + synchronized (this) // TODO review synchronized boundary + { + _sslEngine.closeOutbound(); + flush(BufferUtil.EMPTY_BUFFER); // Send close handshake + ensureFillInterested(); + } + } + catch (Exception e) + { + LOG.ignore(e); + } + } getEndPoint().close(); - super.close(); + super.doClose(); } @Override diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java index cc38b8ee94..578a6867fe 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/IOTest.java @@ -18,6 +18,11 @@ package org.eclipse.jetty.io; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -45,11 +50,6 @@ import org.eclipse.jetty.util.IO; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class IOTest { @Test diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java index ddec08085f..b9f880f52f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointInterestsTest.java @@ -24,6 +24,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -62,10 +63,11 @@ public class SelectChannelEndPointInterestsTest selectorManager = new SelectorManager(threadPool, scheduler) { + @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), 60000) + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()) { @Override protected void onIncompleteFlush() @@ -74,10 +76,13 @@ public class SelectChannelEndPointInterestsTest interested.onIncompleteFlush(); } }; + + endp.setIdleTimeout(60000); + return endp; } @Override - public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment) + public Connection newConnection(SelectableChannel channel, final EndPoint endPoint, Object attachment) { return new AbstractConnection(endPoint, getExecutor()) { @@ -136,7 +141,7 @@ public class SelectChannelEndPointInterestsTest connection.fillInterested(); ByteBuffer output = ByteBuffer.allocate(size.get()); - endPoint.write(new Callback.Adapter(), output); + endPoint.write(new Callback(){}, output); latch1.countDown(); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java index 2367ed6b85..ec3d158835 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointSslTest.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; @@ -71,7 +72,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest } @Override - protected Connection newConnection(SocketChannel channel, EndPoint endpoint) + protected Connection newConnection(SelectableChannel channel, EndPoint endpoint) { SSLEngine engine = __sslCtxFactory.newSSLEngine(); engine.setUseClientMode(false); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java index 77e004ca3c..77691a235f 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectChannelEndPointTest.java @@ -32,6 +32,7 @@ import java.io.OutputStream; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -64,19 +65,21 @@ public class SelectChannelEndPointTest protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler) { @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) { return SelectChannelEndPointTest.this.newConnection(channel, endpoint); } @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException { - SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); + endp.setIdleTimeout(60000); _lastEndPoint = endp; _lastEndPointLatch.countDown(); return endp; } + }; // Must be volatile or the test may fail spuriously @@ -110,7 +113,7 @@ public class SelectChannelEndPointTest return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort()); } - protected Connection newConnection(SocketChannel channel, EndPoint endpoint) + protected Connection newConnection(SelectableChannel channel, EndPoint endpoint) { return new TestConnection(endpoint); } @@ -228,11 +231,11 @@ public class SelectChannelEndPointTest } catch (InterruptedException | EofException e) { - SelectChannelEndPoint.LOG.ignore(e); + Log.getRootLogger().ignore(e); } catch (Exception e) { - SelectChannelEndPoint.LOG.warn(e); + Log.getRootLogger().warn(e); } finally { diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java index 66d39d3a16..a5f09432a8 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SelectorManagerTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.io; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -69,20 +70,22 @@ public class SelectorManagerTest SelectorManager selectorManager = new SelectorManager(executor, scheduler) { @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), connectTimeout / 2); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); + endp.setIdleTimeout(connectTimeout/2); + return endp; } - + @Override - protected boolean finishConnect(SocketChannel channel) throws IOException + protected boolean doFinishConnect(SelectableChannel channel) throws IOException { try { long timeout = timeoutConnection.get(); if (timeout > 0) TimeUnit.MILLISECONDS.sleep(timeout); - return super.finishConnect(channel); + return super.doFinishConnect(channel); } catch (InterruptedException e) { @@ -91,7 +94,7 @@ public class SelectorManagerTest } @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException { ((Callback)attachment).succeeded(); return new AbstractConnection(endpoint, executor) @@ -104,7 +107,7 @@ public class SelectorManagerTest } @Override - protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) { ((Callback)attachment).failed(ex); } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java index 0a437ec907..69035af133 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ChannelEndPointTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SocketChannelEndPointTest.java @@ -24,7 +24,7 @@ import java.nio.channels.SocketChannel; import org.junit.AfterClass; import org.junit.BeforeClass; -public class ChannelEndPointTest extends EndPointTest<ChannelEndPoint> +public class SocketChannelEndPointTest extends EndPointTest<SocketChannelEndPoint> { static ServerSocketChannel connector; @@ -43,16 +43,22 @@ public class ChannelEndPointTest extends EndPointTest<ChannelEndPoint> } @Override - protected EndPointPair<ChannelEndPoint> newConnection() throws Exception + protected EndPointPair<SocketChannelEndPoint> newConnection() throws Exception { - EndPointPair<ChannelEndPoint> c = new EndPointPair<>(); + EndPointPair<SocketChannelEndPoint> c = new EndPointPair<>(); - c.client=new ChannelEndPoint(null,SocketChannel.open(connector.socket().getLocalSocketAddress())); - c.server=new ChannelEndPoint(null,connector.accept()); + c.client=new SocketChannelEndPoint(SocketChannel.open(connector.socket().getLocalSocketAddress()),null,null,null); + c.server=new SocketChannelEndPoint(connector.accept(),null,null,null); return c; } @Override + public void testClientClose() throws Exception + { + super.testClientClose(); + } + + @Override public void testClientServerExchange() throws Exception { super.testClientServerExchange(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java index 0ec33fb660..d12868e21b 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/SslConnectionTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -39,6 +40,7 @@ import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.Scheduler; @@ -74,7 +76,7 @@ public class SslConnectionTest protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler) { @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) { SSLEngine engine = __sslCtxFactory.newSSLEngine(); engine.setUseClientMode(false); @@ -85,10 +87,12 @@ public class SslConnectionTest return sslConnection; } + @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { - SelectChannelEndPoint endp = new TestEP(channel,selectSet, selectionKey, getScheduler(), 60000); + SocketChannelEndPoint endp = new TestEP(channel, selector, selectionKey, getScheduler()); + endp.setIdleTimeout(60000); _lastEndp=endp; return endp; } @@ -96,12 +100,11 @@ public class SslConnectionTest static final AtomicInteger __startBlocking = new AtomicInteger(); static final AtomicInteger __blockFor = new AtomicInteger(); - private static class TestEP extends SelectChannelEndPoint + private static class TestEP extends SocketChannelEndPoint { - - public TestEP(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) + public TestEP(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { - super(channel,selector,key,scheduler,idleTimeout); + super((SocketChannel)channel,selector,key,scheduler); } @Override @@ -121,7 +124,6 @@ public class SslConnectionTest return false; } } - String s=BufferUtil.toDetailString(buffers[0]); boolean flushed=super.flush(buffers); return flushed; } @@ -235,11 +237,11 @@ public class SslConnectionTest } catch(InterruptedException|EofException e) { - SelectChannelEndPoint.LOG.ignore(e); + Log.getRootLogger().ignore(e); } catch(Exception e) { - SelectChannelEndPoint.LOG.warn(e); + Log.getRootLogger().warn(e); } finally { diff --git a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java index 7fd0b777fc..7b196626f3 100644 --- a/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java +++ b/jetty-proxy/src/main/java/org/eclipse/jetty/proxy/ConnectHandler.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.HashSet; @@ -45,6 +46,7 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpTransport; @@ -502,16 +504,18 @@ public class ConnectHandler extends HandlerWrapper } @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException { - return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout()); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler()); + endp.setIdleTimeout(getIdleTimeout()); + return endp; } @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException { if (ConnectHandler.LOG.isDebugEnabled()) - ConnectHandler.LOG.debug("Connected to {}", channel.getRemoteAddress()); + ConnectHandler.LOG.debug("Connected to {}", ((SocketChannel)channel).getRemoteAddress()); ConnectContext connectContext = (ConnectContext)attachment; UpstreamConnection connection = newUpstreamConnection(endpoint, connectContext); connection.setInputBufferSize(getBufferSize()); @@ -519,7 +523,7 @@ public class ConnectHandler extends HandlerWrapper } @Override - protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment) + protected void connectionFailed(SelectableChannel channel, final Throwable ex, final Object attachment) { close(channel); ConnectContext connectContext = (ConnectContext)attachment; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index bfd8206df9..c46ed08be5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -253,9 +253,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co @Override protected void doStart() throws Exception { + if(_defaultProtocol==null) + throw new IllegalStateException("No default protocol for "+this); _defaultConnectionFactory = getConnectionFactory(_defaultProtocol); if(_defaultConnectionFactory==null) - throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol); + throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this); super.doStart(); @@ -298,7 +300,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co // If we have a stop timeout long stopTimeout = getStopTimeout(); CountDownLatch stopping=_stopping; - if (stopTimeout > 0 && stopping!=null) + if (stopTimeout > 0 && stopping!=null && getAcceptors()>0) stopping.await(stopTimeout,TimeUnit.MILLISECONDS); _stopping=null; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/DebugListener.java b/jetty-server/src/main/java/org/eclipse/jetty/server/DebugListener.java index e1f6a3e36b..955a655775 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/DebugListener.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/DebugListener.java @@ -33,7 +33,6 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletRequestEvent; import javax.servlet.ServletRequestListener; import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandler.Context; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java index 813ffd06d6..16aba09eec 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ForwardedRequestCustomizer.java @@ -215,6 +215,7 @@ public class ForwardedRequestCustomizer implements Customizer { request.setAttribute("javax.servlet.request.ssl_session_id", ssl_session_id); request.setScheme(HttpScheme.HTTPS.asString()); + request.setSecure(true); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 95b4f59a9e..a6f96c3466 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.server; import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION; -import static javax.servlet.RequestDispatcher.ERROR_MESSAGE; import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE; import java.io.IOException; @@ -32,8 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.DispatcherType; -import javax.servlet.RequestDispatcher; -import javax.servlet.UnavailableException; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpFields; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 7865c6b7f2..b1c1618e63 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -197,27 +197,16 @@ public class LocalConnector extends AbstractConnector } @Override - public void close() - { - boolean wasOpen=isOpen(); - super.close(); - if (wasOpen) - { - getConnection().onClose(); - onClose(); - } - } - - @Override public void onClose() { + getConnection().onClose(); LocalConnector.this.onEndPointClosed(this); super.onClose(); _closed.countDown(); } @Override - public void shutdownOutput() + public void doShutdownOutput() { super.shutdownOutput(); close(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java index cfd96c1808..e4ed0a2c59 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/NetworkTrafficServerConnector.java @@ -26,10 +26,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.NetworkTrafficListener; import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint; -import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Scheduler; @@ -84,7 +84,7 @@ public class NetworkTrafficServerConnector extends ServerConnector } @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners); return endPoint; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 86294c8a9b..7d78750c2a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -938,22 +938,25 @@ public class Request implements HttpServletRequest @Override public String getLocalName() { - if (_channel==null) + if (_channel!=null) { - try - { - String name =InetAddress.getLocalHost().getHostName(); - if (StringUtil.ALL_INTERFACES.equals(name)) - return null; - return name; - } - catch (java.net.UnknownHostException e) - { - LOG.ignore(e); - } + InetSocketAddress local=_channel.getLocalAddress(); + if (local!=null) + return local.getHostString(); } - InetSocketAddress local=_channel.getLocalAddress(); - return local.getHostString(); + + try + { + String name =InetAddress.getLocalHost().getHostName(); + if (StringUtil.ALL_INTERFACES.equals(name)) + return null; + return name; + } + catch (java.net.UnknownHostException e) + { + LOG.ignore(e); + } + return null; } /* ------------------------------------------------------------ */ @@ -966,7 +969,7 @@ public class Request implements HttpServletRequest if (_channel==null) return 0; InetSocketAddress local=_channel.getLocalAddress(); - return local.getPort(); + return local==null?0:local.getPort(); } /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/RequestLogCollection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/RequestLogCollection.java index 2359c32616..b6cfa1cea3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/RequestLogCollection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/RequestLogCollection.java @@ -18,10 +18,10 @@ package org.eclipse.jetty.server; -import java.util.ArrayList; - import static java.util.Arrays.asList; +import java.util.ArrayList; + class RequestLogCollection implements RequestLog { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SecureRequestCustomizer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SecureRequestCustomizer.java index 1ede4423c6..aefb547f2a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SecureRequestCustomizer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SecureRequestCustomizer.java @@ -27,6 +27,7 @@ import javax.servlet.ServletRequest; import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint; import org.eclipse.jetty.util.TypeUtil; @@ -66,14 +67,10 @@ public class SecureRequestCustomizer implements HttpConfiguration.Customizer @Override public void customize(Connector connector, HttpConfiguration channelConfig, Request request) { - if (request.getHttpChannel().getEndPoint() instanceof DecryptedEndPoint) + EndPoint endp = request.getHttpChannel().getEndPoint(); + if (endp instanceof DecryptedEndPoint) { - request.setSecure(true); - - if (request.getHttpURI().getScheme()==null) - request.getHttpURI().setScheme(HttpScheme.HTTPS.asString()); - - SslConnection.DecryptedEndPoint ssl_endp = (DecryptedEndPoint)request.getHttpChannel().getEndPoint(); + SslConnection.DecryptedEndPoint ssl_endp = (DecryptedEndPoint)endp; SslConnection sslConnection = ssl_endp.getSslConnection(); SSLEngine sslEngine=sslConnection.getSSLEngine(); customize(sslEngine,request); @@ -81,6 +78,12 @@ public class SecureRequestCustomizer implements HttpConfiguration.Customizer if (request.getHttpURI().getScheme()==null) request.setScheme(HttpScheme.HTTPS.asString()); } + else if (endp instanceof ProxyConnectionFactory.ProxyEndPoint) + { + ProxyConnectionFactory.ProxyEndPoint proxy = (ProxyConnectionFactory.ProxyEndPoint)endp; + if (request.getHttpURI().getScheme()==null && proxy.getAttribute(ProxyConnectionFactory.TLS_VERSION)!=null) + request.setScheme(HttpScheme.HTTPS.asString()); + } if (HttpScheme.HTTPS.is(request.getScheme())) request.setSecure(true); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java index 1665a19b16..4cd668dd95 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ServerConnector.java @@ -24,6 +24,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.nio.channels.Channel; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -32,6 +33,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; @@ -229,7 +231,6 @@ public class ServerConnector extends AbstractNetworkConnector _manager = newSelectorManager(getExecutor(), getScheduler(), selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2))); addBean(_manager, true); - setSelectorPriorityDelta(-1); setAcceptorPriorityDelta(-2); } @@ -426,7 +427,7 @@ public class ServerConnector extends AbstractNetworkConnector return _localPort; } - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout()); } @@ -493,19 +494,19 @@ public class ServerConnector extends AbstractNetworkConnector } @Override - protected void accepted(SocketChannel channel) throws IOException + protected void accepted(SelectableChannel channel) throws IOException { - ServerConnector.this.accepted(channel); + ServerConnector.this.accepted((SocketChannel)channel); } @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected ChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { - return ServerConnector.this.newEndPoint(channel, selectSet, selectionKey); + return ServerConnector.this.newEndPoint((SocketChannel)channel, selectSet, selectionKey); } @Override - public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException { return getDefaultConnectionFactory().newConnection(ServerConnector.this, endpoint); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java index cdc402120c..9ac93c7a52 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SocketCustomizationListener.java @@ -20,12 +20,12 @@ package org.eclipse.jetty.server; import java.net.Socket; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection.Listener; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint; -import org.eclipse.jetty.io.EndPoint; /* ------------------------------------------------------------ */ @@ -70,9 +70,9 @@ public class SocketCustomizationListener implements Listener ssl=true; } - if (endp instanceof ChannelEndPoint) + if (endp instanceof SocketChannelEndPoint) { - Socket socket = ((ChannelEndPoint)endp).getSocket(); + Socket socket = ((SocketChannelEndPoint)endp).getSocket(); customize(socket,connection.getClass(),ssl); } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java index 97a9b388ca..f8fd5a33e2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/DebugHandler.java @@ -27,7 +27,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.Connector; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerCollection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerCollection.java index f799b3a4f8..2a1b771d6c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerCollection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerCollection.java @@ -28,7 +28,6 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.ArrayUtil; import org.eclipse.jetty.util.MultiException; import org.eclipse.jetty.util.annotation.ManagedAttribute; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerWrapper.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerWrapper.java index c8ef17b61c..bc60dc7c05 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerWrapper.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/HandlerWrapper.java @@ -25,10 +25,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.LifeCycle; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractHttpTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractHttpTest.java index 6d28ff3079..eb464180d6 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractHttpTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AbstractHttpTest.java @@ -18,6 +18,9 @@ package org.eclipse.jetty.server; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -26,6 +29,7 @@ import java.io.PrintWriter; import java.net.Socket; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; + import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -41,9 +45,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Rule; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - public abstract class AbstractHttpTest { @Rule diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java index d188105fad..087007499f 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java @@ -18,6 +18,11 @@ package org.eclipse.jetty.server; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -42,11 +47,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - public class AsyncRequestReadTest { private static Server server; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index caad44978f..2583a0310e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.server; -import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java index 7a16f503ee..ff49a768c8 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ExtendedServerTest.java @@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; @@ -60,7 +61,7 @@ public class ExtendedServerTest extends HttpServerTestBase { @Override - protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException { return new ExtendedEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout()); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java index 8ee21c5fd6..3f66131ab6 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTest.java @@ -18,6 +18,15 @@ package org.eclipse.jetty.server; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; + import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; @@ -33,8 +42,8 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.ChannelEndPoint; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.DefaultHandler; import org.eclipse.jetty.server.handler.HandlerList; @@ -42,15 +51,6 @@ import org.eclipse.jetty.toolchain.test.OS; import org.eclipse.jetty.util.IO; import org.junit.Test; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; - public class ServerConnectorTest { public static class ReuseInfoHandler extends AbstractHandler @@ -61,8 +61,8 @@ public class ServerConnectorTest response.setContentType("text/plain"); EndPoint endPoint = baseRequest.getHttpChannel().getEndPoint(); - assertThat("Endpoint",endPoint,instanceOf(ChannelEndPoint.class)); - ChannelEndPoint channelEndPoint = (ChannelEndPoint)endPoint; + assertThat("Endpoint",endPoint,instanceOf(SocketChannelEndPoint.class)); + SocketChannelEndPoint channelEndPoint = (SocketChannelEndPoint)endPoint; Socket socket = channelEndPoint.getSocket(); ServerConnector connector = (ServerConnector)baseRequest.getHttpChannel().getConnector(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java index fa46de262f..7179c373af 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ThreadStarvationTest.java @@ -26,26 +26,16 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; -import java.net.SocketException; -import java.util.concurrent.Exchanger; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLException; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.IO; -import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.hamcrest.Matchers; -import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/DebugHandlerTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/DebugHandlerTest.java index 56f7252fc9..f3ae33d9d0 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/handler/DebugHandlerTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/handler/DebugHandlerTest.java @@ -18,8 +18,9 @@ package org.eclipse.jetty.server.handler; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; import java.io.ByteArrayOutputStream; import java.io.File; diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SelectChannelServerSslTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SelectChannelServerSslTest.java index a242a51151..f33d45d1f1 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SelectChannelServerSslTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ssl/SelectChannelServerSslTest.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.server.ssl; import static org.junit.Assert.assertEquals; -import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java index 9243ca7ccb..88647ab355 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/ThreadStarvationTest.java @@ -49,6 +49,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
@@ -110,7 +111,7 @@ public class ThreadStarvationTest ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
- protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
+ protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@@ -264,7 +265,7 @@ public class ThreadStarvationTest ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
@Override
- protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
+ protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
diff --git a/jetty-unixsocket/.gitignore b/jetty-unixsocket/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/jetty-unixsocket/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/jetty-unixsocket/pom.xml b/jetty-unixsocket/pom.xml new file mode 100644 index 0000000000..91c7af717e --- /dev/null +++ b/jetty-unixsocket/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-project</artifactId> + <version>9.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>jetty-unixsocket</artifactId> + <name>Jetty :: UnixSocket</name> + <description>Jetty UnixSocket</description> + <url>http://www.eclipse.org/jetty</url> + <properties> + <bundle-symbolic-name>${project.groupId}.unixsocket</bundle-symbolic-name> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <configuration> + <onlyAnalyze>org.eclipse.jetty.unixsocket.*</onlyAnalyze> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.github.jnr</groupId> + <artifactId>jnr-unixsocket</artifactId> + <version>0.8</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.toolchain</groupId> + <artifactId>jetty-test-helper</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-forwarded.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-forwarded.xml new file mode 100644 index 0000000000..d30ea10a51 --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-forwarded.xml @@ -0,0 +1,17 @@ +<?xml version="1.0"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> +<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration"> + <Call name="addCustomizer"> + <Arg> + <New class="org.eclipse.jetty.server.ForwardedRequestCustomizer"> + <Set name="forwardedHostHeader"><Property name="jetty.unixSocketHttpConfig.forwardedHostHeader" default="X-Forwarded-Host"/></Set> + <Set name="forwardedServerHeader"><Property name="jetty.unixSocketHttpConfig.forwardedServerHeader" default="X-Forwarded-Server"/></Set> + <Set name="forwardedProtoHeader"><Property name="jetty.unixSocketHttpConfig.forwardedProtoHeader" default="X-Forwarded-Proto"/></Set> + <Set name="forwardedForHeader"><Property name="jetty.unixSocketHttpConfig.forwardedForHeader" default="X-Forwarded-For"/></Set> + <Set name="forwardedSslSessionIdHeader"><Property name="jetty.unixSocketHttpConfig.forwardedSslSessionIdHeader" /></Set> + <Set name="forwardedCipherSuiteHeader"><Property name="jetty.unixSocketHttpConfig.forwardedCipherSuiteHeader" /></Set> + </New> + </Arg> + </Call> +</Configure> + diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http.xml new file mode 100644 index 0000000000..0520c345b3 --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http.xml @@ -0,0 +1,13 @@ +<?xml version="1.0"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> + +<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector"> + <Call name="addConnectionFactory"> + <Arg> + <New class="org.eclipse.jetty.server.HttpConnectionFactory"> + <Arg name="config"><Ref refid="unixSocketHttpConfig" /></Arg> + </New> + </Arg> + </Call> +</Configure> + diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http2c.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http2c.xml new file mode 100644 index 0000000000..1213f1b2fd --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-http2c.xml @@ -0,0 +1,18 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> + +<!-- ============================================================= --> +<!-- Configure a HTTP2 on the ssl connector. --> +<!-- ============================================================= --> +<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector"> + <Call name="addConnectionFactory"> + <Arg> + <New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory"> + <Arg name="config"><Ref refid="unixSocketHttpConfig"/></Arg> + <Set name="maxConcurrentStreams"><Property name="jetty.http2c.maxConcurrentStreams" default="1024"/></Set> + <Set name="initialStreamSendWindow"><Property name="jetty.http2c.initialStreamSendWindow" default="65535"/></Set> + </New> + </Arg> + </Call> +</Configure> + diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-proxy-protocol.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-proxy-protocol.xml new file mode 100644 index 0000000000..066a508645 --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-proxy-protocol.xml @@ -0,0 +1,10 @@ +<?xml version="1.0"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> + +<Configure id="unixSocketConnector" class="org.eclipse.jetty.server.ServerConnector"> + <Call name="addFirstConnectionFactory"> + <Arg> + <New class="org.eclipse.jetty.server.ProxyConnectionFactory"/> + </Arg> + </Call> +</Configure> diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-secure.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-secure.xml new file mode 100644 index 0000000000..2a053233cc --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket-secure.xml @@ -0,0 +1,11 @@ +<?xml version="1.0"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> +<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration"> + <Call name="addCustomizer"> + <Arg> + <New class="org.eclipse.jetty.server.SecureRequestCustomizer"> + </New> + </Arg> + </Call> +</Configure> + diff --git a/jetty-unixsocket/src/main/config/etc/jetty-unixsocket.xml b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket.xml new file mode 100644 index 0000000000..ecf1f43bb6 --- /dev/null +++ b/jetty-unixsocket/src/main/config/etc/jetty-unixsocket.xml @@ -0,0 +1,25 @@ +<?xml version="1.0"?> +<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd"> + +<Configure id="Server" class="org.eclipse.jetty.server.Server"> + <New id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration"> + <Arg><Ref refid="httpConfig"/></Arg> + </New> + + <Call name="addConnector"> + <Arg> + <New id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector"> + <Arg name="server"><Ref refid="Server" /></Arg> + <Arg name="selectors" type="int"><Property name="jetty.unixsocket.selectors" default="-1"/></Arg> + <Arg name="factories"> + <Array type="org.eclipse.jetty.server.ConnectionFactory"> + </Array> + </Arg> + <Set name="unixSocket"><Property name="jetty.unixsocket" default="/tmp/jetty.sock" /></Set> + <Set name="idleTimeout"><Property name="jetty.unixsocket.idleTimeout" default="30000"/></Set> + <Set name="acceptQueueSize"><Property name="jetty.unixsocket.acceptQueueSize" default="0"/></Set> + </New> + </Arg> + </Call> +</Configure> + diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket-forwarded.mod b/jetty-unixsocket/src/main/config/modules/unixsocket-forwarded.mod new file mode 100644 index 0000000000..57bd612bad --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket-forwarded.mod @@ -0,0 +1,18 @@ +[depend] +unixsocket-http + +[xml] +etc/jetty-unixsocket-forwarded.xml + +[ini-template] +### ForwardedRequestCustomizer Configuration +# jetty.unixSocketHttpConfig.forwardedHostHeader=X-Forwarded-Host +# jetty.unixSocketHttpConfig.forwardedServerHeader=X-Forwarded-Server +# jetty.unixSocketHttpConfig.forwardedProtoHeader=X-Forwarded-Proto +# jetty.unixSocketHttpConfig.forwardedForHeader=X-Forwarded-For +# jetty.unixSocketHttpConfig.forwardedSslSessionIdHeader= +# jetty.unixSocketHttpConfig.forwardedCipherSuiteHeader= + + + + diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket-http.mod b/jetty-unixsocket/src/main/config/modules/unixsocket-http.mod new file mode 100644 index 0000000000..3af1f1c50c --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket-http.mod @@ -0,0 +1,8 @@ +[depend] +unixsocket + +[xml] +etc/jetty-unixsocket-http.xml + + + diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket-http2c.mod b/jetty-unixsocket/src/main/config/modules/unixsocket-http2c.mod new file mode 100644 index 0000000000..93a08db0a6 --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket-http2c.mod @@ -0,0 +1,16 @@ +[depend] +unixsocket-http + +[lib] +lib/http2/*.jar + +[xml] +etc/jetty-unixsocket-http2c.xml + +[ini-template] +## Max number of concurrent streams per connection +# jetty.http2.maxConcurrentStreams=1024 + +## Initial stream send (server to client) window +# jetty.http2.initialStreamSendWindow=65535 + diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket-proxy-protocol.mod b/jetty-unixsocket/src/main/config/modules/unixsocket-proxy-protocol.mod new file mode 100644 index 0000000000..386000443a --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket-proxy-protocol.mod @@ -0,0 +1,9 @@ +# +# PROXY Protocol Module - UnixSocket +# + +[depend] +unixsocket + +[xml] +etc/jetty-unixsocket-proxy-protocol.xml diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket-secure.mod b/jetty-unixsocket/src/main/config/modules/unixsocket-secure.mod new file mode 100644 index 0000000000..054f31747c --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket-secure.mod @@ -0,0 +1,10 @@ +[depend] +unixsocket-http + +[xml] +etc/jetty-unixsocket-secure.xml + +[ini-template] +### SecureRequestCustomizer Configuration + + diff --git a/jetty-unixsocket/src/main/config/modules/unixsocket.mod b/jetty-unixsocket/src/main/config/modules/unixsocket.mod new file mode 100644 index 0000000000..51377b7de2 --- /dev/null +++ b/jetty-unixsocket/src/main/config/modules/unixsocket.mod @@ -0,0 +1,49 @@ +# +# Jetty UnixSocket Connector +# + +[depend] +server + +[xml] +etc/jetty-unixsocket.xml + +[files] +maven://com.github.jnr/jnr-unixsocket/0.8|lib/jnr/jnr-unixsocket-0.8.jar +maven://com.github.jnr/jnr-ffi/2.0.3|lib/jnr/jnr-ffi-2.0.3.jar +maven://com.github.jnr/jffi/1.2.9|lib/jnr/jffi-1.2.9.jar +maven://com.github.jnr/jffi/1.2.9/jar/native|lib/jnr/jffi-1.2.9-native.jar +maven://org.ow2.asm/asm/5.0.1|lib/jnr/asm-5.0.1.jar +maven://org.ow2.asm/asm-commons/5.0.1|lib/jnr/asm-commons-5.0.1.jar +maven://org.ow2.asm/asm-analysis/5.0.3|lib/jnr/asm-analysis-5.0.3.jar +maven://org.ow2.asm/asm-tree/5.0.3|lib/jnr/asm-tree-5.0.3.jar +maven://org.ow2.asm/asm-util/5.0.3|lib/jnr/asm-util-5.0.3.jar +maven://com.github.jnr/jnr-x86asm/1.0.2|lib/jnr/jnr-x86asm-1.0.2.jar +maven://com.github.jnr/jnr-constants/0.8.7|lib/jnr/jnr-constants-0.8.7.jar +maven://com.github.jnr/jnr-enxio/0.9|lib/jnr/jnr-enxio-0.9.jar +maven://com.github.jnr/jnr-posix/3.0.12|lib/jnr/jnr-posix-3.0.12.jar + +[lib] +lib/jetty-unixsocket-${jetty.version}.jar +lib/jnr/*.jar + +[license] +Jetty UnixSockets is implmented using the Java Native Runtime, which is an +open source project hosted on Github and released under the Apache 2.0 license. +https://github.com/jnr/jnr-unixsocket +http://www.apache.org/licenses/LICENSE-2.0.html + +[ini-template] +### HTTP Connector Configuration + +## Connector host/address to bind to +# jetty.unixsocket=/tmp/jetty.sock + +## Connector idle timeout in milliseconds +# jetty.unixsocket.idleTimeout=30000 + +## Number of selectors (-1 picks default 1) +# jetty.unixsocket.selectors=-1 + +## ServerSocketChannel backlog (0 picks platform default) +# jetty.unixsocket.acceptorQueueSize=0 diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketConnector.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketConnector.java new file mode 100644 index 0000000000..bc004d0afa --- /dev/null +++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketConnector.java @@ -0,0 +1,436 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.unixsocket; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.server.AbstractConnectionFactory; +import org.eclipse.jetty.server.AbstractConnector; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.SslConnectionFactory; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.Name; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.Scheduler; + +import jnr.enxio.channels.NativeSelectorProvider; +import jnr.unixsocket.UnixServerSocketChannel; +import jnr.unixsocket.UnixSocketAddress; +import jnr.unixsocket.UnixSocketChannel; + +/** + * + */ +@ManagedObject("HTTP connector using NIO ByteChannels and Selectors") +public class UnixSocketConnector extends AbstractConnector +{ + private static final Logger LOG = Log.getLogger(UnixSocketConnector.class); + + private final SelectorManager _manager; + private String _unixSocket = "/tmp/jetty.sock"; + private volatile UnixServerSocketChannel _acceptChannel; + private volatile int _acceptQueueSize = 0; + private volatile boolean _reuseAddress = true; + + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + * <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p> + * @param server The {@link Server} this connector will accept connection for. + */ + public UnixSocketConnector( @Name("server") Server server) + { + this(server,null,null,null,-1,new HttpConnectionFactory()); + } + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + * <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p> + * @param server The {@link Server} this connector will accept connection for. + * @param selectors + * the number of selector threads, or <=0 for a default value. Selectors notice and schedule established connection that can make IO progress. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("selectors") int selectors) + { + this(server,null,null,null,selectors,new HttpConnectionFactory()); + } + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + * <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p> + * @param server The {@link Server} this connector will accept connection for. + * @param selectors + * the number of selector threads, or <=0 for a default value. Selectors notice and schedule established connection that can make IO progress. + * @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("selectors") int selectors, + @Name("factories") ConnectionFactory... factories) + { + this(server,null,null,null,selectors,factories); + } + + /* ------------------------------------------------------------ */ + /** Generic Server Connection with default configuration. + * <p>Construct a Server Connector with the passed Connection factories.</p> + * @param server The {@link Server} this connector will accept connection for. + * @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("factories") ConnectionFactory... factories) + { + this(server,null,null,null,-1,factories); + } + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + * <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>. + * @param server The {@link Server} this connector will accept connection for. + * @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the + * list of HTTP Connection Factory. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("sslContextFactory") SslContextFactory sslContextFactory) + { + this(server,null,null,null,-1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); + } + + /* ------------------------------------------------------------ */ + /** HTTP Server Connection. + * <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>. + * @param server The {@link Server} this connector will accept connection for. + * @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the + * list of HTTP Connection Factory. + * @param selectors + * the number of selector threads, or <=0 for a default value. Selectors notice and schedule established connection that can make IO progress. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("selectors") int selectors, + @Name("sslContextFactory") SslContextFactory sslContextFactory) + { + this(server,null,null,null,selectors,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory())); + } + + /* ------------------------------------------------------------ */ + /** Generic SSL Server Connection. + * @param server The {@link Server} this connector will accept connection for. + * @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the + * list of ConnectionFactories, with the first factory being the default protocol for the SslConnectionFactory. + * @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("sslContextFactory") SslContextFactory sslContextFactory, + @Name("factories") ConnectionFactory... factories) + { + this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory, factories)); + } + + /** Generic Server Connection. + * @param server + * The server this connector will be accept connection for. + * @param executor + * An executor used to run tasks for handling requests, acceptors and selectors. + * If null then use the servers executor + * @param scheduler + * A scheduler used to schedule timeouts. If null then use the servers scheduler + * @param bufferPool + * A ByteBuffer pool used to allocate buffers. If null then create a private pool with default configuration. + * @param selectors + * the number of selector threads, or <=0 for a default value(1). Selectors notice and schedule established connection that can make IO progress. + * @param factories + * Zero or more {@link ConnectionFactory} instances used to create and configure connections. + */ + public UnixSocketConnector( + @Name("server") Server server, + @Name("executor") Executor executor, + @Name("scheduler") Scheduler scheduler, + @Name("bufferPool") ByteBufferPool bufferPool, + @Name("selectors") int selectors, + @Name("factories") ConnectionFactory... factories) + { + super(server,executor,scheduler,bufferPool,0,factories); + _manager = newSelectorManager(getExecutor(), getScheduler(), + selectors>0?selectors:1); + addBean(_manager, true); + setAcceptorPriorityDelta(-2); + } + + @ManagedAttribute + public String getUnixSocket() + { + return _unixSocket; + } + + public void setUnixSocket(String filename) + { + _unixSocket=filename; + } + + protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors) + { + return new UnixSocketConnectorManager(executor, scheduler, selectors); + } + + @Override + protected void doStart() throws Exception + { + open(); + super.doStart(); + + if (getAcceptors()==0) + _manager.acceptor(_acceptChannel); + } + + @Override + protected void doStop() throws Exception + { + super.doStop(); + close(); + } + + public boolean isOpen() + { + UnixServerSocketChannel channel = _acceptChannel; + return channel!=null && channel.isOpen(); + } + + + public void open() throws IOException + { + if (_acceptChannel == null) + { + UnixServerSocketChannel serverChannel = UnixServerSocketChannel.open(); + SocketAddress bindAddress = new UnixSocketAddress(new File(_unixSocket)); + serverChannel.socket().bind(bindAddress, getAcceptQueueSize()); + serverChannel.configureBlocking(getAcceptors()>0); + addBean(serverChannel); + + LOG.debug("opened {}",serverChannel); + _acceptChannel = serverChannel; + } + } + + @Override + public Future<Void> shutdown() + { + // shutdown all the connections + return super.shutdown(); + } + + public void close() + { + UnixServerSocketChannel serverChannel = _acceptChannel; + _acceptChannel = null; + + if (serverChannel != null) + { + removeBean(serverChannel); + + // If the interrupt did not close it, we should close it + if (serverChannel.isOpen()) + { + try + { + serverChannel.close(); + } + catch (IOException e) + { + LOG.warn(e); + } + } + + new File(_unixSocket).delete(); + } + } + + @Override + public void accept(int acceptorID) throws IOException + { + LOG.warn("Blocking UnixSocket accept used. Cannot be interrupted!"); + UnixServerSocketChannel serverChannel = _acceptChannel; + if (serverChannel != null && serverChannel.isOpen()) + { + LOG.debug("accept {}",serverChannel); + UnixSocketChannel channel = serverChannel.accept(); + LOG.debug("accepted {}",channel); + accepted(channel); + } + } + + protected void accepted(UnixSocketChannel channel) throws IOException + { + channel.configureBlocking(false); + _manager.accept(channel); + } + + public SelectorManager getSelectorManager() + { + return _manager; + } + + @Override + public Object getTransport() + { + return _acceptChannel; + } + + protected UnixSocketEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException + { + return new UnixSocketEndPoint((UnixSocketChannel)channel,selector,key,getScheduler()); + } + + + /** + * @return the accept queue size + */ + @ManagedAttribute("Accept Queue size") + public int getAcceptQueueSize() + { + return _acceptQueueSize; + } + + /** + * @param acceptQueueSize the accept queue size (also known as accept backlog) + */ + public void setAcceptQueueSize(int acceptQueueSize) + { + _acceptQueueSize = acceptQueueSize; + } + + /** + * @return whether the server socket reuses addresses + * @see ServerSocket#getReuseAddress() + */ + public boolean getReuseAddress() + { + return _reuseAddress; + } + + /** + * @param reuseAddress whether the server socket reuses addresses + * @see ServerSocket#setReuseAddress(boolean) + */ + public void setReuseAddress(boolean reuseAddress) + { + _reuseAddress = reuseAddress; + } + + + @Override + public String toString() + { + return String.format("%s{%s}", + super.toString(), + _unixSocket); + } + + protected class UnixSocketConnectorManager extends SelectorManager + { + public UnixSocketConnectorManager(Executor executor, Scheduler scheduler, int selectors) + { + super(executor, scheduler, selectors); + } + + @Override + protected void accepted(SelectableChannel channel) throws IOException + { + UnixSocketConnector.this.accepted((UnixSocketChannel)channel); + } + + @Override + protected Selector newSelector() throws IOException + { + return NativeSelectorProvider.getInstance().openSelector(); + } + + @Override + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException + { + UnixSocketEndPoint endp = UnixSocketConnector.this.newEndPoint(channel, selector, selectionKey); + endp.setIdleTimeout(getIdleTimeout()); + return endp; + } + + @Override + public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException + { + return getDefaultConnectionFactory().newConnection(UnixSocketConnector.this, endpoint); + } + + @Override + protected void endPointOpened(EndPoint endpoint) + { + super.endPointOpened(endpoint); + onEndPointOpened(endpoint); + } + + @Override + protected void endPointClosed(EndPoint endpoint) + { + onEndPointClosed(endpoint); + super.endPointClosed(endpoint); + } + + @Override + protected boolean doFinishConnect(SelectableChannel channel) throws IOException + { + return ((UnixSocketChannel)channel).finishConnect(); + } + + @Override + protected boolean isConnectionPending(SelectableChannel channel) + { + return ((UnixSocketChannel)channel).isConnectionPending(); + } + + @Override + protected SelectableChannel doAccept(SelectableChannel server) throws IOException + { + LOG.debug("doAccept async {}",server); + UnixSocketChannel channel = ((UnixServerSocketChannel)server).accept(); + LOG.debug("accepted async {}",channel); + return channel; + } + } +} diff --git a/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java new file mode 100644 index 0000000000..f6ece100d0 --- /dev/null +++ b/jetty-unixsocket/src/main/java/org/eclipse/jetty/unixsocket/UnixSocketEndPoint.java @@ -0,0 +1,74 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.unixsocket; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; + +import org.eclipse.jetty.io.ChannelEndPoint; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + +import jnr.unixsocket.UnixSocketChannel; + +public class UnixSocketEndPoint extends ChannelEndPoint +{ + public final static InetSocketAddress NOIP=new InetSocketAddress(0); + private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class); + + private final UnixSocketChannel _channel; + + public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) + { + super(channel,selector,key,scheduler); + _channel=channel; + } + + @Override + public InetSocketAddress getLocalAddress() + { + return null; + } + + @Override + public InetSocketAddress getRemoteAddress() + { + return null; + } + + + @Override + protected void doShutdownOutput() + { + if (LOG.isDebugEnabled()) + LOG.debug("oshut {}", this); + try + { + _channel.shutdownOutput(); + super.doShutdownOutput(); + } + catch (IOException e) + { + LOG.debug(e); + } + } +} diff --git a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketClient.java b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketClient.java new file mode 100644 index 0000000000..142317d297 --- /dev/null +++ b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketClient.java @@ -0,0 +1,57 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.unixsocket; + +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.nio.CharBuffer; +import java.nio.channels.Channels; +import java.util.Date; + +import jnr.unixsocket.UnixSocketAddress; +import jnr.unixsocket.UnixSocketChannel; + +public class UnixSocketClient +{ + public static void main(String[] args) throws Exception + { + java.io.File path = new java.io.File("/tmp/jetty.sock"); + String data = "GET / HTTP/1.1\r\nHost: unixsock\r\n\r\n"; + UnixSocketAddress address = new UnixSocketAddress(path); + UnixSocketChannel channel = UnixSocketChannel.open(address); + System.out.println("connected to " + channel.getRemoteSocketAddress()); + + PrintWriter w = new PrintWriter(Channels.newOutputStream(channel)); + InputStreamReader r = new InputStreamReader(Channels.newInputStream(channel)); + + while (true) + { + w.print(data); + w.flush(); + + CharBuffer result = CharBuffer.allocate(4096); + r.read(result); + result.flip(); + System.out.println("read from server: " + result.toString()); + + Thread.sleep(1000); + } + } +} + diff --git a/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketServer.java b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketServer.java new file mode 100644 index 0000000000..20e3a73c4f --- /dev/null +++ b/jetty-unixsocket/src/test/java/org/eclipse/jetty/unixsocket/UnixSocketServer.java @@ -0,0 +1,63 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.unixsocket; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.ProxyConnectionFactory; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; + +public class UnixSocketServer +{ + public static void main (String... args) throws Exception + { + Server server = new Server(); + + HttpConnectionFactory http = new HttpConnectionFactory(); + ProxyConnectionFactory proxy = new ProxyConnectionFactory(http.getProtocol()); + UnixSocketConnector connector = new UnixSocketConnector(server,proxy,http); + server.addConnector(connector); + + server.setHandler(new AbstractHandler() + { + + @Override + protected void doHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setStatus(200); + response.getWriter().write("Hello World\r\n"); + response.getWriter().write("remote="+request.getRemoteAddr()+":"+request.getRemotePort()+"\r\n"); + response.getWriter().write("local ="+request.getLocalAddr()+":"+request.getLocalPort()+"\r\n"); + } + + }); + + server.start(); + server.join(); + } +} diff --git a/jetty-unixsocket/src/test/resources/haproxy b/jetty-unixsocket/src/test/resources/haproxy Binary files differnew file mode 100755 index 0000000000..73db7b00b8 --- /dev/null +++ b/jetty-unixsocket/src/test/resources/haproxy diff --git a/jetty-unixsocket/src/test/resources/jetty-logging.properties b/jetty-unixsocket/src/test/resources/jetty-logging.properties new file mode 100644 index 0000000000..a825af95f3 --- /dev/null +++ b/jetty-unixsocket/src/test/resources/jetty-logging.properties @@ -0,0 +1,7 @@ +org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog +#org.eclipse.jetty.LEVEL=DEBUG +#org.eclipse.jetty.client.LEVEL=DEBUG +#org.eclipse.jetty.proxy.LEVEL=DEBUG +org.eclipse.jetty.unixsocket.LEVEL=DEBUG +org.eclipse.jetty.io.LEVEL=DEBUG +org.eclipse.jetty.server.ProxyConnectionFactory.LEVEL=DEBUG
\ No newline at end of file diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 35180c4aca..a71a4a36c9 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.websocket.client.io; import java.io.IOException; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; @@ -31,6 +32,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -53,7 +55,7 @@ public class WebSocketClientSelectorManager extends SelectorManager } @Override - protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) + protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) { if (LOG.isDebugEnabled()) LOG.debug("Connection Failed",ex); @@ -67,7 +69,7 @@ public class WebSocketClientSelectorManager extends SelectorManager } @Override - public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException + public Connection newConnection(final SelectableChannel channel, EndPoint endPoint, final Object attachment) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("newConnection({},{},{})",channel,endPoint,attachment); @@ -114,24 +116,33 @@ public class WebSocketClientSelectorManager extends SelectorManager } } + @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException { if (LOG.isDebugEnabled()) - LOG.debug("newEndPoint({}, {}, {})",channel,selectSet,selectionKey); - return new SelectChannelEndPoint(channel,selectSet,selectionKey,getScheduler(),policy.getIdleTimeout()); + LOG.debug("newEndPoint({}, {}, {})",channel,selector,selectionKey); + SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler()); + endp.setIdleTimeout(policy.getIdleTimeout()); + return endp; } - public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel) + public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SelectableChannel channel) { - String peerHost = channel.socket().getInetAddress().getHostName(); - int peerPort = channel.socket().getPort(); + String peerHost = null; + int peerPort = 0; + if (channel instanceof SocketChannel) + { + SocketChannel sc = (SocketChannel)channel; + peerHost = sc.socket().getInetAddress().getHostName(); + peerPort = sc.socket().getPort(); + } SSLEngine engine = sslContextFactory.newSSLEngine(peerHost,peerPort); engine.setUseClientMode(true); return engine; } - public UpgradeConnection newUpgradeConnection(SocketChannel channel, EndPoint endPoint, ConnectPromise connectPromise) + public UpgradeConnection newUpgradeConnection(SelectableChannel channel, EndPoint endPoint, ConnectPromise connectPromise) { WebSocketClient client = connectPromise.getClient(); Executor executor = client.getExecutor(); diff --git a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java index 2cbc3ab747..68079c385c 100644 --- a/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java +++ b/jetty-websocket/websocket-client/src/test/java/org/eclipse/jetty/websocket/client/ClientCloseTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.*; import java.io.IOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Arrays; @@ -37,6 +38,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SelectChannelEndPoint; +import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.toolchain.test.EventQueue; import org.eclipse.jetty.toolchain.test.TestTracker; import org.eclipse.jetty.util.BufferUtil; @@ -283,19 +285,21 @@ public class ClientCloseTest } @Override - protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException + protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { - return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout()); + TestEndPoint endp = new TestEndPoint(channel,selectSet,selectionKey,getScheduler()); + endp.setIdleTimeout(getPolicy().getIdleTimeout()); + return endp; } } - public static class TestEndPoint extends SelectChannelEndPoint + public static class TestEndPoint extends SocketChannelEndPoint { public AtomicBoolean congestedFlush = new AtomicBoolean(false); - public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout) + public TestEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler) { - super(channel,selector,key,scheduler,idleTimeout); + super((SocketChannel)channel,selector,key,scheduler); } @Override @@ -531,6 +531,7 @@ <module>jetty-nosql</module> <module>jetty-infinispan</module> <module>jetty-gcloud</module> + <module>jetty-unixsocket</module> <module>tests</module> <module>examples</module> <module>jetty-quickstart</module> |