Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java')
-rw-r--r--jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java631
1 files changed, 0 insertions, 631 deletions
diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java
deleted file mode 100644
index 349672abde..0000000000
--- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/SPDYProxyEngine.java
+++ /dev/null
@@ -1,631 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
-// ------------------------------------------------------------------------
-// All rights reserved. This program and the accompanying materials
-// are made available under the terms of the Eclipse Public License v1.0
-// and Apache License v2.0 which accompanies this distribution.
-//
-// The Eclipse Public License is available at
-// http://www.eclipse.org/legal/epl-v10.html
-//
-// The Apache License v2.0 is available at
-// http://www.opensource.org/licenses/apache2.0.php
-//
-// You may elect to redistribute this code under either of these licenses.
-// ========================================================================
-//
-
-
-package org.eclipse.jetty.spdy.server.proxy;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
-import org.eclipse.jetty.spdy.api.DataInfo;
-import org.eclipse.jetty.spdy.api.GoAwayInfo;
-import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
-import org.eclipse.jetty.spdy.api.HeadersInfo;
-import org.eclipse.jetty.spdy.api.Info;
-import org.eclipse.jetty.spdy.api.PushInfo;
-import org.eclipse.jetty.spdy.api.ReplyInfo;
-import org.eclipse.jetty.spdy.api.RstInfo;
-import org.eclipse.jetty.spdy.api.SPDY;
-import org.eclipse.jetty.spdy.api.Session;
-import org.eclipse.jetty.spdy.api.SessionFrameListener;
-import org.eclipse.jetty.spdy.api.Stream;
-import org.eclipse.jetty.spdy.api.StreamFrameListener;
-import org.eclipse.jetty.spdy.api.StreamStatus;
-import org.eclipse.jetty.spdy.api.SynInfo;
-import org.eclipse.jetty.spdy.client.SPDYClient;
-import org.eclipse.jetty.spdy.http.HTTPSPDYHeader;
-import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.Fields;
-import org.eclipse.jetty.util.Promise;
-import org.eclipse.jetty.util.log.Log;
-import org.eclipse.jetty.util.log.Logger;
-
-/**
- * <p>{@link SPDYProxyEngine} implements a SPDY to SPDY proxy, that is, converts SPDY events received by clients into
- * SPDY events for the servers.</p>
- */
-public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
-{
- private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
-
- private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
- private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
-
- private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
- private final SessionFrameListener sessionListener = new ProxySessionFrameListener();
- private final SPDYClient.Factory factory;
- private volatile long connectTimeout = 15000;
- private volatile long timeout = 60000;
-
- public SPDYProxyEngine(SPDYClient.Factory factory)
- {
- this.factory = factory;
- }
-
- public long getConnectTimeout()
- {
- return connectTimeout;
- }
-
- public void setConnectTimeout(long connectTimeout)
- {
- this.connectTimeout = connectTimeout;
- }
-
- public long getTimeout()
- {
- return timeout;
- }
-
- public void setTimeout(long timeout)
- {
- this.timeout = timeout;
- }
-
- public StreamFrameListener proxy(final Stream clientStream, SynInfo clientSynInfo, ProxyEngineSelector.ProxyServerInfo proxyServerInfo)
- {
- Fields headers = new Fields(clientSynInfo.getHeaders(), false);
-
- short serverVersion = getVersion(proxyServerInfo.getProtocol());
- InetSocketAddress address = proxyServerInfo.getAddress();
- Session serverSession = produceSession(proxyServerInfo.getHost(), serverVersion, address);
- if (serverSession == null)
- {
- rst(clientStream);
- return null;
- }
-
- final Session clientSession = clientStream.getSession();
-
- addRequestProxyHeaders(clientStream, headers);
- customizeRequestHeaders(clientStream, headers);
- convert(clientSession.getVersion(), serverVersion, headers);
-
- SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
- StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
- StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
- clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
- serverSession.syn(serverSynInfo, listener, promise);
- return this;
- }
-
- private static short getVersion(String protocol)
- {
- switch (protocol)
- {
- case "spdy/2":
- return SPDY.V2;
- case "spdy/3":
- return SPDY.V3;
- default:
- throw new IllegalArgumentException("Procotol: " + protocol + " is not a known SPDY protocol");
- }
- }
-
- @Override
- public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
- {
- throw new IllegalStateException("We shouldn't receive pushes from clients");
- }
-
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- throw new IllegalStateException("Servers do not receive replies");
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- // TODO
- throw new UnsupportedOperationException("Not Yet Implemented");
- }
-
- @Override
- public void onData(Stream clientStream, final DataInfo clientDataInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("C -> P {} on {}", clientDataInfo, clientStream);
-
- ByteBufferDataInfo serverDataInfo = new ByteBufferDataInfo(clientDataInfo.asByteBuffer(false), clientDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- clientDataInfo.consume(delta);
- }
- };
-
- StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
- streamPromise.data(serverDataInfo);
- }
-
- @Override
- public void onFailure(Stream stream, Throwable x)
- {
- LOG.debug(x);
- }
-
- private Session produceSession(String host, short version, InetSocketAddress address)
- {
- try
- {
- Session session = serverSessions.get(host);
- if (session == null)
- {
- SPDYClient client = factory.newSPDYClient(version);
- session = client.connect(address, sessionListener);
- if (LOG.isDebugEnabled())
- LOG.debug("Proxy session connected to {}", address);
- Session existing = serverSessions.putIfAbsent(host, session);
- if (existing != null)
- {
- session.goAway(new GoAwayInfo(), Callback.Adapter.INSTANCE);
- session = existing;
- }
- }
- return session;
- }
- catch (Exception x)
- {
- LOG.debug(x);
- return null;
- }
- }
-
- private void convert(short fromVersion, short toVersion, Fields headers)
- {
- if (fromVersion != toVersion)
- {
- for (HTTPSPDYHeader httpHeader : HTTPSPDYHeader.values())
- {
- Fields.Field header = headers.remove(httpHeader.name(fromVersion));
- if (header != null)
- {
- String toName = httpHeader.name(toVersion);
- for (String value : header.getValues())
- headers.add(toName, value);
- }
- }
- }
- }
-
- private void rst(Stream stream)
- {
- RstInfo rstInfo = new RstInfo(stream.getId(), StreamStatus.REFUSED_STREAM);
- stream.getSession().rst(rstInfo, Callback.Adapter.INSTANCE);
- }
-
- private class ProxyPushStreamFrameListener implements StreamFrameListener
- {
- private PushStreamPromise pushStreamPromise;
-
- private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
- {
- this.pushStreamPromise = pushStreamPromise;
- }
-
- @Override
- public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
- PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
- this.pushStreamPromise.push(newPushStreamPromise);
- return new ProxyPushStreamFrameListener(newPushStreamPromise);
- }
-
- @Override
- public void onReply(Stream stream, ReplyInfo replyInfo)
- {
- // Push streams never send a reply
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void onData(Stream serverStream, final DataInfo serverDataInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
-
- ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- serverDataInfo.consume(delta);
- }
- };
-
- pushStreamPromise.data(clientDataInfo);
- }
-
- @Override
- public void onFailure(Stream stream, Throwable x)
- {
- LOG.debug(x);
- }
- }
-
- private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
- {
- private final Stream receiverStream;
-
- public ProxyStreamFrameListener(Stream receiverStream)
- {
- this.receiverStream = receiverStream;
- }
-
- @Override
- public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("S -> P {} on {}");
- PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
- PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
- receiverStream.push(newPushInfo, pushStreamPromise);
- return new ProxyPushStreamFrameListener(pushStreamPromise);
- }
-
- @Override
- public void onReply(final Stream stream, ReplyInfo replyInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("S -> P {} on {}", replyInfo, stream);
- final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
- replyInfo.isClose());
- reply(stream, clientReplyInfo);
- }
-
- private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
- {
- receiverStream.reply(clientReplyInfo, new Callback()
- {
- @Override
- public void succeeded()
- {
- if (LOG.isDebugEnabled())
- LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
- }
-
- @Override
- public void failed(Throwable x)
- {
- LOG.debug(x);
- rst(receiverStream);
- }
- });
- }
-
- @Override
- public void onHeaders(Stream stream, HeadersInfo headersInfo)
- {
- // TODO
- throw new UnsupportedOperationException("Not Yet Implemented");
- }
-
- @Override
- public void onData(final Stream stream, final DataInfo dataInfo)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("S -> P {} on {}", dataInfo, stream);
- data(stream, dataInfo);
- }
-
- private void data(final Stream stream, final DataInfo serverDataInfo)
- {
- final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
- {
- @Override
- public void consume(int delta)
- {
- super.consume(delta);
- serverDataInfo.consume(delta);
- }
- };
-
- receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
- {
- @Override
- public void succeeded()
- {
- if (LOG.isDebugEnabled())
- LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
- }
-
- @Override
- public void failed(Throwable x)
- {
- LOG.debug(x);
- rst(receiverStream);
- }
- });
- }
- }
-
- /**
- * <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
- * versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
- * buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
- * before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
- * server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
- * a SPDY v3 server (and hence with flow control).</p>
- */
- private class StreamPromise implements Promise<Stream>
- {
- private final Queue<DataInfoCallback> queue = new LinkedList<>();
- private final Stream senderStream;
- private final Info info;
- private Stream receiverStream;
-
- private StreamPromise(Stream senderStream, Info info)
- {
- this.senderStream = senderStream;
- this.info = info;
- }
-
- @Override
- public void succeeded(Stream stream)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
-
- stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
-
- DataInfoCallback dataInfoCallback;
- synchronized (queue)
- {
- this.receiverStream = stream;
- dataInfoCallback = queue.peek();
- if (dataInfoCallback != null)
- {
- if (dataInfoCallback.flushing)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
- dataInfoCallback = null;
- }
- else
- {
- dataInfoCallback.flushing = true;
- if (LOG.isDebugEnabled())
- LOG.debug("SYN completed, queue size {}", queue.size());
- }
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("SYN completed, queue empty");
- }
- }
- if (dataInfoCallback != null)
- flush(stream, dataInfoCallback);
- }
-
- @Override
- public void failed(Throwable x)
- {
- LOG.debug(x);
- rst(senderStream);
- }
-
- public void data(DataInfo dataInfo)
- {
- Stream receiverStream;
- DataInfoCallback dataInfoCallbackToFlush = null;
- DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
- synchronized (queue)
- {
- queue.offer(dataInfoCallBackToQueue);
- receiverStream = this.receiverStream;
- if (receiverStream != null)
- {
- dataInfoCallbackToFlush = queue.peek();
- if (dataInfoCallbackToFlush.flushing)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
- receiverStream = null;
- }
- else
- {
- dataInfoCallbackToFlush.flushing = true;
- if (LOG.isDebugEnabled())
- LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
- }
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
- }
- }
- if (receiverStream != null)
- flush(receiverStream, dataInfoCallbackToFlush);
- }
-
- private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
- receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
- }
-
- private class DataInfoCallback implements Callback
- {
- private final DataInfo dataInfo;
- private boolean flushing;
-
- private DataInfoCallback(DataInfo dataInfo)
- {
- this.dataInfo = dataInfo;
- }
-
- @Override
- public void succeeded()
- {
- Stream serverStream;
- DataInfoCallback dataInfoCallback;
- synchronized (queue)
- {
- serverStream = StreamPromise.this.receiverStream;
- assert serverStream != null;
- dataInfoCallback = queue.poll();
- assert dataInfoCallback == this;
- dataInfoCallback = queue.peek();
- if (dataInfoCallback != null)
- {
- assert !dataInfoCallback.flushing;
- dataInfoCallback.flushing = true;
- if (LOG.isDebugEnabled())
- LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("Completed {}, queue empty", dataInfo);
- }
- }
- if (dataInfoCallback != null)
- flush(serverStream, dataInfoCallback);
- }
-
- @Override
- public void failed(Throwable x)
- {
- LOG.debug(x);
- rst(senderStream);
- }
- }
-
- public Stream getSenderStream()
- {
- return senderStream;
- }
-
- public Info getInfo()
- {
- return info;
- }
-
- public Stream getReceiverStream()
- {
- synchronized (queue)
- {
- return receiverStream;
- }
- }
- }
-
- private class PushStreamPromise extends StreamPromise
- {
- private volatile PushStreamPromise pushStreamPromise;
-
- private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
- {
- super(senderStream, pushInfo);
- }
-
- @Override
- public void succeeded(Stream receiverStream)
- {
- super.succeeded(receiverStream);
-
- if (LOG.isDebugEnabled())
- LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
-
- PushStreamPromise promise = pushStreamPromise;
- if (promise != null)
- receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
- }
-
- public void push(PushStreamPromise pushStreamPromise)
- {
- Stream receiverStream = getReceiverStream();
-
- if (receiverStream != null)
- receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
- else
- this.pushStreamPromise = pushStreamPromise;
- }
- }
-
- private class ProxySessionFrameListener extends SessionFrameListener.Adapter
- {
- @Override
- public void onRst(Session serverSession, RstInfo serverRstInfo)
- {
- Stream serverStream = serverSession.getStream(serverRstInfo.getStreamId());
- if (serverStream != null)
- {
- Stream clientStream = (Stream)serverStream.getAttribute(CLIENT_STREAM_ATTRIBUTE);
- if (clientStream != null)
- {
- Session clientSession = clientStream.getSession();
- RstInfo clientRstInfo = new RstInfo(clientStream.getId(), serverRstInfo.getStreamStatus());
- clientSession.rst(clientRstInfo, Callback.Adapter.INSTANCE);
- }
- }
- }
-
- @Override
- public void onGoAway(Session serverSession, GoAwayResultInfo goAwayResultInfo)
- {
- serverSessions.values().remove(serverSession);
- }
- }
-
- private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
- {
- Fields headersToConvert = pushInfo.getHeaders();
- Fields headers = convertHeaders(from, to, headersToConvert);
- return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
- }
-
- private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
- {
- Fields headers = new Fields(headersToConvert, false);
- addResponseProxyHeaders(from, headers);
- customizeResponseHeaders(from, headers);
- convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
- return headers;
- }
-}

Back to the top