diff options
Diffstat (limited to 'jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java')
-rw-r--r-- | jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java | 631 |
1 files changed, 0 insertions, 631 deletions
diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java deleted file mode 100644 index 349672abde..0000000000 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java +++ /dev/null @@ -1,631 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - - -package org.eclipse.jetty.spdy.server.proxy; - -import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; -import org.eclipse.jetty.spdy.api.DataInfo; -import org.eclipse.jetty.spdy.api.GoAwayInfo; -import org.eclipse.jetty.spdy.api.GoAwayResultInfo; -import org.eclipse.jetty.spdy.api.HeadersInfo; -import org.eclipse.jetty.spdy.api.Info; -import org.eclipse.jetty.spdy.api.PushInfo; -import org.eclipse.jetty.spdy.api.ReplyInfo; -import org.eclipse.jetty.spdy.api.RstInfo; -import org.eclipse.jetty.spdy.api.SPDY; -import org.eclipse.jetty.spdy.api.Session; -import org.eclipse.jetty.spdy.api.SessionFrameListener; -import org.eclipse.jetty.spdy.api.Stream; -import org.eclipse.jetty.spdy.api.StreamFrameListener; -import org.eclipse.jetty.spdy.api.StreamStatus; -import org.eclipse.jetty.spdy.api.SynInfo; -import org.eclipse.jetty.spdy.client.SPDYClient; -import org.eclipse.jetty.spdy.http.HTTPSPDYHeader; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Fields; -import org.eclipse.jetty.util.Promise; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - -/** - * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into - * SPDY events for the servers.</p> - */ -public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener -{ - private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class); - - private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise"; - private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream"; - - private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>(); - private final SessionFrameListener sessionListener = new ProxySessionFrameListener(); - private final SPDYClient.Factory factory; - private volatile long connectTimeout = 15000; - private volatile long timeout = 60000; - - public SPDYProxyEngine(SPDYClient.Factory factory) - { - this.factory = factory; - } - - public long getConnectTimeout() - { - return connectTimeout; - } - - public void setConnectTimeout(long connectTimeout) - { - this.connectTimeout = connectTimeout; - } - - public long getTimeout() - { - return timeout; - } - - public void setTimeout(long timeout) - { - this.timeout = timeout; - } - - public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo) - { - Fields headers = new Fields(clientSynInfo.getHeaders(), false); - - short serverVersion = getVersion(proxyServerInfo.getProtocol()); - InetSocketAddress address = proxyServerInfo.getAddress(); - Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address); - if (serverSession == null) - { - rst(clientStream); - return null; - } - - final Session clientSession = clientStream.getSession(); - - addRequestProxyHeaders(clientStream, headers); - customizeRequestHeaders(clientStream, headers); - convert(clientSession.getVersion(), serverVersion, headers); - - SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose()); - StreamFrameListener listener = new ProxyStreamFrameListener(clientStream); - StreamPromise promise = new StreamPromise(clientStream, serverSynInfo); - clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise); - serverSession.syn(serverSynInfo, listener, promise); - return this; - } - - private static short getVersion(String protocol) - { - switch (protocol) - { - case "spdy/2": - return SPDY.V2; - case "spdy/3": - return SPDY.V3; - default: - throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol"); - } - } - - @Override - public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) - { - throw new IllegalStateException("We shouldn't receive pushes from clients"); - } - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - throw new IllegalStateException("Servers do not receive replies"); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - // TODO - throw new UnsupportedOperationException("Not Yet Implemented"); - } - - @Override - public void onData(Stream clientStream, final DataInfo clientDataInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("C -> P {} on {}", clientDataInfo, clientStream); - - ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - clientDataInfo.consume(delta); - } - }; - - StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE); - streamPromise.data(serverDataInfo); - } - - @Override - public void onFailure(Stream stream, Throwable x) - { - LOG.debug(x); - } - - private Session produceSession(String host, short version, InetSocketAddress address) - { - try - { - Session session = serverSessions.get(host); - if (session == null) - { - SPDYClient client = factory.newSPDYClient(version); - session = client.connect(address, sessionListener); - if (LOG.isDebugEnabled()) - LOG.debug("Proxy session connected to {}", address); - Session existing = serverSessions.putIfAbsent(host, session); - if (existing != null) - { - session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE); - session = existing; - } - } - return session; - } - catch (Exception x) - { - LOG.debug(x); - return null; - } - } - - private void convert(short fromVersion, short toVersion, Fields headers) - { - if (fromVersion != toVersion) - { - for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values()) - { - Fields.Field header = headers.remove(httpHeader.name(fromVersion)); - if (header != null) - { - String toName = httpHeader.name(toVersion); - for (String value : header.getValues()) - headers.add(toName, value); - } - } - } - } - - private void rst(Stream stream) - { - RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM); - stream.getSession().rst(rstInfo, Callback.Adapter.INSTANCE); - } - - private class ProxyPushStreamFrameListener implements StreamFrameListener - { - private PushStreamPromise pushStreamPromise; - - private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise) - { - this.pushStreamPromise = pushStreamPromise; - } - - @Override - public StreamFrameListener onPush(Stream stream, PushInfo pushInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream); - PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo); - this.pushStreamPromise.push(newPushStreamPromise); - return new ProxyPushStreamFrameListener(newPushStreamPromise); - } - - @Override - public void onReply(Stream stream, ReplyInfo replyInfo) - { - // Push streams never send a reply - throw new UnsupportedOperationException(); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - throw new UnsupportedOperationException(); - } - - @Override - public void onData(Stream serverStream, final DataInfo serverDataInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream); - - ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - serverDataInfo.consume(delta); - } - }; - - pushStreamPromise.data(clientDataInfo); - } - - @Override - public void onFailure(Stream stream, Throwable x) - { - LOG.debug(x); - } - } - - private class ProxyStreamFrameListener extends StreamFrameListener.Adapter - { - private final Stream receiverStream; - - public ProxyStreamFrameListener(Stream receiverStream) - { - this.receiverStream = receiverStream; - } - - @Override - public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("S -> P {} on {}"); - PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream); - PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo); - receiverStream.push(newPushInfo, pushStreamPromise); - return new ProxyPushStreamFrameListener(pushStreamPromise); - } - - @Override - public void onReply(final Stream stream, ReplyInfo replyInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("S -> P {} on {}", replyInfo, stream); - final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()), - replyInfo.isClose()); - reply(stream, clientReplyInfo); - } - - private void reply(final Stream stream, final ReplyInfo clientReplyInfo) - { - receiverStream.reply(clientReplyInfo, new Callback() - { - @Override - public void succeeded() - { - if (LOG.isDebugEnabled()) - LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream); - } - - @Override - public void failed(Throwable x) - { - LOG.debug(x); - rst(receiverStream); - } - }); - } - - @Override - public void onHeaders(Stream stream, HeadersInfo headersInfo) - { - // TODO - throw new UnsupportedOperationException("Not Yet Implemented"); - } - - @Override - public void onData(final Stream stream, final DataInfo dataInfo) - { - if (LOG.isDebugEnabled()) - LOG.debug("S -> P {} on {}", dataInfo, stream); - data(stream, dataInfo); - } - - private void data(final Stream stream, final DataInfo serverDataInfo) - { - final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose()) - { - @Override - public void consume(int delta) - { - super.consume(delta); - serverDataInfo.consume(delta); - } - }; - - receiverStream.data(clientDataInfo, new Callback() //TODO: timeout??? - { - @Override - public void succeeded() - { - if (LOG.isDebugEnabled()) - LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream); - } - - @Override - public void failed(Throwable x) - { - LOG.debug(x); - rst(receiverStream); - } - }); - } - } - - /** - * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice - * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The - * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client - * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the - * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is - * a SPDY v3 server (and hence with flow control).</p> - */ - private class StreamPromise implements Promise<Stream> - { - private final Queue<DataInfoCallback> queue = new LinkedList<>(); - private final Stream senderStream; - private final Info info; - private Stream receiverStream; - - private StreamPromise(Stream senderStream, Info info) - { - this.senderStream = senderStream; - this.info = info; - } - - @Override - public void succeeded(Stream stream) - { - if (LOG.isDebugEnabled()) - LOG.debug("P -> S {} from {} to {}", info, senderStream, stream); - - stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream); - - DataInfoCallback dataInfoCallback; - synchronized (queue) - { - this.receiverStream = stream; - dataInfoCallback = queue.peek(); - if (dataInfoCallback != null) - { - if (dataInfoCallback.flushing) - { - if (LOG.isDebugEnabled()) - LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size()); - dataInfoCallback = null; - } - else - { - dataInfoCallback.flushing = true; - if (LOG.isDebugEnabled()) - LOG.debug("SYN completed, queue size {}", queue.size()); - } - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("SYN completed, queue empty"); - } - } - if (dataInfoCallback != null) - flush(stream, dataInfoCallback); - } - - @Override - public void failed(Throwable x) - { - LOG.debug(x); - rst(senderStream); - } - - public void data(DataInfo dataInfo) - { - Stream receiverStream; - DataInfoCallback dataInfoCallbackToFlush = null; - DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo); - synchronized (queue) - { - queue.offer(dataInfoCallBackToQueue); - receiverStream = this.receiverStream; - if (receiverStream != null) - { - dataInfoCallbackToFlush = queue.peek(); - if (dataInfoCallbackToFlush.flushing) - { - if (LOG.isDebugEnabled()) - LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size()); - receiverStream = null; - } - else - { - dataInfoCallbackToFlush.flushing = true; - if (LOG.isDebugEnabled()) - LOG.debug("Queued {}, queue size {}", dataInfo, queue.size()); - } - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size()); - } - } - if (receiverStream != null) - flush(receiverStream, dataInfoCallbackToFlush); - } - - private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback) - { - if (LOG.isDebugEnabled()) - LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream); - receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout??? - } - - private class DataInfoCallback implements Callback - { - private final DataInfo dataInfo; - private boolean flushing; - - private DataInfoCallback(DataInfo dataInfo) - { - this.dataInfo = dataInfo; - } - - @Override - public void succeeded() - { - Stream serverStream; - DataInfoCallback dataInfoCallback; - synchronized (queue) - { - serverStream = StreamPromise.this.receiverStream; - assert serverStream != null; - dataInfoCallback = queue.poll(); - assert dataInfoCallback == this; - dataInfoCallback = queue.peek(); - if (dataInfoCallback != null) - { - assert !dataInfoCallback.flushing; - dataInfoCallback.flushing = true; - if (LOG.isDebugEnabled()) - LOG.debug("Completed {}, queue size {}", dataInfo, queue.size()); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Completed {}, queue empty", dataInfo); - } - } - if (dataInfoCallback != null) - flush(serverStream, dataInfoCallback); - } - - @Override - public void failed(Throwable x) - { - LOG.debug(x); - rst(senderStream); - } - } - - public Stream getSenderStream() - { - return senderStream; - } - - public Info getInfo() - { - return info; - } - - public Stream getReceiverStream() - { - synchronized (queue) - { - return receiverStream; - } - } - } - - private class PushStreamPromise extends StreamPromise - { - private volatile PushStreamPromise pushStreamPromise; - - private PushStreamPromise(Stream senderStream, PushInfo pushInfo) - { - super(senderStream, pushInfo); - } - - @Override - public void succeeded(Stream receiverStream) - { - super.succeeded(receiverStream); - - if (LOG.isDebugEnabled()) - LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise); - - PushStreamPromise promise = pushStreamPromise; - if (promise != null) - receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise); - } - - public void push(PushStreamPromise pushStreamPromise) - { - Stream receiverStream = getReceiverStream(); - - if (receiverStream != null) - receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise); - else - this.pushStreamPromise = pushStreamPromise; - } - } - - private class ProxySessionFrameListener extends SessionFrameListener.Adapter - { - @Override - public void onRst(Session serverSession, RstInfo serverRstInfo) - { - Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId()); - if (serverStream != null) - { - Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE); - if (clientStream != null) - { - Session clientSession = clientStream.getSession(); - RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus()); - clientSession.rst(clientRstInfo, Callback.Adapter.INSTANCE); - } - } - } - - @Override - public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo) - { - serverSessions.values().remove(serverSession); - } - } - - private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to) - { - Fields headersToConvert = pushInfo.getHeaders(); - Fields headers = convertHeaders(from, to, headersToConvert); - return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose()); - } - - private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert) - { - Fields headers = new Fields(headersToConvert, false); - addResponseProxyHeaders(from, headers); - customizeResponseHeaders(from, headers); - convert(from.getSession().getVersion(), to.getSession().getVersion(), headers); - return headers; - } -} |