diff options
author | Greg Wilkins | 2011-10-27 05:37:07 +0000 |
---|---|---|
committer | Greg Wilkins | 2011-10-27 05:37:07 +0000 |
commit | 738cbfdccc1b4c07eea7dd954130af7da7c90401 (patch) | |
tree | 5b5923bd490583721a38fe90d29ad29828690ec7 /jetty-client/src/main/java/org/eclipse/jetty | |
parent | 3dfd8b7698282994b28ad6877018ad100e086dad (diff) | |
download | org.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.tar.gz org.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.tar.xz org.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.zip |
refactored client to use upgradeable endpoint. Instert SslConnection when needed
Diffstat (limited to 'jetty-client/src/main/java/org/eclipse/jetty')
3 files changed, 138 insertions, 151 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java index 0acd2b3d33..2f65309c54 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.client; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -354,7 +353,8 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen @Override public String toString() { - return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort(); + return "HttpConnection@" + hashCode() + "//" + + (_destination==null?"?.?.?.?:??":(_destination.getAddress().getHost() + ":" + _destination.getAddress().getPort())); } public String toDetailString() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 310854026a..63f8390d12 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -352,9 +352,9 @@ public class HttpDestination implements Dumpable else { EndPoint endPoint = connection.getEndPoint(); - if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint) + if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint) { - SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint; + SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint; HttpExchange exchange = _queue.get(0); ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange); connect.setAddress(getProxy()); @@ -668,10 +668,10 @@ public class HttpDestination implements Dumpable private class ConnectExchange extends ContentExchange { - private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint; + private final SelectConnector.UpgradableEndPoint proxyEndPoint; private final HttpExchange exchange; - public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange) + public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange) { this.proxyEndPoint = proxyEndPoint; this.exchange = exchange; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index 92558d4e1e..c9aff5a10f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -24,23 +24,23 @@ import java.util.concurrent.ConcurrentHashMap; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLSession; -import org.eclipse.jetty.http.HttpGenerator; -import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers.Type; import org.eclipse.jetty.io.BuffersFactory; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; -import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.AsyncConnection; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectorManager; +import org.eclipse.jetty.io.nio.SslConnection; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.Timeout; +import org.eclipse.jetty.util.thread.Timeout.Task; class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector { @@ -49,8 +49,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector private final HttpClient _httpClient; private final Manager _selectorManager=new Manager(); private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>(); - private Buffers _sslBuffers; - + /** * @param httpClient the HttpClient this connector is associated to */ @@ -65,16 +64,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector { super.doStart(); - - final boolean direct=_httpClient.getUseDirectBuffers(); - - SSLEngine sslEngine=_selectorManager.newSslEngine(null); - final SSLSession ssl_session=sslEngine.getSession(); - _sslBuffers = BuffersFactory.newBuffers( - direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), - direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(), - direct?Type.DIRECT:Type.INDIRECT,1024); - _selectorManager.start(); } @@ -129,6 +118,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector /* ------------------------------------------------------------ */ class Manager extends SelectorManager { + Logger LOG = SelectConnector.LOG; + @Override public boolean dispatch(Runnable task) { @@ -151,11 +142,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector } @Override - protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) + public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment) { - if (endpoint instanceof SslSelectChannelEndPoint) - return new AsyncHttpConnection(_sslBuffers,_sslBuffers,endpoint); - return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint); } @@ -172,32 +160,29 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector // key should have destination at this point (will be replaced by endpoint after this call) HttpDestination dest=(HttpDestination)key.attachment(); - SelectChannelEndPoint ep=null; + AsyncEndPoint ep=null; + SelectChannelEndPoint scep= new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); + ep = scep; + if (dest.isSecure()) { - if (dest.isProxied()) - { - SSLEngine engine=newSslEngine(channel); - ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout()); - } - else - { - SSLEngine engine=newSslEngine(channel); - SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout()); - sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate()); - ep = sslEp; - } - } - else - { - ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout()); + LOG.debug("secure to {}, proxied={}",channel,dest.isProxied()); + ep = new UpgradableEndPoint(ep,newSslEngine(channel)); } - - AbstractHttpConnection connection=(AbstractHttpConnection)ep.getConnection(); - connection.setDestination(dest); - dest.onNewConnection(connection); - return ep; + + AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment()); + ep.setConnection(connection); + + AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection; + httpConnection.setDestination(dest); + + if (dest.isSecure() && !dest.isProxied()) + ((UpgradableEndPoint)ep).upgrade(); + + dest.onNewConnection(httpConnection); + + return scep; } private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException @@ -268,204 +253,206 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector } } } - - /** - * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint. - * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint} - * interface, this class overrides all methods of {@link EndPoint} to provide the right - * behavior depending on the fact that it has been upgraded or not. - */ - public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint + + public static class UpgradableEndPoint implements AsyncEndPoint { - private final SelectChannelEndPoint plainEndPoint; - private volatile boolean upgraded = false; - - public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException + AsyncEndPoint _endp; + SSLEngine _engine; + + public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException { - super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout); - this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout); + _engine=engine; + _endp=endp; } public void upgrade() { - upgraded = true; + AsyncHttpConnection connection = (AsyncHttpConnection) ((SelectChannelEndPoint)_endp).getConnection(); + + SslConnection sslConnection = new SslConnection(_engine,_endp); + ((SelectChannelEndPoint)_endp).setConnection(sslConnection); + + _endp=sslConnection.getSslEndPoint(); + sslConnection.setConnection(connection); + + LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection); + } + + + public Connection getConnection() + { + return _endp.getConnection(); + } + + public void setConnection(Connection connection) + { + _endp.setConnection(connection); } public void shutdownOutput() throws IOException { - if (upgraded) - super.shutdownOutput(); - else - plainEndPoint.shutdownOutput(); + _endp.shutdownOutput(); + } + + public void asyncDispatch() + { + _endp.asyncDispatch(); + } + + public boolean isOutputShutdown() + { + return _endp.isOutputShutdown(); + } + + public void shutdownInput() throws IOException + { + _endp.shutdownInput(); + } + + public void scheduleWrite() + { + _endp.scheduleWrite(); + } + + public boolean isInputShutdown() + { + return _endp.isInputShutdown(); } public void close() throws IOException { - if (upgraded) - super.close(); - else - plainEndPoint.close(); + _endp.close(); + } + + public void scheduleIdle() + { + _endp.scheduleIdle(); } public int fill(Buffer buffer) throws IOException { - if (upgraded) - return super.fill(buffer); - else - return plainEndPoint.fill(buffer); + return _endp.fill(buffer); + } + + public void cancelIdle() + { + _endp.cancelIdle(); + } + + public boolean isWritable() + { + return _endp.isWritable(); + } + + public boolean hasProgressed() + { + return _endp.hasProgressed(); } public int flush(Buffer buffer) throws IOException { - if (upgraded) - return super.flush(buffer); - else - return plainEndPoint.flush(buffer); + return _endp.flush(buffer); + } + + public void scheduleTimeout(Task task, long timeoutMs) + { + _endp.scheduleTimeout(task,timeoutMs); + } + + public void cancelTimeout(Task task) + { + _endp.cancelTimeout(task); } public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { - if (upgraded) - return super.flush(header, buffer, trailer); - else - return plainEndPoint.flush(header, buffer, trailer); + return _endp.flush(header,buffer,trailer); } public String getLocalAddr() { - if (upgraded) - return super.getLocalAddr(); - else - return plainEndPoint.getLocalAddr(); + return _endp.getLocalAddr(); } public String getLocalHost() { - if (upgraded) - return super.getLocalHost(); - else - return plainEndPoint.getLocalHost(); + return _endp.getLocalHost(); } public int getLocalPort() { - if (upgraded) - return super.getLocalPort(); - else - return plainEndPoint.getLocalPort(); + return _endp.getLocalPort(); } public String getRemoteAddr() { - if (upgraded) - return super.getRemoteAddr(); - else - return plainEndPoint.getRemoteAddr(); + return _endp.getRemoteAddr(); } public String getRemoteHost() { - if (upgraded) - return super.getRemoteHost(); - else - return plainEndPoint.getRemoteHost(); + return _endp.getRemoteHost(); } public int getRemotePort() { - if (upgraded) - return super.getRemotePort(); - else - return plainEndPoint.getRemotePort(); + return _endp.getRemotePort(); } public boolean isBlocking() { - if (upgraded) - return super.isBlocking(); - else - return plainEndPoint.isBlocking(); + return _endp.isBlocking(); } public boolean isBufferred() { - if (upgraded) - return super.isBufferred(); - else - return plainEndPoint.isBufferred(); + return _endp.isBufferred(); } public boolean blockReadable(long millisecs) throws IOException { - if (upgraded) - return super.blockReadable(millisecs); - else - return plainEndPoint.blockReadable(millisecs); + return _endp.blockReadable(millisecs); } public boolean blockWritable(long millisecs) throws IOException { - if (upgraded) - return super.blockWritable(millisecs); - else - return plainEndPoint.blockWritable(millisecs); + return _endp.blockWritable(millisecs); } public boolean isOpen() { - if (upgraded) - return super.isOpen(); - else - return plainEndPoint.isOpen(); + return _endp.isOpen(); } public Object getTransport() { - if (upgraded) - return super.getTransport(); - else - return plainEndPoint.getTransport(); + return _endp.getTransport(); } public boolean isBufferingInput() { - if (upgraded) - return super.isBufferingInput(); - else - return plainEndPoint.isBufferingInput(); + return _endp.isBufferingInput(); } public boolean isBufferingOutput() { - if (upgraded) - return super.isBufferingOutput(); - else - return plainEndPoint.isBufferingOutput(); + return _endp.isBufferingOutput(); } public void flush() throws IOException { - if (upgraded) - super.flush(); - else - plainEndPoint.flush(); - + _endp.flush(); } public int getMaxIdleTime() { - if (upgraded) - return super.getMaxIdleTime(); - else - return plainEndPoint.getMaxIdleTime(); + return _endp.getMaxIdleTime(); } public void setMaxIdleTime(int timeMs) throws IOException { - if (upgraded) - super.setMaxIdleTime(timeMs); - else - plainEndPoint.setMaxIdleTime(timeMs); + _endp.setMaxIdleTime(timeMs); } + } } |