Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGreg Wilkins2011-10-27 05:37:07 +0000
committerGreg Wilkins2011-10-27 05:37:07 +0000
commit738cbfdccc1b4c07eea7dd954130af7da7c90401 (patch)
tree5b5923bd490583721a38fe90d29ad29828690ec7 /jetty-client/src/main/java/org/eclipse/jetty
parent3dfd8b7698282994b28ad6877018ad100e086dad (diff)
downloadorg.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.tar.gz
org.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.tar.xz
org.eclipse.jetty.project-738cbfdccc1b4c07eea7dd954130af7da7c90401.zip
refactored client to use upgradeable endpoint. Instert SslConnection when needed
Diffstat (limited to 'jetty-client/src/main/java/org/eclipse/jetty')
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java4
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java8
-rw-r--r--jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java277
3 files changed, 138 insertions, 151 deletions
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java
index 0acd2b3d33..2f65309c54 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractHttpConnection.java
@@ -13,7 +13,6 @@
package org.eclipse.jetty.client;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@@ -354,7 +353,8 @@ public abstract class AbstractHttpConnection extends AbstractConnection implemen
@Override
public String toString()
{
- return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
+ return "HttpConnection@" + hashCode() + "//" +
+ (_destination==null?"?.?.?.?:??":(_destination.getAddress().getHost() + ":" + _destination.getAddress().getPort()));
}
public String toDetailString()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
index 310854026a..63f8390d12 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
@@ -352,9 +352,9 @@ public class HttpDestination implements Dumpable
else
{
EndPoint endPoint = connection.getEndPoint();
- if (isProxied() && endPoint instanceof SelectConnector.ProxySelectChannelEndPoint)
+ if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
{
- SelectConnector.ProxySelectChannelEndPoint proxyEndPoint = (SelectConnector.ProxySelectChannelEndPoint)endPoint;
+ SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
HttpExchange exchange = _queue.get(0);
ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint, exchange);
connect.setAddress(getProxy());
@@ -668,10 +668,10 @@ public class HttpDestination implements Dumpable
private class ConnectExchange extends ContentExchange
{
- private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
+ private final SelectConnector.UpgradableEndPoint proxyEndPoint;
private final HttpExchange exchange;
- public ConnectExchange(Address serverAddress, SelectConnector.ProxySelectChannelEndPoint proxyEndPoint, HttpExchange exchange)
+ public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint, HttpExchange exchange)
{
this.proxyEndPoint = proxyEndPoint;
this.exchange = exchange;
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java
index 92558d4e1e..c9aff5a10f 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java
@@ -24,23 +24,23 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
-import org.eclipse.jetty.http.HttpGenerator;
-import org.eclipse.jetty.http.HttpParser;
+import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
-import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.AsyncConnection;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
+import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Timeout;
+import org.eclipse.jetty.util.thread.Timeout.Task;
class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
{
@@ -49,8 +49,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
private final HttpClient _httpClient;
private final Manager _selectorManager=new Manager();
private final Map<SocketChannel, Timeout.Task> _connectingChannels = new ConcurrentHashMap<SocketChannel, Timeout.Task>();
- private Buffers _sslBuffers;
-
+
/**
* @param httpClient the HttpClient this connector is associated to
*/
@@ -65,16 +64,6 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
{
super.doStart();
-
- final boolean direct=_httpClient.getUseDirectBuffers();
-
- SSLEngine sslEngine=_selectorManager.newSslEngine(null);
- final SSLSession ssl_session=sslEngine.getSession();
- _sslBuffers = BuffersFactory.newBuffers(
- direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
- direct?Type.DIRECT:Type.INDIRECT,ssl_session.getApplicationBufferSize(),
- direct?Type.DIRECT:Type.INDIRECT,1024);
-
_selectorManager.start();
}
@@ -129,6 +118,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
/* ------------------------------------------------------------ */
class Manager extends SelectorManager
{
+ Logger LOG = SelectConnector.LOG;
+
@Override
public boolean dispatch(Runnable task)
{
@@ -151,11 +142,8 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
}
@Override
- protected AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
+ public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
- if (endpoint instanceof SslSelectChannelEndPoint)
- return new AsyncHttpConnection(_sslBuffers,_sslBuffers,endpoint);
-
return new AsyncHttpConnection(_httpClient.getRequestBuffers(),_httpClient.getResponseBuffers(),endpoint);
}
@@ -172,32 +160,29 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
// key should have destination at this point (will be replaced by endpoint after this call)
HttpDestination dest=(HttpDestination)key.attachment();
- SelectChannelEndPoint ep=null;
+ AsyncEndPoint ep=null;
+ SelectChannelEndPoint scep= new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
+ ep = scep;
+
if (dest.isSecure())
{
- if (dest.isProxied())
- {
- SSLEngine engine=newSslEngine(channel);
- ep = new ProxySelectChannelEndPoint(channel, selectSet, key, _sslBuffers, engine, (int)_httpClient.getIdleTimeout());
- }
- else
- {
- SSLEngine engine=newSslEngine(channel);
- SslSelectChannelEndPoint sslEp = new SslSelectChannelEndPoint(_sslBuffers, channel, selectSet, key, engine, (int)_httpClient.getIdleTimeout());
- sslEp.setAllowRenegotiate(_httpClient.getSslContextFactory().isAllowRenegotiate());
- ep = sslEp;
- }
- }
- else
- {
- ep = new SelectChannelEndPoint(channel, selectSet, key, (int)_httpClient.getIdleTimeout());
+ LOG.debug("secure to {}, proxied={}",channel,dest.isProxied());
+ ep = new UpgradableEndPoint(ep,newSslEngine(channel));
}
-
- AbstractHttpConnection connection=(AbstractHttpConnection)ep.getConnection();
- connection.setDestination(dest);
- dest.onNewConnection(connection);
- return ep;
+
+ AsyncConnection connection = selectSet.getManager().newConnection(channel,ep, key.attachment());
+ ep.setConnection(connection);
+
+ AbstractHttpConnection httpConnection=(AbstractHttpConnection)connection;
+ httpConnection.setDestination(dest);
+
+ if (dest.isSecure() && !dest.isProxied())
+ ((UpgradableEndPoint)ep).upgrade();
+
+ dest.onNewConnection(httpConnection);
+
+ return scep;
}
private synchronized SSLEngine newSslEngine(SocketChannel channel) throws IOException
@@ -268,204 +253,206 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
}
}
}
-
- /**
- * An endpoint that is able to "upgrade" from a normal endpoint to a SSL endpoint.
- * Since {@link HttpParser} and {@link HttpGenerator} only depend on the {@link EndPoint}
- * interface, this class overrides all methods of {@link EndPoint} to provide the right
- * behavior depending on the fact that it has been upgraded or not.
- */
- public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
+
+ public static class UpgradableEndPoint implements AsyncEndPoint
{
- private final SelectChannelEndPoint plainEndPoint;
- private volatile boolean upgraded = false;
-
- public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
+ AsyncEndPoint _endp;
+ SSLEngine _engine;
+
+ public UpgradableEndPoint(AsyncEndPoint endp, SSLEngine engine) throws IOException
{
- super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
- this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
+ _engine=engine;
+ _endp=endp;
}
public void upgrade()
{
- upgraded = true;
+ AsyncHttpConnection connection = (AsyncHttpConnection) ((SelectChannelEndPoint)_endp).getConnection();
+
+ SslConnection sslConnection = new SslConnection(_engine,_endp);
+ ((SelectChannelEndPoint)_endp).setConnection(sslConnection);
+
+ _endp=sslConnection.getSslEndPoint();
+ sslConnection.setConnection(connection);
+
+ LOG.debug("upgrade {} to {} for {}",this,sslConnection,connection);
+ }
+
+
+ public Connection getConnection()
+ {
+ return _endp.getConnection();
+ }
+
+ public void setConnection(Connection connection)
+ {
+ _endp.setConnection(connection);
}
public void shutdownOutput() throws IOException
{
- if (upgraded)
- super.shutdownOutput();
- else
- plainEndPoint.shutdownOutput();
+ _endp.shutdownOutput();
+ }
+
+ public void asyncDispatch()
+ {
+ _endp.asyncDispatch();
+ }
+
+ public boolean isOutputShutdown()
+ {
+ return _endp.isOutputShutdown();
+ }
+
+ public void shutdownInput() throws IOException
+ {
+ _endp.shutdownInput();
+ }
+
+ public void scheduleWrite()
+ {
+ _endp.scheduleWrite();
+ }
+
+ public boolean isInputShutdown()
+ {
+ return _endp.isInputShutdown();
}
public void close() throws IOException
{
- if (upgraded)
- super.close();
- else
- plainEndPoint.close();
+ _endp.close();
+ }
+
+ public void scheduleIdle()
+ {
+ _endp.scheduleIdle();
}
public int fill(Buffer buffer) throws IOException
{
- if (upgraded)
- return super.fill(buffer);
- else
- return plainEndPoint.fill(buffer);
+ return _endp.fill(buffer);
+ }
+
+ public void cancelIdle()
+ {
+ _endp.cancelIdle();
+ }
+
+ public boolean isWritable()
+ {
+ return _endp.isWritable();
+ }
+
+ public boolean hasProgressed()
+ {
+ return _endp.hasProgressed();
}
public int flush(Buffer buffer) throws IOException
{
- if (upgraded)
- return super.flush(buffer);
- else
- return plainEndPoint.flush(buffer);
+ return _endp.flush(buffer);
+ }
+
+ public void scheduleTimeout(Task task, long timeoutMs)
+ {
+ _endp.scheduleTimeout(task,timeoutMs);
+ }
+
+ public void cancelTimeout(Task task)
+ {
+ _endp.cancelTimeout(task);
}
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
- if (upgraded)
- return super.flush(header, buffer, trailer);
- else
- return plainEndPoint.flush(header, buffer, trailer);
+ return _endp.flush(header,buffer,trailer);
}
public String getLocalAddr()
{
- if (upgraded)
- return super.getLocalAddr();
- else
- return plainEndPoint.getLocalAddr();
+ return _endp.getLocalAddr();
}
public String getLocalHost()
{
- if (upgraded)
- return super.getLocalHost();
- else
- return plainEndPoint.getLocalHost();
+ return _endp.getLocalHost();
}
public int getLocalPort()
{
- if (upgraded)
- return super.getLocalPort();
- else
- return plainEndPoint.getLocalPort();
+ return _endp.getLocalPort();
}
public String getRemoteAddr()
{
- if (upgraded)
- return super.getRemoteAddr();
- else
- return plainEndPoint.getRemoteAddr();
+ return _endp.getRemoteAddr();
}
public String getRemoteHost()
{
- if (upgraded)
- return super.getRemoteHost();
- else
- return plainEndPoint.getRemoteHost();
+ return _endp.getRemoteHost();
}
public int getRemotePort()
{
- if (upgraded)
- return super.getRemotePort();
- else
- return plainEndPoint.getRemotePort();
+ return _endp.getRemotePort();
}
public boolean isBlocking()
{
- if (upgraded)
- return super.isBlocking();
- else
- return plainEndPoint.isBlocking();
+ return _endp.isBlocking();
}
public boolean isBufferred()
{
- if (upgraded)
- return super.isBufferred();
- else
- return plainEndPoint.isBufferred();
+ return _endp.isBufferred();
}
public boolean blockReadable(long millisecs) throws IOException
{
- if (upgraded)
- return super.blockReadable(millisecs);
- else
- return plainEndPoint.blockReadable(millisecs);
+ return _endp.blockReadable(millisecs);
}
public boolean blockWritable(long millisecs) throws IOException
{
- if (upgraded)
- return super.blockWritable(millisecs);
- else
- return plainEndPoint.blockWritable(millisecs);
+ return _endp.blockWritable(millisecs);
}
public boolean isOpen()
{
- if (upgraded)
- return super.isOpen();
- else
- return plainEndPoint.isOpen();
+ return _endp.isOpen();
}
public Object getTransport()
{
- if (upgraded)
- return super.getTransport();
- else
- return plainEndPoint.getTransport();
+ return _endp.getTransport();
}
public boolean isBufferingInput()
{
- if (upgraded)
- return super.isBufferingInput();
- else
- return plainEndPoint.isBufferingInput();
+ return _endp.isBufferingInput();
}
public boolean isBufferingOutput()
{
- if (upgraded)
- return super.isBufferingOutput();
- else
- return plainEndPoint.isBufferingOutput();
+ return _endp.isBufferingOutput();
}
public void flush() throws IOException
{
- if (upgraded)
- super.flush();
- else
- plainEndPoint.flush();
-
+ _endp.flush();
}
public int getMaxIdleTime()
{
- if (upgraded)
- return super.getMaxIdleTime();
- else
- return plainEndPoint.getMaxIdleTime();
+ return _endp.getMaxIdleTime();
}
public void setMaxIdleTime(int timeMs) throws IOException
{
- if (upgraded)
- super.setMaxIdleTime(timeMs);
- else
- plainEndPoint.setMaxIdleTime(timeMs);
+ _endp.setMaxIdleTime(timeMs);
}
+
}
}

Back to the top