diff options
author | Eike Stepper | 2012-06-06 09:15:05 +0000 |
---|---|---|
committer | Eike Stepper | 2012-06-06 09:15:05 +0000 |
commit | beaf88a6d18849a3d00476ca046a111f636ace5a (patch) | |
tree | b4dbc0ddc54963632692f0d0db83877dabd73a57 /plugins/org.eclipse.net4j | |
parent | e5bc42789a5fa73d5f2c9a2dac4f0d0c177f9b63 (diff) | |
download | cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.tar.gz cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.tar.xz cdo-beaf88a6d18849a3d00476ca046a111f636ace5a.zip |
Update Javadocs
Diffstat (limited to 'plugins/org.eclipse.net4j')
35 files changed, 2992 insertions, 2911 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/doc-files/acceptors.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/doc-files/acceptors.png Binary files differnew file mode 100644 index 0000000000..7a16b6ef01 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/doc-files/acceptors.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/package-info.java index 4d3df6c7bd..a97cf9611f 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/acceptor/package-info.java @@ -1,15 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer concepts for dealing with acceptors.
- */
-package org.eclipse.net4j.acceptor;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer concepts for dealing with acceptors. + * <p> + * <img src="doc-files/acceptors.png" title="Diagram Acceptors" border="0"/> + */ +package org.eclipse.net4j.acceptor; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/doc-files/buffers.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/doc-files/buffers.png Binary files differnew file mode 100644 index 0000000000..b7f851ae36 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/doc-files/buffers.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/package-info.java index b3f9f777b4..a2566f6390 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/buffer/package-info.java @@ -1,15 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer concepts for dealing with buffers.
- */
-package org.eclipse.net4j.buffer;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer concepts for dealing with buffers. + * <p> + * <img src="doc-files/buffers.png" title="Diagram Buffers" border="0"/> + */ +package org.eclipse.net4j.buffer; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/doc-files/channels.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/doc-files/channels.png Binary files differnew file mode 100644 index 0000000000..d45c55116f --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/doc-files/channels.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/package-info.java index 70c33b6374..cc098193c6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/channel/package-info.java @@ -1,15 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer concepts for dealing with channels.
- */
-package org.eclipse.net4j.channel;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer concepts for dealing with channels. + * <p> + * <img src="doc-files/channels.png" title="Diagram Channels" border="0"/> + */ +package org.eclipse.net4j.channel; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/doc-files/connectors.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/doc-files/connectors.png Binary files differnew file mode 100644 index 0000000000..741ec4598e --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/doc-files/connectors.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/package-info.java index aa7dd0a97b..387496c248 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/connector/package-info.java @@ -1,15 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer concepts for dealing with connectors.
- */
-package org.eclipse.net4j.connector;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer concepts for dealing with connectors. + * <p> + * <img src="doc-files/connectors.png" title="Diagram Connectors" border="0"/> + */ +package org.eclipse.net4j.connector; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/doc-files/architecture.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/doc-files/architecture.png Binary files differnew file mode 100644 index 0000000000..93eb57f313 --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/doc-files/architecture.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/package-info.java index 13356f72d0..7e4a6921ea 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/package-info.java @@ -1,36 +1,27 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer.
- * The five main interfaces of the transport layer are:
- * <ul>
- * <li>{@link org.eclipse.net4j.buffer.IBuffer}</li>
- * <li>{@link org.eclipse.net4j.channel.IChannel}</li>
- * <li>{@link org.eclipse.net4j.acceptor.IAcceptor}</li>
- * <li>{@link org.eclipse.net4j.connector.IConnector}</li>
- * <li>{@link org.eclipse.net4j.protocol.IProtocol}</li>
- * </ul>
- * <p>
- *
- * <dt><b>Sequence Diagram: Communication Process</b></dt>
- * <dd> <img src="doc-files/CommunicationProcess.jpg" title="Communication Process" border="0"
- * usemap="#CommunicationProcess.jpg"/></dd>
- * <p>
- * <MAP NAME="CommunicationProcess.jpg">
- * <AREA SHAPE="RECT" COORDS="128,94,247,123" HREF="IConnector.html">
- * <AREA SHAPE="RECT" COORDS="648,95,767,123" HREF="IConnector.html">
- * <AREA SHAPE="RECT" COORDS="509,254,608,283" HREF="IChannel.html">
- * <AREA SHAPE="RECT" COORDS="287,355,387,383" HREF="IChannel.html">
- * <AREA SHAPE="RECT" COORDS="818,195,897,222" HREF="IProtocol.html">
- * </MAP>
- */
-package org.eclipse.net4j;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer. + * <p> + * <img src="doc-files/architecture.png" title="Diagram Architecture" border="0"/> + * <p> + * The five main interfaces of the transport layer are: + * <ul> + * <li>{@link org.eclipse.net4j.buffer.IBuffer}</li> + * <li>{@link org.eclipse.net4j.channel.IChannel}</li> + * <li>{@link org.eclipse.net4j.acceptor.IAcceptor}</li> + * <li>{@link org.eclipse.net4j.connector.IConnector}</li> + * <li>{@link org.eclipse.net4j.protocol.IProtocol}</li> + * </ul> + */ +package org.eclipse.net4j; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java index 882111c565..2e3bcc8d0b 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocol.java @@ -1,41 +1,43 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.protocol;
-
-import org.eclipse.net4j.ILocationAware;
-import org.eclipse.net4j.buffer.IBufferHandler;
-import org.eclipse.net4j.buffer.IBufferProvider;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.util.security.IUserAware;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * @author Eike Stepper
- */
-public interface IProtocol<INFRA_STRUCTURE> extends IUserAware, ILocationAware, IBufferHandler
-{
- public String getType();
-
- public IChannel getChannel();
-
- public void setChannel(IChannel channel);
-
- public INFRA_STRUCTURE getInfraStructure();
-
- public void setInfraStructure(INFRA_STRUCTURE infraStructure);
-
- public IBufferProvider getBufferProvider();
-
- public ExecutorService getExecutorService();
-
- public void setExecutorService(ExecutorService executorService);
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.protocol; + +import org.eclipse.net4j.ILocationAware; +import org.eclipse.net4j.buffer.IBufferHandler; +import org.eclipse.net4j.buffer.IBufferProvider; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.util.security.IUserAware; + +import java.util.concurrent.ExecutorService; + +/** + * A {@link #getType() typed} {@link IBufferHandler buffer handler} for a {@link #getChannel() channel}. + * + * @author Eike Stepper + */ +public interface IProtocol<INFRA_STRUCTURE> extends IUserAware, ILocationAware, IBufferHandler +{ + public String getType(); + + public IChannel getChannel(); + + public void setChannel(IChannel channel); + + public INFRA_STRUCTURE getInfraStructure(); + + public void setInfraStructure(INFRA_STRUCTURE infraStructure); + + public IBufferProvider getBufferProvider(); + + public ExecutorService getExecutorService(); + + public void setExecutorService(ExecutorService executorService); +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocolProvider.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocolProvider.java index ce7ecd9b0e..5aa3f5d298 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocolProvider.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/IProtocolProvider.java @@ -1,22 +1,24 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.protocol;
-
-/**
- * @author Eike Stepper
- */
-public interface IProtocolProvider
-{
- /**
- * @since 2.0
- */
- public IProtocol<?> getProtocol(String type);
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.protocol; + +/** + * Provides {@link IProtocol protocol} instances for given types. + * + * @author Eike Stepper + */ +public interface IProtocolProvider +{ + /** + * @since 2.0 + */ + public IProtocol<?> getProtocol(String type); +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/doc-files/protocols.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/doc-files/protocols.png Binary files differnew file mode 100644 index 0000000000..968099381c --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/doc-files/protocols.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/package-info.java index b68bc1d0e6..4e90149fee 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/protocol/package-info.java @@ -1,15 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * The Net4j transport layer concepts for dealing with protocols.
- */
-package org.eclipse.net4j.protocol;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * The Net4j transport layer concepts for dealing with protocols. + * <p> + * <img src="doc-files/protocols.png" title="Diagram Protocols" border="0"/> + */ +package org.eclipse.net4j.protocol; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java index 32737022b8..ccb5fc6fa4 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/ISignalProtocol.java @@ -1,43 +1,45 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.protocol.IProtocol;
-import org.eclipse.net4j.util.event.INotifier;
-import org.eclipse.net4j.util.io.IStreamWrapper;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public interface ISignalProtocol<INFRA_STRUCTURE> extends IProtocol<INFRA_STRUCTURE>, INotifier
-{
- public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
-
- public static final long DEFAULT_TIMEOUT = 10 * 1000L;
-
- public long getTimeout();
-
- public void setTimeout(long timeout);
-
- public IStreamWrapper getStreamWrapper();
-
- public void setStreamWrapper(IStreamWrapper streamWrapper);
-
- public void addStreamWrapper(IStreamWrapper streamWrapper);
-
- public IChannel open(IConnector connector);
-
- public void close();
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.protocol.IProtocol; +import org.eclipse.net4j.util.event.INotifier; +import org.eclipse.net4j.util.io.IStreamWrapper; + +/** + * A {@link IProtocol protocol} that consists of a number of stream-based {@link Signal signals}. + * + * @author Eike Stepper + * @since 2.0 + */ +public interface ISignalProtocol<INFRA_STRUCTURE> extends IProtocol<INFRA_STRUCTURE>, INotifier +{ + public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; + + public static final long DEFAULT_TIMEOUT = 10 * 1000L; + + public long getTimeout(); + + public void setTimeout(long timeout); + + public IStreamWrapper getStreamWrapper(); + + public void setStreamWrapper(IStreamWrapper streamWrapper); + + public void addStreamWrapper(IStreamWrapper streamWrapper); + + public IChannel open(IConnector connector); + + public void close(); +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java index 756526aad9..ebef17ad4a 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Indication.java @@ -1,59 +1,61 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-
-/**
- * @author Eike Stepper
- */
-public abstract class Indication extends SignalReactor
-{
- /**
- * @since 2.0
- */
- public Indication(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public Indication(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public Indication(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- @Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- doInput(in);
- }
-
- @Override
- void doExtendedInput(ExtendedDataInputStream in) throws Exception
- {
- indicating(in);
- }
-
- protected abstract void indicating(ExtendedDataInputStream in) throws Exception;
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; + +/** + * Represents the receiver side of a one-way {@link Signal signal}, i.e., one with no response. + * + * @author Eike Stepper + */ +public abstract class Indication extends SignalReactor +{ + /** + * @since 2.0 + */ + public Indication(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public Indication(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public Indication(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + doInput(in); + } + + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + indicating(in); + } + + protected abstract void indicating(ExtendedDataInputStream in) throws Exception; +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java index 6241a29522..4d25822c1e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithMonitoring.java @@ -1,172 +1,174 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.monitor.OMMonitor;
-import org.eclipse.net4j.util.om.monitor.TimeoutMonitor;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class IndicationWithMonitoring extends IndicationWithResponse
-{
- private ReportingMonitor monitor;
-
- /**
- * @since 2.0
- */
- public IndicationWithMonitoring(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public IndicationWithMonitoring(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public IndicationWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- @Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- try
- {
- super.execute(in, out);
- }
- finally
- {
- if (monitor != null)
- {
- monitor.done();
- monitor = null;
- }
- }
- }
-
- @Override
- protected final void indicating(ExtendedDataInputStream in) throws Exception
- {
- int monitorProgressSeconds = in.readInt();
- int monitorTimeoutSeconds = in.readInt();
-
- monitor = new ReportingMonitor(monitorProgressSeconds, monitorTimeoutSeconds);
- monitor.begin(OMMonitor.HUNDRED);
-
- indicating(in, monitor.fork(getIndicatingWorkPercent()));
- }
-
- @Override
- protected final void responding(ExtendedDataOutputStream out) throws Exception
- {
- responding(out, monitor.fork(OMMonitor.HUNDRED - getIndicatingWorkPercent()));
- }
-
- protected abstract void indicating(ExtendedDataInputStream in, OMMonitor monitor) throws Exception;
-
- protected abstract void responding(ExtendedDataOutputStream out, OMMonitor monitor) throws Exception;
-
- /**
- * @since 2.0
- */
- protected ExecutorService getMonitoringExecutorService()
- {
- return getProtocol().getExecutorService();
- }
-
- protected int getIndicatingWorkPercent()
- {
- return 99;
- }
-
- void setMonitorCanceled()
- {
- monitor.cancel();
- }
-
- /**
- * @author Eike Stepper
- */
- private final class ReportingMonitor extends TimeoutMonitor
- {
- private TimerTask sendProgressTask = new TimerTask()
- {
- @Override
- public void run()
- {
- try
- {
- sendProgress();
- }
- catch (Exception ex)
- {
- OM.LOG.error("ReportingMonitorTask failed", ex);
- }
- }
- };
-
- public ReportingMonitor(int monitorProgressSeconds, int monitorTimeoutSeconds)
- {
- super(1000L * monitorTimeoutSeconds);
- long period = 1000L * monitorProgressSeconds;
- scheduleAtFixedRate(sendProgressTask, period, period);
- }
-
- @Override
- public void cancel(RuntimeException cancelException)
- {
- sendProgressTask.cancel();
- super.cancel(cancelException);
- }
-
- @Override
- public void done()
- {
- sendProgressTask.cancel();
- super.done();
- }
-
- private void sendProgress()
- {
- try
- {
- new MonitorProgressRequest(getProtocol(), -getCorrelationID(), getTotalWork(), getWork()).sendAsync();
- }
- catch (Exception ex)
- {
- if (LifecycleUtil.isActive(getProtocol().getChannel()))
- {
- OM.LOG.error(ex);
- }
- }
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.monitor.OMMonitor; +import org.eclipse.net4j.util.om.monitor.TimeoutMonitor; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; + +/** + * Represents the receiver side of a two-way {@link IndicationWithResponse signal} with additional support for remote progress monitoring. + * + * @author Eike Stepper + * @since 2.0 + */ +public abstract class IndicationWithMonitoring extends IndicationWithResponse +{ + private ReportingMonitor monitor; + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public IndicationWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + try + { + super.execute(in, out); + } + finally + { + if (monitor != null) + { + monitor.done(); + monitor = null; + } + } + } + + @Override + protected final void indicating(ExtendedDataInputStream in) throws Exception + { + int monitorProgressSeconds = in.readInt(); + int monitorTimeoutSeconds = in.readInt(); + + monitor = new ReportingMonitor(monitorProgressSeconds, monitorTimeoutSeconds); + monitor.begin(OMMonitor.HUNDRED); + + indicating(in, monitor.fork(getIndicatingWorkPercent())); + } + + @Override + protected final void responding(ExtendedDataOutputStream out) throws Exception + { + responding(out, monitor.fork(OMMonitor.HUNDRED - getIndicatingWorkPercent())); + } + + protected abstract void indicating(ExtendedDataInputStream in, OMMonitor monitor) throws Exception; + + protected abstract void responding(ExtendedDataOutputStream out, OMMonitor monitor) throws Exception; + + /** + * @since 2.0 + */ + protected ExecutorService getMonitoringExecutorService() + { + return getProtocol().getExecutorService(); + } + + protected int getIndicatingWorkPercent() + { + return 99; + } + + void setMonitorCanceled() + { + monitor.cancel(); + } + + /** + * @author Eike Stepper + */ + private final class ReportingMonitor extends TimeoutMonitor + { + private TimerTask sendProgressTask = new TimerTask() + { + @Override + public void run() + { + try + { + sendProgress(); + } + catch (Exception ex) + { + OM.LOG.error("ReportingMonitorTask failed", ex); + } + } + }; + + public ReportingMonitor(int monitorProgressSeconds, int monitorTimeoutSeconds) + { + super(1000L * monitorTimeoutSeconds); + long period = 1000L * monitorProgressSeconds; + scheduleAtFixedRate(sendProgressTask, period, period); + } + + @Override + public void cancel(RuntimeException cancelException) + { + sendProgressTask.cancel(); + super.cancel(cancelException); + } + + @Override + public void done() + { + sendProgressTask.cancel(); + super.done(); + } + + private void sendProgress() + { + try + { + new MonitorProgressRequest(getProtocol(), -getCorrelationID(), getTotalWork(), getWork()).sendAsync(); + } + catch (Exception ex) + { + if (LifecycleUtil.isActive(getProtocol().getChannel())) + { + OM.LOG.error(ex); + } + } + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java index 209f2f1bd3..b5b7412793 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/IndicationWithResponse.java @@ -1,106 +1,108 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.StringUtil;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-
-/**
- * @author Eike Stepper
- */
-public abstract class IndicationWithResponse extends SignalReactor
-{
- /**
- * @since 2.0
- */
- public IndicationWithResponse(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public IndicationWithResponse(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public IndicationWithResponse(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- /**
- * @since 2.0
- */
- protected String getExceptionMessage(Throwable t)
- {
- return StringUtil.formatException(t);
- }
-
- @Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- boolean responding = false;
-
- try
- {
- doInput(in);
- responding = true;
- doOutput(out);
- }
- catch (Error ex)
- {
- sendExceptionSignal(ex, responding);
- throw ex;
- }
- catch (Exception ex)
- {
- sendExceptionSignal(ex, responding);
- throw ex;
- }
- }
-
- protected abstract void indicating(ExtendedDataInputStream in) throws Exception;
-
- /**
- * <b>Important Note:</b> The response must not be empty, i.e. the stream must be used at least to write a
- * <code>boolean</code>. Otherwise synchronization problems will result!
- */
- protected abstract void responding(ExtendedDataOutputStream out) throws Exception;
-
- @Override
- void doExtendedInput(ExtendedDataInputStream in) throws Exception
- {
- indicating(in);
- }
-
- @Override
- void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
- {
- responding(out);
- }
-
- void sendExceptionSignal(Throwable t, boolean responding) throws Exception
- {
- SignalProtocol<?> protocol = getProtocol();
- int correlationID = -getCorrelationID();
- String message = getExceptionMessage(t);
- new RemoteExceptionRequest(protocol, correlationID, responding, message, t).sendAsync();
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.StringUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; + +/** + * Represents the receiver side of a two-way {@link SignalReactor signal}, i.e., one with a response. + * + * @author Eike Stepper + */ +public abstract class IndicationWithResponse extends SignalReactor +{ + /** + * @since 2.0 + */ + public IndicationWithResponse(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public IndicationWithResponse(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public IndicationWithResponse(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + /** + * @since 2.0 + */ + protected String getExceptionMessage(Throwable t) + { + return StringUtil.formatException(t); + } + + @Override + protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + boolean responding = false; + + try + { + doInput(in); + responding = true; + doOutput(out); + } + catch (Error ex) + { + sendExceptionSignal(ex, responding); + throw ex; + } + catch (Exception ex) + { + sendExceptionSignal(ex, responding); + throw ex; + } + } + + protected abstract void indicating(ExtendedDataInputStream in) throws Exception; + + /** + * <b>Important Note:</b> The response must not be empty, i.e. the stream must be used at least to write a + * <code>boolean</code>. Otherwise synchronization problems will result! + */ + protected abstract void responding(ExtendedDataOutputStream out) throws Exception; + + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + indicating(in); + } + + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + responding(out); + } + + void sendExceptionSignal(Throwable t, boolean responding) throws Exception + { + SignalProtocol<?> protocol = getProtocol(); + int correlationID = -getCorrelationID(); + String message = getExceptionMessage(t); + new RemoteExceptionRequest(protocol, correlationID, responding, message, t).sendAsync(); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java index 6c805c51bf..60ba0fa671 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RemoteException.java @@ -1,74 +1,76 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public class RemoteException extends RuntimeException
-{
- private static final long serialVersionUID = 1L;
-
- private boolean whileResponding;
-
- private transient RequestWithConfirmation<?> localRequest;
-
- private StackTraceElement[] localStackTrace;
-
- /**
- * @since 4.0
- */
- public RemoteException(Throwable remoteCause, RequestWithConfirmation<?> localRequest, boolean whileResponding)
- {
- super(remoteCause);
- this.localRequest = localRequest;
- this.whileResponding = whileResponding;
- }
-
- public RemoteException(String message, boolean whileResponding)
- {
- super(message);
- this.whileResponding = whileResponding;
- }
-
- public boolean whileResponding()
- {
- return whileResponding;
- }
-
- /**
- * @since 4.0
- */
- public RequestWithConfirmation<?> getLocalRequest()
- {
- return localRequest;
- }
-
- /**
- * @since 4.0
- */
- public void setLocalStacktrace(StackTraceElement[] stackTrace)
- {
- localStackTrace = stackTrace;
- }
-
- /**
- * Returns the local stack as it stood at the time that the <i>remote</i> exception was detected <i>locally</i>. Note
- * that no local problem occurred at the point in the code identified by this stacktrace.
- *
- * @since 4.0
- */
- public StackTraceElement[] getLocalStackTrace()
- {
- return localStackTrace;
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +/** + * An exception that wraps an exception that has been thrown during the execution of a remote {@link SignalReactor signal}. + * + * @author Eike Stepper + * @since 2.0 + */ +public class RemoteException extends RuntimeException +{ + private static final long serialVersionUID = 1L; + + private boolean whileResponding; + + private transient RequestWithConfirmation<?> localRequest; + + private StackTraceElement[] localStackTrace; + + /** + * @since 4.0 + */ + public RemoteException(Throwable remoteCause, RequestWithConfirmation<?> localRequest, boolean whileResponding) + { + super(remoteCause); + this.localRequest = localRequest; + this.whileResponding = whileResponding; + } + + public RemoteException(String message, boolean whileResponding) + { + super(message); + this.whileResponding = whileResponding; + } + + public boolean whileResponding() + { + return whileResponding; + } + + /** + * @since 4.0 + */ + public RequestWithConfirmation<?> getLocalRequest() + { + return localRequest; + } + + /** + * @since 4.0 + */ + public void setLocalStacktrace(StackTraceElement[] stackTrace) + { + localStackTrace = stackTrace; + } + + /** + * Returns the local stack as it stood at the time that the <i>remote</i> exception was detected <i>locally</i>. Note + * that no local problem occurred at the point in the code identified by this stacktrace. + * + * @since 4.0 + */ + public StackTraceElement[] getLocalStackTrace() + { + return localStackTrace; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java index d933142d48..430347652e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java @@ -1,67 +1,69 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-
-/**
- * @author Eike Stepper
- */
-public abstract class Request extends SignalActor
-{
- /**
- * @since 2.0
- */
- public Request(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public Request(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public Request(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- /**
- * @since 2.0
- */
- public void sendAsync() throws Exception
- {
- getProtocol().startSignal(this, getProtocol().getTimeout());
- }
-
- @Override
- void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- doOutput(out);
- }
-
- protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
-
- @Override
- void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
- {
- requesting(out);
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; + +/** + * Represents the sender side of a one-way {@link Signal signal}, i.e., one with no response. + * + * @author Eike Stepper + */ +public abstract class Request extends SignalActor +{ + /** + * @since 2.0 + */ + public Request(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public Request(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public Request(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + /** + * @since 2.0 + */ + public void sendAsync() throws Exception + { + getProtocol().startSignal(this, getProtocol().getTimeout()); + } + + @Override + void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception + { + doOutput(out); + } + + protected abstract void requesting(ExtendedDataOutputStream out) throws Exception; + + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + requesting(out); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java index 062caed720..c32c50a81e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java @@ -1,141 +1,143 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-/**
- * @author Eike Stepper
- */
-public abstract class RequestWithConfirmation<RESULT> extends SignalActor
-{
- private RESULT result;
-
- /**
- * @since 2.0
- */
- public RequestWithConfirmation(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public RequestWithConfirmation(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public RequestWithConfirmation(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- /**
- * @since 2.0
- */
- public Future<RESULT> sendAsync()
- {
- ExecutorService executorService = getAsyncExecutorService();
- return executorService.submit(new Callable<RESULT>()
- {
- public RESULT call() throws Exception
- {
- return doSend(getProtocol().getTimeout());
- }
- });
- }
-
- /**
- * @since 2.0
- */
- public RESULT send() throws Exception, RemoteException
- {
- return doSend(getProtocol().getTimeout());
- }
-
- /**
- * @since 2.0
- */
- public RESULT send(long timeout) throws Exception, RemoteException
- {
- return doSend(timeout);
- }
-
- RESULT doSend(long timeout) throws Exception
- {
- result = null;
- getProtocol().startSignal(this, timeout);
- return result;
- }
-
- /**
- * @since 2.0
- */
- protected ExecutorService getAsyncExecutorService()
- {
- return getProtocol().getExecutorService();
- }
-
- @Override
- void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- doOutput(out);
- doInput(in);
- }
-
- protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
-
- /**
- * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a
- * <code>boolean</code>. Otherwise synchronization problems will result!
- */
- protected abstract RESULT confirming(ExtendedDataInputStream in) throws Exception;
-
- @Override
- void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
- {
- requesting(out);
- }
-
- @Override
- void doExtendedInput(ExtendedDataInputStream in) throws Exception
- {
- result = confirming(in);
- }
-
- void setRemoteException(Throwable t, boolean responding)
- {
- RemoteException remoteException = getRemoteException(t, responding);
- getBufferInputStream().setException(remoteException);
- }
-
- private RemoteException getRemoteException(Throwable t, boolean responding)
- {
- if (t instanceof RemoteException)
- {
- return (RemoteException)t;
- }
-
- return new RemoteException(t, this, responding);
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * Represents the sender side of a two-way {@link SignalActor signal}, i.e., one with a response. + * + * @author Eike Stepper + */ +public abstract class RequestWithConfirmation<RESULT> extends SignalActor +{ + private RESULT result; + + /** + * @since 2.0 + */ + public RequestWithConfirmation(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public RequestWithConfirmation(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public RequestWithConfirmation(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + /** + * @since 2.0 + */ + public Future<RESULT> sendAsync() + { + ExecutorService executorService = getAsyncExecutorService(); + return executorService.submit(new Callable<RESULT>() + { + public RESULT call() throws Exception + { + return doSend(getProtocol().getTimeout()); + } + }); + } + + /** + * @since 2.0 + */ + public RESULT send() throws Exception, RemoteException + { + return doSend(getProtocol().getTimeout()); + } + + /** + * @since 2.0 + */ + public RESULT send(long timeout) throws Exception, RemoteException + { + return doSend(timeout); + } + + RESULT doSend(long timeout) throws Exception + { + result = null; + getProtocol().startSignal(this, timeout); + return result; + } + + /** + * @since 2.0 + */ + protected ExecutorService getAsyncExecutorService() + { + return getProtocol().getExecutorService(); + } + + @Override + void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception + { + doOutput(out); + doInput(in); + } + + protected abstract void requesting(ExtendedDataOutputStream out) throws Exception; + + /** + * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a + * <code>boolean</code>. Otherwise synchronization problems will result! + */ + protected abstract RESULT confirming(ExtendedDataInputStream in) throws Exception; + + @Override + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + requesting(out); + } + + @Override + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + result = confirming(in); + } + + void setRemoteException(Throwable t, boolean responding) + { + RemoteException remoteException = getRemoteException(t, responding); + getBufferInputStream().setException(remoteException); + } + + private RemoteException getRemoteException(Throwable t, boolean responding) + { + if (t instanceof RemoteException) + { + return (RemoteException)t; + } + + return new RemoteException(t, this, responding); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java index ebcdd509e0..f396e91b39 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java @@ -1,302 +1,304 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ImplementationError;
-import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.om.monitor.Monitor;
-import org.eclipse.net4j.util.om.monitor.OMMonitor;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT>
-{
- /**
- * @since 2.0
- */
- public static final long DEFAULT_CANCELATION_POLL_INTERVAL = 100;
-
- /**
- * @since 2.0
- */
- public static final int DEFAULT_MONITOR_PROGRESS_SECONDS = 1;
-
- /**
- * @since 2.0
- */
- public static final int DEFAULT_MONITOR_TIMEOUT_SECONDS = 10;
-
- private OMMonitor mainMonitor;
-
- private OMMonitor remoteMonitor;
-
- private Object monitorLock = new Object();
-
- /**
- * @since 2.0
- */
- public RequestWithMonitoring(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public RequestWithMonitoring(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public RequestWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- @Override
- public Future<RESULT> sendAsync()
- {
- initMainMonitor(null);
- return super.sendAsync();
- }
-
- public Future<RESULT> sendAsync(OMMonitor monitor)
- {
- initMainMonitor(monitor);
- return super.sendAsync();
- }
-
- @Override
- public RESULT send() throws Exception, RemoteException
- {
- initMainMonitor(null);
- return super.send();
- }
-
- @Override
- public RESULT send(long timeout) throws Exception, RemoteException
- {
- initMainMonitor(null);
- return super.send(timeout);
- }
-
- public RESULT send(OMMonitor monitor) throws Exception, RemoteException
- {
- initMainMonitor(monitor);
- return super.send();
- }
-
- public RESULT send(long timeout, OMMonitor monitor) throws Exception, RemoteException
- {
- initMainMonitor(monitor);
- return super.send(timeout);
- }
-
- @Override
- protected final void requesting(ExtendedDataOutputStream out) throws Exception
- {
- double remoteWork = OMMonitor.HUNDRED - getRequestingWorkPercent() - getConfirmingWorkPercent();
- if (remoteWork < OMMonitor.ZERO)
- {
- throw new ImplementationError("Remote work must not be negative: " + remoteWork); //$NON-NLS-1$
- }
-
- mainMonitor.begin(OMMonitor.HUNDRED);
- OMMonitor subMonitor = mainMonitor.fork(remoteWork);
- synchronized (monitorLock)
- {
- remoteMonitor = subMonitor;
- }
-
- ExecutorService executorService = getCancelationExecutorService();
- if (executorService != null)
- {
- executorService.execute(new Runnable()
- {
- public void run()
- {
- while (mainMonitor != null)
- {
- ConcurrencyUtil.sleep(getCancelationPollInterval());
- if (mainMonitor != null && mainMonitor.isCanceled())
- {
- try
- {
- new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync();
- }
- catch (Exception ex)
- {
- OM.LOG.error(ex);
- }
-
- return;
- }
- }
- }
- });
- }
-
- out.writeInt(getMonitorProgressSeconds());
- out.writeInt(getMonitorTimeoutSeconds());
- requesting(out, mainMonitor.fork(getRequestingWorkPercent()));
- }
-
- @Override
- protected final RESULT confirming(ExtendedDataInputStream in) throws Exception
- {
- return confirming(in, mainMonitor.fork(getConfirmingWorkPercent()));
- }
-
- protected abstract void requesting(ExtendedDataOutputStream out, OMMonitor monitor) throws Exception;
-
- /**
- * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a
- * <code>boolean</code>. Otherwise synchronization problems will result!
- */
- protected abstract RESULT confirming(ExtendedDataInputStream in, OMMonitor monitor) throws Exception;
-
- /**
- * @since 2.0
- */
- protected ExecutorService getCancelationExecutorService()
- {
- return getProtocol().getExecutorService();
- }
-
- /**
- * @since 2.0
- */
- protected long getCancelationPollInterval()
- {
- return DEFAULT_CANCELATION_POLL_INTERVAL;
- }
-
- /**
- * @since 2.0
- */
- protected int getMonitorProgressSeconds()
- {
- return DEFAULT_MONITOR_PROGRESS_SECONDS;
- }
-
- /**
- * @since 2.0
- */
- protected int getMonitorTimeoutSeconds()
- {
- return DEFAULT_MONITOR_TIMEOUT_SECONDS;
- }
-
- /**
- * @since 2.0
- */
- protected int getRequestingWorkPercent()
- {
- return 2;
- }
-
- /**
- * @since 2.0
- */
- protected int getConfirmingWorkPercent()
- {
- return 1;
- }
-
- @Override
- void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- try
- {
- super.doExecute(in, out);
- }
- finally
- {
- synchronized (monitorLock)
- {
- try
- {
- if (remoteMonitor != null)
- {
- remoteMonitor.done();
- }
- }
- catch (Exception ex)
- {
- OM.LOG.error(ex);
- }
- finally
- {
- remoteMonitor = null;
- }
- }
-
- try
- {
- if (mainMonitor != null)
- {
- mainMonitor.done();
- }
- }
- finally
- {
- mainMonitor = null;
- }
- }
- }
-
- void setMonitorProgress(double totalWork, double work)
- {
- getBufferInputStream().restartTimeout();
- synchronized (monitorLock)
- {
- if (remoteMonitor != null)
- {
- if (!remoteMonitor.hasBegun())
- {
- remoteMonitor.begin(totalWork);
- remoteMonitor.worked(work);
- }
- else
- {
- double oldRatio = remoteMonitor.getWork() / remoteMonitor.getTotalWork();
- double newRatio = work / totalWork;
-
- double newWork = newRatio - oldRatio;
- newWork *= remoteMonitor.getTotalWork();
- if (newWork >= OMMonitor.ZERO)
- {
- remoteMonitor.worked(newWork);
- }
- }
- }
- }
- }
-
- private void initMainMonitor(OMMonitor monitor)
- {
- mainMonitor = monitor == null ? new Monitor() : monitor;
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.ImplementationError; +import org.eclipse.net4j.util.concurrent.ConcurrencyUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.om.monitor.Monitor; +import org.eclipse.net4j.util.om.monitor.OMMonitor; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * Represents the sender side of a two-way {@link IndicationWithResponse signal} with additional support for remote progress monitoring. + * + * @author Eike Stepper + * @since 2.0 + */ +public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmation<RESULT> +{ + /** + * @since 2.0 + */ + public static final long DEFAULT_CANCELATION_POLL_INTERVAL = 100; + + /** + * @since 2.0 + */ + public static final int DEFAULT_MONITOR_PROGRESS_SECONDS = 1; + + /** + * @since 2.0 + */ + public static final int DEFAULT_MONITOR_TIMEOUT_SECONDS = 10; + + private OMMonitor mainMonitor; + + private OMMonitor remoteMonitor; + + private Object monitorLock = new Object(); + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public RequestWithMonitoring(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + public Future<RESULT> sendAsync() + { + initMainMonitor(null); + return super.sendAsync(); + } + + public Future<RESULT> sendAsync(OMMonitor monitor) + { + initMainMonitor(monitor); + return super.sendAsync(); + } + + @Override + public RESULT send() throws Exception, RemoteException + { + initMainMonitor(null); + return super.send(); + } + + @Override + public RESULT send(long timeout) throws Exception, RemoteException + { + initMainMonitor(null); + return super.send(timeout); + } + + public RESULT send(OMMonitor monitor) throws Exception, RemoteException + { + initMainMonitor(monitor); + return super.send(); + } + + public RESULT send(long timeout, OMMonitor monitor) throws Exception, RemoteException + { + initMainMonitor(monitor); + return super.send(timeout); + } + + @Override + protected final void requesting(ExtendedDataOutputStream out) throws Exception + { + double remoteWork = OMMonitor.HUNDRED - getRequestingWorkPercent() - getConfirmingWorkPercent(); + if (remoteWork < OMMonitor.ZERO) + { + throw new ImplementationError("Remote work must not be negative: " + remoteWork); //$NON-NLS-1$ + } + + mainMonitor.begin(OMMonitor.HUNDRED); + OMMonitor subMonitor = mainMonitor.fork(remoteWork); + synchronized (monitorLock) + { + remoteMonitor = subMonitor; + } + + ExecutorService executorService = getCancelationExecutorService(); + if (executorService != null) + { + executorService.execute(new Runnable() + { + public void run() + { + while (mainMonitor != null) + { + ConcurrencyUtil.sleep(getCancelationPollInterval()); + if (mainMonitor != null && mainMonitor.isCanceled()) + { + try + { + new MonitorCanceledRequest(getProtocol(), getCorrelationID()).sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + + return; + } + } + } + }); + } + + out.writeInt(getMonitorProgressSeconds()); + out.writeInt(getMonitorTimeoutSeconds()); + requesting(out, mainMonitor.fork(getRequestingWorkPercent())); + } + + @Override + protected final RESULT confirming(ExtendedDataInputStream in) throws Exception + { + return confirming(in, mainMonitor.fork(getConfirmingWorkPercent())); + } + + protected abstract void requesting(ExtendedDataOutputStream out, OMMonitor monitor) throws Exception; + + /** + * <b>Important Note:</b> The confirmation must not be empty, i.e. the stream must be used at least to read a + * <code>boolean</code>. Otherwise synchronization problems will result! + */ + protected abstract RESULT confirming(ExtendedDataInputStream in, OMMonitor monitor) throws Exception; + + /** + * @since 2.0 + */ + protected ExecutorService getCancelationExecutorService() + { + return getProtocol().getExecutorService(); + } + + /** + * @since 2.0 + */ + protected long getCancelationPollInterval() + { + return DEFAULT_CANCELATION_POLL_INTERVAL; + } + + /** + * @since 2.0 + */ + protected int getMonitorProgressSeconds() + { + return DEFAULT_MONITOR_PROGRESS_SECONDS; + } + + /** + * @since 2.0 + */ + protected int getMonitorTimeoutSeconds() + { + return DEFAULT_MONITOR_TIMEOUT_SECONDS; + } + + /** + * @since 2.0 + */ + protected int getRequestingWorkPercent() + { + return 2; + } + + /** + * @since 2.0 + */ + protected int getConfirmingWorkPercent() + { + return 1; + } + + @Override + void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception + { + try + { + super.doExecute(in, out); + } + finally + { + synchronized (monitorLock) + { + try + { + if (remoteMonitor != null) + { + remoteMonitor.done(); + } + } + catch (Exception ex) + { + OM.LOG.error(ex); + } + finally + { + remoteMonitor = null; + } + } + + try + { + if (mainMonitor != null) + { + mainMonitor.done(); + } + } + finally + { + mainMonitor = null; + } + } + } + + void setMonitorProgress(double totalWork, double work) + { + getBufferInputStream().restartTimeout(); + synchronized (monitorLock) + { + if (remoteMonitor != null) + { + if (!remoteMonitor.hasBegun()) + { + remoteMonitor.begin(totalWork); + remoteMonitor.worked(work); + } + else + { + double oldRatio = remoteMonitor.getWork() / remoteMonitor.getTotalWork(); + double newRatio = work / totalWork; + + double newWork = newRatio - oldRatio; + newWork *= remoteMonitor.getTotalWork(); + if (newWork >= OMMonitor.ZERO) + { + remoteMonitor.worked(newWork); + } + } + } + } + } + + private void initMainMonitor(OMMonitor monitor) + { + mainMonitor = monitor == null ? new Monitor() : monitor; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java index 03ebd2b755..f20dabd880 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java @@ -1,353 +1,355 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.util.ReflectUtil;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.io.IOTimeoutException;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.MessageFormat;
-
-/**
- * @author Eike Stepper
- */
-public abstract class Signal implements Runnable
-{
- /**
- * @since 2.0
- */
- public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT;
-
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Signal.class);
-
- private SignalProtocol<?> protocol;
-
- private short id;
-
- private String name;
-
- private int correlationID;
-
- private BufferInputStream bufferInputStream;
-
- private BufferOutputStream bufferOutputStream;
-
- private Object currentStream;
-
- /**
- * Both implementation classes of a logical signal must have the same signalID. The signalID of a user signals must be
- * equal to or greater than zero.
- *
- * @since 2.0
- */
- public Signal(SignalProtocol<?> protocol, short id, String name)
- {
- this.protocol = protocol;
- this.id = id;
- this.name = name;
- }
-
- /**
- * @since 2.0
- * @see #Signal(SignalProtocol, short, String)
- */
- public Signal(SignalProtocol<?> protocol, short id)
- {
- this(protocol, id, null);
- }
-
- /**
- * @since 2.0
- * @see #Signal(SignalProtocol, short, String)
- */
- public Signal(SignalProtocol<?> protocol, Enum<?> literal)
- {
- this(protocol, (short)literal.ordinal(), literal.name());
- }
-
- public SignalProtocol<?> getProtocol()
- {
- LifecycleUtil.checkActive(protocol);
- return protocol;
- }
-
- /**
- * Returns the short integer ID of this signal that is unique among all signals of the associated
- * {@link #getProtocol() protocol}.
- *
- * @since 2.0
- */
- public final short getID()
- {
- return id;
- }
-
- /**
- * @since 2.0
- */
- public String getName()
- {
- if (name == null)
- {
- // Needs no synchronization because any thread would set the same value.
- name = ReflectUtil.getSimpleClassName(this);
- }
-
- return name;
- }
-
- /**
- * @since 2.0
- */
- public final int getCorrelationID()
- {
- return correlationID;
- }
-
- /**
- * @since 2.0
- */
- @Override
- public String toString()
- {
- return MessageFormat.format("Signal[protocol={0}, id={1}, name={2}, correlation={3}]", getProtocol().getType(), //$NON-NLS-1$
- getID(), getName(), getCorrelationID());
- }
-
- public final void run()
- {
- String threadName = null;
-
- try
- {
- if (OM.SET_SIGNAL_THREAD_NAME)
- {
- threadName = getClass().getSimpleName();
- Thread.currentThread().setName(threadName);
- }
-
- runSync();
- }
- catch (Exception ex)
- {
- if (getProtocol().isActive())
- {
- OM.LOG.error(ex);
- }
- else
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Exception while protocol is inactive", ex); //$NON-NLS-1$
- }
- }
- }
- finally
- {
- if (threadName != null)
- {
- Thread.currentThread().setName(threadName + "(FINISHED)"); //$NON-NLS-1$
- }
- }
- }
-
- protected final BufferInputStream getBufferInputStream()
- {
- return bufferInputStream;
- }
-
- protected final BufferOutputStream getBufferOutputStream()
- {
- return bufferOutputStream;
- }
-
- /**
- * @since 2.0
- */
- protected final void flush() throws IOException
- {
- if (currentStream instanceof OutputStream)
- {
- ((OutputStream)currentStream).flush();
- }
- }
-
- /**
- * @since 2.0
- */
- protected InputStream getCurrentInputStream()
- {
- if (currentStream instanceof InputStream)
- {
- return (InputStream)currentStream;
- }
-
- return null;
- }
-
- /**
- * @since 2.0
- */
- protected OutputStream getCurrentOutputStream()
- {
- if (currentStream instanceof OutputStream)
- {
- return (OutputStream)currentStream;
- }
-
- return null;
- }
-
- protected InputStream wrapInputStream(InputStream in) throws IOException
- {
- currentStream = getProtocol().wrapInputStream(in);
- return (InputStream)currentStream;
- }
-
- protected OutputStream wrapOutputStream(OutputStream out) throws IOException
- {
- currentStream = getProtocol().wrapOutputStream(out);
- return (OutputStream)currentStream;
- }
-
- protected void finishInputStream(InputStream in) throws IOException
- {
- currentStream = null;
- getProtocol().finishInputStream(in);
- }
-
- protected void finishOutputStream(OutputStream out) throws IOException
- {
- currentStream = null;
- getProtocol().finishOutputStream(out);
- }
-
- protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception;
-
- void runSync() throws Exception
- {
- Exception exception = null;
-
- try
- {
- execute(bufferInputStream, bufferOutputStream);
- }
- catch (IOTimeoutException ex) // Thrown from BufferInputStream
- {
- exception = ex.createTimeoutException();
- throw exception;
- }
- catch (Exception ex)
- {
- exception = ex;
- throw exception;
- }
- finally
- {
- getProtocol().stopSignal(this, exception);
- }
- }
-
- void setCorrelationID(int correlationID)
- {
- this.correlationID = correlationID;
- }
-
- void setBufferInputStream(BufferInputStream inputStream)
- {
- bufferInputStream = inputStream;
- }
-
- void setBufferOutputStream(BufferOutputStream outputStream)
- {
- bufferOutputStream = outputStream;
- }
-
- void doOutput(BufferOutputStream out) throws Exception
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("================ {0}: {1}", getOutputMeaning(), this); //$NON-NLS-1$
- }
-
- OutputStream wrappedOutputStream = wrapOutputStream(out);
- ExtendedDataOutputStream extended = ExtendedDataOutputStream.wrap(wrappedOutputStream);
-
- try
- {
- doExtendedOutput(extended);
- }
- catch (Error ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw ex;
- }
- finally
- {
- finishOutputStream(wrappedOutputStream);
- }
-
- out.flushWithEOS();
- }
-
- void doInput(BufferInputStream in) throws Exception
- {
- if (TRACER.isEnabled())
- {
- TRACER.format("================ {0}: {1}", getInputMeaning(), this); //$NON-NLS-1$
- }
-
- InputStream wrappedInputStream = wrapInputStream(in);
- ExtendedDataInputStream extended = ExtendedDataInputStream.wrap(wrappedInputStream);
-
- try
- {
- doExtendedInput(extended);
- }
- catch (Error ex)
- {
- throw ex;
- }
- catch (Exception ex)
- {
- throw ex;
- }
- finally
- {
- finishInputStream(wrappedInputStream);
- }
- }
-
- void doExtendedOutput(ExtendedDataOutputStream out) throws Exception
- {
- }
-
- void doExtendedInput(ExtendedDataInputStream in) throws Exception
- {
- }
-
- abstract String getOutputMeaning();
-
- abstract String getInputMeaning();
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; +import org.eclipse.net4j.util.ReflectUtil; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.io.IOTimeoutException; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.MessageFormat; + +/** + * Represents a single communications use-case in the scope of a {@link ISignalProtocol signal protocol}. + * + * @author Eike Stepper + */ +public abstract class Signal implements Runnable +{ + /** + * @since 2.0 + */ + public static final long NO_TIMEOUT = BufferInputStream.NO_TIMEOUT; + + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, Signal.class); + + private SignalProtocol<?> protocol; + + private short id; + + private String name; + + private int correlationID; + + private BufferInputStream bufferInputStream; + + private BufferOutputStream bufferOutputStream; + + private Object currentStream; + + /** + * Both implementation classes of a logical signal must have the same signalID. The signalID of a user signals must be + * equal to or greater than zero. + * + * @since 2.0 + */ + public Signal(SignalProtocol<?> protocol, short id, String name) + { + this.protocol = protocol; + this.id = id; + this.name = name; + } + + /** + * @since 2.0 + * @see #Signal(SignalProtocol, short, String) + */ + public Signal(SignalProtocol<?> protocol, short id) + { + this(protocol, id, null); + } + + /** + * @since 2.0 + * @see #Signal(SignalProtocol, short, String) + */ + public Signal(SignalProtocol<?> protocol, Enum<?> literal) + { + this(protocol, (short)literal.ordinal(), literal.name()); + } + + public SignalProtocol<?> getProtocol() + { + LifecycleUtil.checkActive(protocol); + return protocol; + } + + /** + * Returns the short integer ID of this signal that is unique among all signals of the associated + * {@link #getProtocol() protocol}. + * + * @since 2.0 + */ + public final short getID() + { + return id; + } + + /** + * @since 2.0 + */ + public String getName() + { + if (name == null) + { + // Needs no synchronization because any thread would set the same value. + name = ReflectUtil.getSimpleClassName(this); + } + + return name; + } + + /** + * @since 2.0 + */ + public final int getCorrelationID() + { + return correlationID; + } + + /** + * @since 2.0 + */ + @Override + public String toString() + { + return MessageFormat.format("Signal[protocol={0}, id={1}, name={2}, correlation={3}]", getProtocol().getType(), //$NON-NLS-1$ + getID(), getName(), getCorrelationID()); + } + + public final void run() + { + String threadName = null; + + try + { + if (OM.SET_SIGNAL_THREAD_NAME) + { + threadName = getClass().getSimpleName(); + Thread.currentThread().setName(threadName); + } + + runSync(); + } + catch (Exception ex) + { + if (getProtocol().isActive()) + { + OM.LOG.error(ex); + } + else + { + if (TRACER.isEnabled()) + { + TRACER.trace("Exception while protocol is inactive", ex); //$NON-NLS-1$ + } + } + } + finally + { + if (threadName != null) + { + Thread.currentThread().setName(threadName + "(FINISHED)"); //$NON-NLS-1$ + } + } + } + + protected final BufferInputStream getBufferInputStream() + { + return bufferInputStream; + } + + protected final BufferOutputStream getBufferOutputStream() + { + return bufferOutputStream; + } + + /** + * @since 2.0 + */ + protected final void flush() throws IOException + { + if (currentStream instanceof OutputStream) + { + ((OutputStream)currentStream).flush(); + } + } + + /** + * @since 2.0 + */ + protected InputStream getCurrentInputStream() + { + if (currentStream instanceof InputStream) + { + return (InputStream)currentStream; + } + + return null; + } + + /** + * @since 2.0 + */ + protected OutputStream getCurrentOutputStream() + { + if (currentStream instanceof OutputStream) + { + return (OutputStream)currentStream; + } + + return null; + } + + protected InputStream wrapInputStream(InputStream in) throws IOException + { + currentStream = getProtocol().wrapInputStream(in); + return (InputStream)currentStream; + } + + protected OutputStream wrapOutputStream(OutputStream out) throws IOException + { + currentStream = getProtocol().wrapOutputStream(out); + return (OutputStream)currentStream; + } + + protected void finishInputStream(InputStream in) throws IOException + { + currentStream = null; + getProtocol().finishInputStream(in); + } + + protected void finishOutputStream(OutputStream out) throws IOException + { + currentStream = null; + getProtocol().finishOutputStream(out); + } + + protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception; + + void runSync() throws Exception + { + Exception exception = null; + + try + { + execute(bufferInputStream, bufferOutputStream); + } + catch (IOTimeoutException ex) // Thrown from BufferInputStream + { + exception = ex.createTimeoutException(); + throw exception; + } + catch (Exception ex) + { + exception = ex; + throw exception; + } + finally + { + getProtocol().stopSignal(this, exception); + } + } + + void setCorrelationID(int correlationID) + { + this.correlationID = correlationID; + } + + void setBufferInputStream(BufferInputStream inputStream) + { + bufferInputStream = inputStream; + } + + void setBufferOutputStream(BufferOutputStream outputStream) + { + bufferOutputStream = outputStream; + } + + void doOutput(BufferOutputStream out) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("================ {0}: {1}", getOutputMeaning(), this); //$NON-NLS-1$ + } + + OutputStream wrappedOutputStream = wrapOutputStream(out); + ExtendedDataOutputStream extended = ExtendedDataOutputStream.wrap(wrappedOutputStream); + + try + { + doExtendedOutput(extended); + } + catch (Error ex) + { + throw ex; + } + catch (Exception ex) + { + throw ex; + } + finally + { + finishOutputStream(wrappedOutputStream); + } + + out.flushWithEOS(); + } + + void doInput(BufferInputStream in) throws Exception + { + if (TRACER.isEnabled()) + { + TRACER.format("================ {0}: {1}", getInputMeaning(), this); //$NON-NLS-1$ + } + + InputStream wrappedInputStream = wrapInputStream(in); + ExtendedDataInputStream extended = ExtendedDataInputStream.wrap(wrappedInputStream); + + try + { + doExtendedInput(extended); + } + catch (Error ex) + { + throw ex; + } + catch (Exception ex) + { + throw ex; + } + finally + { + finishInputStream(wrappedInputStream); + } + } + + void doExtendedOutput(ExtendedDataOutputStream out) throws Exception + { + } + + void doExtendedInput(ExtendedDataInputStream in) throws Exception + { + } + + abstract String getOutputMeaning(); + + abstract String getInputMeaning(); +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java index 8223b337ef..9c511c33cc 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java @@ -1,67 +1,69 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.BufferOutputStream;
-
-/**
- * @author Eike Stepper
- */
-public abstract class SignalActor extends Signal
-{
- /**
- * @since 2.0
- */
- public SignalActor(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- setCorrelationID(protocol.getNextCorrelationID());
- }
-
- /**
- * @since 2.0
- */
- public SignalActor(SignalProtocol<?> protocol, short id)
- {
- super(protocol, id);
- setCorrelationID(protocol.getNextCorrelationID());
- }
-
- /**
- * @since 2.0
- */
- public SignalActor(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- setCorrelationID(protocol.getNextCorrelationID());
- }
-
- @Override
- protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- doExecute(in, out);
- }
-
- abstract void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception;
-
- @Override
- String getInputMeaning()
- {
- return "Confirming"; //$NON-NLS-1$
- }
-
- @Override
- String getOutputMeaning()
- {
- return "Requesting"; //$NON-NLS-1$
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.BufferOutputStream; + +/** + * Represents the sender side of a {@link Signal signal}. + * + * @author Eike Stepper + */ +public abstract class SignalActor extends Signal +{ + /** + * @since 2.0 + */ + public SignalActor(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + setCorrelationID(protocol.getNextCorrelationID()); + } + + /** + * @since 2.0 + */ + public SignalActor(SignalProtocol<?> protocol, short id) + { + super(protocol, id); + setCorrelationID(protocol.getNextCorrelationID()); + } + + /** + * @since 2.0 + */ + public SignalActor(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + setCorrelationID(protocol.getNextCorrelationID()); + } + + @Override + protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception + { + doExecute(in, out); + } + + abstract void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception; + + @Override + String getInputMeaning() + { + return "Confirming"; //$NON-NLS-1$ + } + + @Override + String getOutputMeaning() + { + return "Requesting"; //$NON-NLS-1$ + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java index 235a1f9568..c179d87c9d 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalCounter.java @@ -1,56 +1,67 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.util.collection.HashBag;
-import org.eclipse.net4j.util.event.IEvent;
-import org.eclipse.net4j.util.event.IListener;
-
-/**
- * @author Eike Stepper
- * @since 3.0
- */
-public final class SignalCounter implements IListener
-{
- private HashBag<Class<? extends Signal>> signals = new HashBag<Class<? extends Signal>>();
-
- public SignalCounter()
- {
- }
-
- public int getCountFor(Class<? extends Signal> signal)
- {
- synchronized (signals)
- {
- return signals.getCounterFor(signal);
- }
- }
-
- public void clearCounts()
- {
- synchronized (signals)
- {
- signals.clear();
- }
- }
-
- public void notifyEvent(IEvent event)
- {
- if (event instanceof SignalFinishedEvent)
- {
- synchronized (signals)
- {
- SignalFinishedEvent<?> e = (SignalFinishedEvent<?>)event;
- signals.add(e.getSignal().getClass());
- }
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.collection.HashBag; +import org.eclipse.net4j.util.event.IEvent; +import org.eclipse.net4j.util.event.IListener; + +/** + * Provides {@link Signal signal} execution counts when + * {@link SignalProtocol#addListener(IListener) attached} to a {@link ISignalProtocol signal protocol}. + * + * @author Eike Stepper + * @since 3.0 + */ +public final class SignalCounter implements IListener +{ + private HashBag<Class<? extends Signal>> signals = new HashBag<Class<? extends Signal>>(); + + public SignalCounter() + { + } + + /** + * @since 4.1 + */ + public SignalCounter(ISignalProtocol<?> protocol) + { + protocol.addListener(this); + } + + public int getCountFor(Class<? extends Signal> signal) + { + synchronized (signals) + { + return signals.getCounterFor(signal); + } + } + + public void clearCounts() + { + synchronized (signals) + { + signals.clear(); + } + } + + public void notifyEvent(IEvent event) + { + if (event instanceof SignalFinishedEvent) + { + synchronized (signals) + { + SignalFinishedEvent<?> e = (SignalFinishedEvent<?>)event; + signals.add(e.getSignal().getClass()); + } + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalFinishedEvent.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalFinishedEvent.java index 93f5a37a40..7eb2014cc3 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalFinishedEvent.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalFinishedEvent.java @@ -1,64 +1,68 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.util.event.Event;
-
-/**
- * @author Eike Stepper
- * @since 3.0
- * @noextend This interface is not intended to be extended by clients.
- */
-public class SignalFinishedEvent<INFRA_STRUCTURE> extends Event
-{
- private static final long serialVersionUID = 1L;
-
- private Signal signal;
-
- private Exception exception;
-
- SignalFinishedEvent(ISignalProtocol<INFRA_STRUCTURE> source, Signal signal, Exception exception)
- {
- super(source);
- this.signal = signal;
- this.exception = exception;
- }
-
- @Override
- public ISignalProtocol<INFRA_STRUCTURE> getSource()
- {
- @SuppressWarnings("unchecked")
- ISignalProtocol<INFRA_STRUCTURE> source = (ISignalProtocol<INFRA_STRUCTURE>)super.getSource();
- return source;
- }
-
- public Signal getSignal()
- {
- return signal;
- }
-
- public Exception getException()
- {
- return exception;
- }
-
- @Override
- protected String formatAdditionalParameters()
- {
- String result = "signal=" + signal.getClass().getSimpleName();
- if (exception != null)
- {
- result += ", exception=" + exception.getClass().getSimpleName();
- }
-
- return result;
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.event.IEvent; + +/** + * An {@link IEvent event} fired from a {@link ISignalProtocol signal protocol} when + * the local execution of a scheduled {@link #getSignal() signal} has finished. + * + * @author Eike Stepper + * @since 3.0 + * @noextend This interface is not intended to be extended by clients. + */ +public class SignalFinishedEvent<INFRA_STRUCTURE> extends Event +{ + private static final long serialVersionUID = 1L; + + private Signal signal; + + private Exception exception; + + SignalFinishedEvent(ISignalProtocol<INFRA_STRUCTURE> source, Signal signal, Exception exception) + { + super(source); + this.signal = signal; + this.exception = exception; + } + + @Override + public ISignalProtocol<INFRA_STRUCTURE> getSource() + { + @SuppressWarnings("unchecked") + ISignalProtocol<INFRA_STRUCTURE> source = (ISignalProtocol<INFRA_STRUCTURE>)super.getSource(); + return source; + } + + public Signal getSignal() + { + return signal; + } + + public Exception getException() + { + return exception; + } + + @Override + protected String formatAdditionalParameters() + { + String result = "signal=" + signal.getClass().getSimpleName(); + if (exception != null) + { + result += ", exception=" + exception.getClass().getSimpleName(); + } + + return result; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java index 2dd2c42768..6d984ab898 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java @@ -1,636 +1,646 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- * Andre Dietisheim - maintenance
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.buffer.BufferInputStream;
-import org.eclipse.net4j.buffer.IBuffer;
-import org.eclipse.net4j.buffer.IBufferProvider;
-import org.eclipse.net4j.channel.ChannelOutputStream;
-import org.eclipse.net4j.channel.IChannel;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.util.WrappedException;
-import org.eclipse.net4j.util.event.Event;
-import org.eclipse.net4j.util.event.IListener;
-import org.eclipse.net4j.util.io.IORuntimeException;
-import org.eclipse.net4j.util.io.IStreamWrapper;
-import org.eclipse.net4j.util.io.StreamWrapperChain;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-import org.eclipse.net4j.util.om.trace.ContextTracer;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import org.eclipse.spi.net4j.Protocol;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author Eike Stepper
- */
-public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements
- ISignalProtocol<INFRA_STRUCTURE>
-{
- /**
- * @since 2.0
- */
- public static final short SIGNAL_REMOTE_EXCEPTION = -1;
-
- /**
- * @since 2.0
- */
- public static final short SIGNAL_MONITOR_CANCELED = -2;
-
- /**
- * @since 2.0
- */
- public static final short SIGNAL_MONITOR_PROGRESS = -3;
-
- /**
- * @since 4.1
- */
- public static final short SIGNAL_SET_TIMEOUT = -4;
-
- private static final int MIN_CORRELATION_ID = 1;
-
- private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE;
-
- private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, SignalProtocol.class);
-
- private static final ContextTracer STREAM_TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, SignalProtocol.class);
-
- private long timeout = DEFAULT_TIMEOUT;
-
- private IStreamWrapper streamWrapper;
-
- private Map<Integer, Signal> signals = new HashMap<Integer, Signal>();
-
- private int nextCorrelationID = MIN_CORRELATION_ID;
-
- private boolean failingOver;
-
- /**
- * @since 2.0
- */
- public SignalProtocol(String type)
- {
- super(type);
- }
-
- /**
- * @since 2.0
- */
- public long getTimeout()
- {
- return timeout;
- }
-
- /**
- * @since 2.0
- */
- public void setTimeout(long timeout)
- {
- long oldTimeout = this.timeout;
- handleSetTimeOut(timeout);
-
- if (oldTimeout != this.timeout && isActive())
- {
- sendSetTimeout();
- }
- }
-
- public IStreamWrapper getStreamWrapper()
- {
- return streamWrapper;
- }
-
- public void setStreamWrapper(IStreamWrapper streamWrapper)
- {
- this.streamWrapper = streamWrapper;
- }
-
- public void addStreamWrapper(IStreamWrapper streamWrapper)
- {
- if (this.streamWrapper == null)
- {
- this.streamWrapper = streamWrapper;
- }
- else
- {
- this.streamWrapper = new StreamWrapperChain(streamWrapper, this.streamWrapper);
- }
- }
-
- /**
- * @since 2.0
- */
- public IChannel open(IConnector connector)
- {
- return connector.openChannel(this);
- }
-
- /**
- * @since 2.0
- */
- public void close()
- {
- LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG);
- }
-
- public boolean waitForSignals(long timeout)
- {
- synchronized (signals)
- {
- while (!signals.isEmpty())
- {
- try
- {
- signals.wait(timeout);
- }
- catch (InterruptedException ex)
- {
- return false;
- }
- }
- }
-
- return true;
- }
-
- /**
- * Handles a given (incoming) buffer. Creates a signal to act upon the given buffer or uses a previously created
- * signal.
- */
- public void handleBuffer(IBuffer buffer)
- {
- ByteBuffer byteBuffer = buffer.getByteBuffer();
- int correlationID = byteBuffer.getInt();
- if (TRACER.isEnabled())
- {
- TRACER.trace("Received buffer for correlation " + correlationID); //$NON-NLS-1$
- }
-
- Signal signal;
- boolean newSignalScheduled = false;
-
- synchronized (signals)
- {
- if (correlationID > 0)
- {
- // Incoming indication
- signal = signals.get(-correlationID);
- if (signal == null)
- {
- short signalID = byteBuffer.getShort();
- if (TRACER.isEnabled())
- {
- TRACER.trace("Got signalID: " + signalID); //$NON-NLS-1$
- }
-
- signal = provideSignalReactor(signalID);
- signal.setCorrelationID(-correlationID);
- signal.setBufferInputStream(new SignalInputStream(getTimeout()));
- if (signal instanceof IndicationWithResponse)
- {
- signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false));
- }
-
- signals.put(-correlationID, signal);
- getExecutorService().execute(signal);
- newSignalScheduled = true;
- }
- }
- else
- {
- // Incoming confirmation
- signal = signals.get(-correlationID);
- }
- }
-
- if (signal != null) // Can be null after timeout
- {
- if (newSignalScheduled)
- {
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SignalScheduledEvent<INFRA_STRUCTURE>(this, signal), listeners);
- }
- }
-
- BufferInputStream inputStream = signal.getBufferInputStream();
- inputStream.handleBuffer(buffer);
- }
- else
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Discarding buffer"); //$NON-NLS-1$
- }
-
- buffer.release();
- }
- }
-
- @Override
- public String toString()
- {
- return MessageFormat.format("SignalProtocol[{0}]", getType()); //$NON-NLS-1$
- }
-
- @Override
- protected void doAfterActivate() throws Exception
- {
- super.doAfterActivate();
-
- if (timeout != DEFAULT_TIMEOUT)
- {
- sendSetTimeout();
- }
- }
-
- @Override
- protected void doBeforeDeactivate() throws Exception
- {
- synchronized (signals)
- {
- // Wait at most 10 seconds for running signals to finish
- int waitMillis = 10 * 1000;
- long stop = System.currentTimeMillis() + waitMillis;
- while (!signals.isEmpty() && System.currentTimeMillis() < stop)
- {
- signals.wait(1000L);
- }
- }
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- synchronized (signals)
- {
- signals.clear();
- }
-
- IChannel channel = getChannel();
- if (channel != null)
- {
- channel.close();
- setChannel(null);
- }
-
- super.doDeactivate();
- }
-
- @Override
- protected void handleChannelDeactivation()
- {
- if (!failingOver)
- {
- super.handleChannelDeactivation();
- }
- }
-
- protected final SignalReactor provideSignalReactor(short signalID)
- {
- checkActive();
- switch (signalID)
- {
- case SIGNAL_REMOTE_EXCEPTION:
- return new RemoteExceptionIndication(this);
-
- case SIGNAL_MONITOR_CANCELED:
- return new MonitorCanceledIndication(this);
-
- case SIGNAL_MONITOR_PROGRESS:
- return new MonitorProgressIndication(this);
-
- case SIGNAL_SET_TIMEOUT:
- return new SetTimeoutIndication(this);
-
- default:
- SignalReactor signal = createSignalReactor(signalID);
- if (signal == null)
- {
- throw new IllegalArgumentException("Invalid signalID " + signalID); //$NON-NLS-1$
- }
-
- return signal;
- }
- }
-
- /**
- * Returns a new signal instance to serve the given signal ID or <code>null</code> if the signal ID is invalid/unknown
- * for this protocol.
- */
- protected SignalReactor createSignalReactor(short signalID)
- {
- return null;
- }
-
- /**
- * Returns <code>true</code> by default, override to change this behaviour.
- *
- * @since 4.1
- */
- protected boolean isSendingTimeoutChanges()
- {
- return true;
- }
-
- synchronized int getNextCorrelationID()
- {
- int correlationID = nextCorrelationID;
- if (nextCorrelationID == MAX_CORRELATION_ID)
- {
- if (TRACER.isEnabled())
- {
- TRACER.trace("Correlation ID wrap-around"); //$NON-NLS-1$
- }
-
- nextCorrelationID = MIN_CORRELATION_ID;
- }
- else
- {
- ++nextCorrelationID;
- }
-
- return correlationID;
- }
-
- InputStream wrapInputStream(InputStream in) throws IOException
- {
- if (streamWrapper != null)
- {
- in = streamWrapper.wrapInputStream(in);
- }
-
- return in;
- }
-
- OutputStream wrapOutputStream(OutputStream out) throws IOException
- {
- if (streamWrapper != null)
- {
- out = streamWrapper.wrapOutputStream(out);
- }
-
- return out;
- }
-
- void finishInputStream(InputStream in) throws IOException
- {
- if (streamWrapper != null)
- {
- streamWrapper.finishInputStream(in);
- }
- }
-
- void finishOutputStream(OutputStream out) throws IOException
- {
- if (streamWrapper != null)
- {
- streamWrapper.finishOutputStream(out);
- }
- }
-
- void startSignal(SignalActor signalActor, long timeout) throws Exception
- {
- checkArg(signalActor.getProtocol() == this, "Wrong protocol"); //$NON-NLS-1$
- short signalID = signalActor.getID();
- int correlationID = signalActor.getCorrelationID();
- signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true));
- if (signalActor instanceof RequestWithConfirmation<?>)
- {
- signalActor.setBufferInputStream(new SignalInputStream(timeout));
- }
-
- synchronized (signals)
- {
- signals.put(correlationID, signalActor);
- }
-
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SignalScheduledEvent<INFRA_STRUCTURE>(this, signalActor), listeners);
- }
-
- signalActor.runSync();
- }
-
- void stopSignal(Signal signal, Exception exception)
- {
- int correlationID = signal.getCorrelationID();
- synchronized (signals)
- {
- signals.remove(correlationID);
- signals.notifyAll();
- }
-
- IListener[] listeners = getListeners();
- if (listeners != null)
- {
- fireEvent(new SignalFinishedEvent<INFRA_STRUCTURE>(this, signal, exception), listeners);
- }
- }
-
- void handleRemoteException(int correlationID, Throwable t, boolean responding)
- {
- synchronized (signals)
- {
- Signal signal = signals.remove(correlationID);
- if (signal instanceof RequestWithConfirmation<?>)
- {
- RequestWithConfirmation<?> request = (RequestWithConfirmation<?>)signal;
- request.setRemoteException(t, responding);
- }
-
- signals.notifyAll();
- }
- }
-
- void handleMonitorProgress(int correlationID, double totalWork, double work)
- {
- synchronized (signals)
- {
- Signal signal = signals.get(correlationID);
- if (signal instanceof RequestWithMonitoring<?>)
- {
- RequestWithMonitoring<?> request = (RequestWithMonitoring<?>)signal;
- request.setMonitorProgress(totalWork, work);
- }
- }
- }
-
- void handleMonitorCanceled(int correlationID)
- {
- synchronized (signals)
- {
- Signal signal = signals.get(correlationID);
- if (signal instanceof IndicationWithMonitoring)
- {
- IndicationWithMonitoring indication = (IndicationWithMonitoring)signal;
- indication.setMonitorCanceled();
- }
- }
- }
-
- void handleSetTimeOut(long timeout)
- {
- long oldTimeout = this.timeout;
- if (oldTimeout != timeout)
- {
- this.timeout = timeout;
- fireEvent(new TimeoutChangedEvent(this, oldTimeout, timeout));
- }
- }
-
- void sendSetTimeout()
- {
- if (isSendingTimeoutChanges())
- {
- try
- {
- new SetTimeoutRequest(this, this.timeout).send();
- }
- catch (Exception ex)
- {
- throw WrappedException.wrap(ex);
- }
- }
- }
-
- /**
- * @author Eike Stepper
- * @since 4.1
- */
- public static final class TimeoutChangedEvent extends Event
- {
- private static final long serialVersionUID = 1L;
-
- private long oldTimeout;
-
- private long newTimeout;
-
- private TimeoutChangedEvent(ISignalProtocol<?> source, long oldTimeout, long newTimeout)
- {
- super(source);
- this.oldTimeout = oldTimeout;
- this.newTimeout = newTimeout;
- }
-
- @Override
- public SignalProtocol<?> getSource()
- {
- return (SignalProtocol<?>)super.getSource();
- }
-
- public long getOldTimeout()
- {
- return oldTimeout;
- }
-
- public long getNewTimeout()
- {
- return newTimeout;
- }
-
- @Override
- public String toString()
- {
- return "TimeoutChangedEvent [oldTimeout=" + oldTimeout + ", newTimeout=" + newTimeout + ", source=" + source
- + "]";
- }
-
- }
-
- /**
- * @author Eike Stepper
- */
- class SignalInputStream extends BufferInputStream
- {
- private long timeout;
-
- public SignalInputStream(long timeout)
- {
- this.timeout = timeout;
- }
-
- @Override
- public long getMillisBeforeTimeout()
- {
- return timeout;
- }
- }
-
- /**
- * @author Eike Stepper
- */
- class SignalOutputStream extends ChannelOutputStream
- {
- public SignalOutputStream(final int correlationID, final short signalID, final boolean addSignalID)
- {
- super(getChannel(), new IBufferProvider()
- {
- private IBufferProvider delegate = getBufferProvider();
-
- private boolean firstBuffer = addSignalID;
-
- public short getBufferCapacity()
- {
- return delegate.getBufferCapacity();
- }
-
- public IBuffer provideBuffer()
- {
- IChannel channel = getChannel();
- if (channel == null)
- {
- throw new IORuntimeException("No channel for protocol " + SignalProtocol.this); //$NON-NLS-1$
- }
-
- IBuffer buffer = delegate.provideBuffer();
- ByteBuffer byteBuffer = buffer.startPutting(channel.getID());
- if (STREAM_TRACER.isEnabled())
- {
- STREAM_TRACER.trace("Providing buffer for correlation " + correlationID); //$NON-NLS-1$
- }
-
- byteBuffer.putInt(correlationID);
- if (firstBuffer)
- {
- if (SignalProtocol.TRACER.isEnabled())
- {
- STREAM_TRACER.trace("Put signal id " + signalID); //$NON-NLS-1$
- }
-
- byteBuffer.putShort(signalID);
- }
-
- firstBuffer = false;
- return buffer;
- }
-
- public void retainBuffer(IBuffer buffer)
- {
- delegate.retainBuffer(buffer);
- }
- });
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + * Andre Dietisheim - maintenance + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.buffer.BufferInputStream; +import org.eclipse.net4j.buffer.IBuffer; +import org.eclipse.net4j.buffer.IBufferProvider; +import org.eclipse.net4j.channel.ChannelOutputStream; +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.event.IEvent; +import org.eclipse.net4j.util.event.IListener; +import org.eclipse.net4j.util.io.IORuntimeException; +import org.eclipse.net4j.util.io.IStreamWrapper; +import org.eclipse.net4j.util.io.StreamWrapperChain; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; +import org.eclipse.net4j.util.om.trace.ContextTracer; + +import org.eclipse.internal.net4j.bundle.OM; + +import org.eclipse.spi.net4j.Protocol; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; + +/** + * The default implementation of a {@link ISignalProtocol signal protocol}. + * <p> + * On the {@link org.eclipse.net4j.ILocationAware.Location#SERVER receiver side(s)} of protocol the + * {@link #createSignalReactor(short) createSignalReactor()} method has to be overridden to + * create appropriate peer instances for incoming {@link Signal signals}. + * + * @author Eike Stepper + */ +public class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STRUCTURE> implements + ISignalProtocol<INFRA_STRUCTURE> +{ + /** + * @since 2.0 + */ + public static final short SIGNAL_REMOTE_EXCEPTION = -1; + + /** + * @since 2.0 + */ + public static final short SIGNAL_MONITOR_CANCELED = -2; + + /** + * @since 2.0 + */ + public static final short SIGNAL_MONITOR_PROGRESS = -3; + + /** + * @since 4.1 + */ + public static final short SIGNAL_SET_TIMEOUT = -4; + + private static final int MIN_CORRELATION_ID = 1; + + private static final int MAX_CORRELATION_ID = Integer.MAX_VALUE; + + private static final ContextTracer TRACER = new ContextTracer(OM.DEBUG_SIGNAL, SignalProtocol.class); + + private static final ContextTracer STREAM_TRACER = new ContextTracer(OM.DEBUG_BUFFER_STREAM, SignalProtocol.class); + + private long timeout = DEFAULT_TIMEOUT; + + private IStreamWrapper streamWrapper; + + private Map<Integer, Signal> signals = new HashMap<Integer, Signal>(); + + private int nextCorrelationID = MIN_CORRELATION_ID; + + private boolean failingOver; + + /** + * @since 2.0 + */ + public SignalProtocol(String type) + { + super(type); + } + + /** + * @since 2.0 + */ + public long getTimeout() + { + return timeout; + } + + /** + * @since 2.0 + */ + public void setTimeout(long timeout) + { + long oldTimeout = this.timeout; + handleSetTimeOut(timeout); + + if (oldTimeout != this.timeout && isActive()) + { + sendSetTimeout(); + } + } + + public IStreamWrapper getStreamWrapper() + { + return streamWrapper; + } + + public void setStreamWrapper(IStreamWrapper streamWrapper) + { + this.streamWrapper = streamWrapper; + } + + public void addStreamWrapper(IStreamWrapper streamWrapper) + { + if (this.streamWrapper == null) + { + this.streamWrapper = streamWrapper; + } + else + { + this.streamWrapper = new StreamWrapperChain(streamWrapper, this.streamWrapper); + } + } + + /** + * @since 2.0 + */ + public IChannel open(IConnector connector) + { + return connector.openChannel(this); + } + + /** + * @since 2.0 + */ + public void close() + { + LifecycleUtil.deactivate(this, OMLogger.Level.DEBUG); + } + + public boolean waitForSignals(long timeout) + { + synchronized (signals) + { + while (!signals.isEmpty()) + { + try + { + signals.wait(timeout); + } + catch (InterruptedException ex) + { + return false; + } + } + } + + return true; + } + + /** + * Handles a given (incoming) buffer. Creates a signal to act upon the given buffer or uses a previously created + * signal. + */ + public void handleBuffer(IBuffer buffer) + { + ByteBuffer byteBuffer = buffer.getByteBuffer(); + int correlationID = byteBuffer.getInt(); + if (TRACER.isEnabled()) + { + TRACER.trace("Received buffer for correlation " + correlationID); //$NON-NLS-1$ + } + + Signal signal; + boolean newSignalScheduled = false; + + synchronized (signals) + { + if (correlationID > 0) + { + // Incoming indication + signal = signals.get(-correlationID); + if (signal == null) + { + short signalID = byteBuffer.getShort(); + if (TRACER.isEnabled()) + { + TRACER.trace("Got signalID: " + signalID); //$NON-NLS-1$ + } + + signal = provideSignalReactor(signalID); + signal.setCorrelationID(-correlationID); + signal.setBufferInputStream(new SignalInputStream(getTimeout())); + if (signal instanceof IndicationWithResponse) + { + signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false)); + } + + signals.put(-correlationID, signal); + getExecutorService().execute(signal); + newSignalScheduled = true; + } + } + else + { + // Incoming confirmation + signal = signals.get(-correlationID); + } + } + + if (signal != null) // Can be null after timeout + { + if (newSignalScheduled) + { + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new SignalScheduledEvent<INFRA_STRUCTURE>(this, signal), listeners); + } + } + + BufferInputStream inputStream = signal.getBufferInputStream(); + inputStream.handleBuffer(buffer); + } + else + { + if (TRACER.isEnabled()) + { + TRACER.trace("Discarding buffer"); //$NON-NLS-1$ + } + + buffer.release(); + } + } + + @Override + public String toString() + { + return MessageFormat.format("SignalProtocol[{0}]", getType()); //$NON-NLS-1$ + } + + @Override + protected void doAfterActivate() throws Exception + { + super.doAfterActivate(); + + if (timeout != DEFAULT_TIMEOUT) + { + sendSetTimeout(); + } + } + + @Override + protected void doBeforeDeactivate() throws Exception + { + synchronized (signals) + { + // Wait at most 10 seconds for running signals to finish + int waitMillis = 10 * 1000; + long stop = System.currentTimeMillis() + waitMillis; + while (!signals.isEmpty() && System.currentTimeMillis() < stop) + { + signals.wait(1000L); + } + } + } + + @Override + protected void doDeactivate() throws Exception + { + synchronized (signals) + { + signals.clear(); + } + + IChannel channel = getChannel(); + if (channel != null) + { + channel.close(); + setChannel(null); + } + + super.doDeactivate(); + } + + @Override + protected void handleChannelDeactivation() + { + if (!failingOver) + { + super.handleChannelDeactivation(); + } + } + + protected final SignalReactor provideSignalReactor(short signalID) + { + checkActive(); + switch (signalID) + { + case SIGNAL_REMOTE_EXCEPTION: + return new RemoteExceptionIndication(this); + + case SIGNAL_MONITOR_CANCELED: + return new MonitorCanceledIndication(this); + + case SIGNAL_MONITOR_PROGRESS: + return new MonitorProgressIndication(this); + + case SIGNAL_SET_TIMEOUT: + return new SetTimeoutIndication(this); + + default: + SignalReactor signal = createSignalReactor(signalID); + if (signal == null) + { + throw new IllegalArgumentException("Invalid signalID " + signalID); //$NON-NLS-1$ + } + + return signal; + } + } + + /** + * Returns a new signal instance to serve the given signal ID or <code>null</code> if the signal ID is invalid/unknown + * for this protocol. + */ + protected SignalReactor createSignalReactor(short signalID) + { + return null; + } + + /** + * Returns <code>true</code> by default, override to change this behaviour. + * + * @since 4.1 + */ + protected boolean isSendingTimeoutChanges() + { + return true; + } + + synchronized int getNextCorrelationID() + { + int correlationID = nextCorrelationID; + if (nextCorrelationID == MAX_CORRELATION_ID) + { + if (TRACER.isEnabled()) + { + TRACER.trace("Correlation ID wrap-around"); //$NON-NLS-1$ + } + + nextCorrelationID = MIN_CORRELATION_ID; + } + else + { + ++nextCorrelationID; + } + + return correlationID; + } + + InputStream wrapInputStream(InputStream in) throws IOException + { + if (streamWrapper != null) + { + in = streamWrapper.wrapInputStream(in); + } + + return in; + } + + OutputStream wrapOutputStream(OutputStream out) throws IOException + { + if (streamWrapper != null) + { + out = streamWrapper.wrapOutputStream(out); + } + + return out; + } + + void finishInputStream(InputStream in) throws IOException + { + if (streamWrapper != null) + { + streamWrapper.finishInputStream(in); + } + } + + void finishOutputStream(OutputStream out) throws IOException + { + if (streamWrapper != null) + { + streamWrapper.finishOutputStream(out); + } + } + + void startSignal(SignalActor signalActor, long timeout) throws Exception + { + checkArg(signalActor.getProtocol() == this, "Wrong protocol"); //$NON-NLS-1$ + short signalID = signalActor.getID(); + int correlationID = signalActor.getCorrelationID(); + signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true)); + if (signalActor instanceof RequestWithConfirmation<?>) + { + signalActor.setBufferInputStream(new SignalInputStream(timeout)); + } + + synchronized (signals) + { + signals.put(correlationID, signalActor); + } + + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new SignalScheduledEvent<INFRA_STRUCTURE>(this, signalActor), listeners); + } + + signalActor.runSync(); + } + + void stopSignal(Signal signal, Exception exception) + { + int correlationID = signal.getCorrelationID(); + synchronized (signals) + { + signals.remove(correlationID); + signals.notifyAll(); + } + + IListener[] listeners = getListeners(); + if (listeners != null) + { + fireEvent(new SignalFinishedEvent<INFRA_STRUCTURE>(this, signal, exception), listeners); + } + } + + void handleRemoteException(int correlationID, Throwable t, boolean responding) + { + synchronized (signals) + { + Signal signal = signals.remove(correlationID); + if (signal instanceof RequestWithConfirmation<?>) + { + RequestWithConfirmation<?> request = (RequestWithConfirmation<?>)signal; + request.setRemoteException(t, responding); + } + + signals.notifyAll(); + } + } + + void handleMonitorProgress(int correlationID, double totalWork, double work) + { + synchronized (signals) + { + Signal signal = signals.get(correlationID); + if (signal instanceof RequestWithMonitoring<?>) + { + RequestWithMonitoring<?> request = (RequestWithMonitoring<?>)signal; + request.setMonitorProgress(totalWork, work); + } + } + } + + void handleMonitorCanceled(int correlationID) + { + synchronized (signals) + { + Signal signal = signals.get(correlationID); + if (signal instanceof IndicationWithMonitoring) + { + IndicationWithMonitoring indication = (IndicationWithMonitoring)signal; + indication.setMonitorCanceled(); + } + } + } + + void handleSetTimeOut(long timeout) + { + long oldTimeout = this.timeout; + if (oldTimeout != timeout) + { + this.timeout = timeout; + fireEvent(new TimeoutChangedEvent(this, oldTimeout, timeout)); + } + } + + void sendSetTimeout() + { + if (isSendingTimeoutChanges()) + { + try + { + new SetTimeoutRequest(this, this.timeout).send(); + } + catch (Exception ex) + { + throw WrappedException.wrap(ex); + } + } + } + + /** + * An {@link IEvent event} fired from a {@link ISignalProtocol signal protocol} when the protocol {@link ISignalProtocol#setTimeout(long) timeout} + * has been changed. + * + * @author Eike Stepper + * @since 4.1 + */ + public static final class TimeoutChangedEvent extends Event + { + private static final long serialVersionUID = 1L; + + private long oldTimeout; + + private long newTimeout; + + private TimeoutChangedEvent(ISignalProtocol<?> source, long oldTimeout, long newTimeout) + { + super(source); + this.oldTimeout = oldTimeout; + this.newTimeout = newTimeout; + } + + @Override + public SignalProtocol<?> getSource() + { + return (SignalProtocol<?>)super.getSource(); + } + + public long getOldTimeout() + { + return oldTimeout; + } + + public long getNewTimeout() + { + return newTimeout; + } + + @Override + public String toString() + { + return "TimeoutChangedEvent [oldTimeout=" + oldTimeout + ", newTimeout=" + newTimeout + ", source=" + source + + "]"; + } + + } + + /** + * @author Eike Stepper + */ + class SignalInputStream extends BufferInputStream + { + private long timeout; + + public SignalInputStream(long timeout) + { + this.timeout = timeout; + } + + @Override + public long getMillisBeforeTimeout() + { + return timeout; + } + } + + /** + * @author Eike Stepper + */ + class SignalOutputStream extends ChannelOutputStream + { + public SignalOutputStream(final int correlationID, final short signalID, final boolean addSignalID) + { + super(getChannel(), new IBufferProvider() + { + private IBufferProvider delegate = getBufferProvider(); + + private boolean firstBuffer = addSignalID; + + public short getBufferCapacity() + { + return delegate.getBufferCapacity(); + } + + public IBuffer provideBuffer() + { + IChannel channel = getChannel(); + if (channel == null) + { + throw new IORuntimeException("No channel for protocol " + SignalProtocol.this); //$NON-NLS-1$ + } + + IBuffer buffer = delegate.provideBuffer(); + ByteBuffer byteBuffer = buffer.startPutting(channel.getID()); + if (STREAM_TRACER.isEnabled()) + { + STREAM_TRACER.trace("Providing buffer for correlation " + correlationID); //$NON-NLS-1$ + } + + byteBuffer.putInt(correlationID); + if (firstBuffer) + { + if (SignalProtocol.TRACER.isEnabled()) + { + STREAM_TRACER.trace("Put signal id " + signalID); //$NON-NLS-1$ + } + + byteBuffer.putShort(signalID); + } + + firstBuffer = false; + return buffer; + } + + public void retainBuffer(IBuffer buffer) + { + delegate.retainBuffer(buffer); + } + }); + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java index 68ef475cd9..e42acf534c 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalReactor.java @@ -1,53 +1,55 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-/**
- * @author Eike Stepper
- */
-public abstract class SignalReactor extends Signal
-{
- /**
- * @since 2.0
- */
- public SignalReactor(SignalProtocol<?> protocol, short id, String name)
- {
- super(protocol, id, name);
- }
-
- /**
- * @since 2.0
- */
- public SignalReactor(SignalProtocol<?> protocol, short signalID)
- {
- super(protocol, signalID);
- }
-
- /**
- * @since 2.0
- */
- public SignalReactor(SignalProtocol<?> protocol, Enum<?> literal)
- {
- super(protocol, literal);
- }
-
- @Override
- String getInputMeaning()
- {
- return "Indicating"; //$NON-NLS-1$
- }
-
- @Override
- String getOutputMeaning()
- {
- return "Responding"; //$NON-NLS-1$
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +/** + * Represents the receiver side of a {@link Signal signal}. + * + * @author Eike Stepper + */ +public abstract class SignalReactor extends Signal +{ + /** + * @since 2.0 + */ + public SignalReactor(SignalProtocol<?> protocol, short id, String name) + { + super(protocol, id, name); + } + + /** + * @since 2.0 + */ + public SignalReactor(SignalProtocol<?> protocol, short signalID) + { + super(protocol, signalID); + } + + /** + * @since 2.0 + */ + public SignalReactor(SignalProtocol<?> protocol, Enum<?> literal) + { + super(protocol, literal); + } + + @Override + String getInputMeaning() + { + return "Indicating"; //$NON-NLS-1$ + } + + @Override + String getOutputMeaning() + { + return "Responding"; //$NON-NLS-1$ + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalScheduledEvent.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalScheduledEvent.java index 50b94bc846..5c1fd1d682 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalScheduledEvent.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalScheduledEvent.java @@ -1,50 +1,54 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal;
-
-import org.eclipse.net4j.util.event.Event;
-
-/**
- * @author Eike Stepper
- * @since 3.0
- * @noextend This interface is not intended to be extended by clients.
- */
-public class SignalScheduledEvent<INFRA_STRUCTURE> extends Event
-{
- private static final long serialVersionUID = 1L;
-
- private Signal signal;
-
- SignalScheduledEvent(ISignalProtocol<INFRA_STRUCTURE> source, Signal signal)
- {
- super(source);
- this.signal = signal;
- }
-
- @Override
- public ISignalProtocol<INFRA_STRUCTURE> getSource()
- {
- @SuppressWarnings("unchecked")
- ISignalProtocol<INFRA_STRUCTURE> source = (ISignalProtocol<INFRA_STRUCTURE>)super.getSource();
- return source;
- }
-
- public Signal getSignal()
- {
- return signal;
- }
-
- @Override
- protected String formatAdditionalParameters()
- {
- return "signal=" + signal.getClass().getSimpleName();
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal; + +import org.eclipse.net4j.util.event.Event; +import org.eclipse.net4j.util.event.IEvent; + +/** + * An {@link IEvent event} fired from a {@link ISignalProtocol signal protocol} when + * a {@link #getSignal() signal} has been scheduled for local execution. + * + * @author Eike Stepper + * @since 3.0 + * @noextend This interface is not intended to be extended by clients. + */ +public class SignalScheduledEvent<INFRA_STRUCTURE> extends Event +{ + private static final long serialVersionUID = 1L; + + private Signal signal; + + SignalScheduledEvent(ISignalProtocol<INFRA_STRUCTURE> source, Signal signal) + { + super(source); + this.signal = signal; + } + + @Override + public ISignalProtocol<INFRA_STRUCTURE> getSource() + { + @SuppressWarnings("unchecked") + ISignalProtocol<INFRA_STRUCTURE> source = (ISignalProtocol<INFRA_STRUCTURE>)super.getSource(); + return source; + } + + public Signal getSignal() + { + return signal; + } + + @Override + protected String formatAdditionalParameters() + { + return "signal=" + signal.getClass().getSimpleName(); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/doc-files/signals.png b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/doc-files/signals.png Binary files differnew file mode 100644 index 0000000000..af40c9518c --- /dev/null +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/doc-files/signals.png diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/heartbeat/HeartBeatProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/heartbeat/HeartBeatProtocol.java index ac235ec11c..2dfee894f6 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/heartbeat/HeartBeatProtocol.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/heartbeat/HeartBeatProtocol.java @@ -1,352 +1,363 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal.heartbeat;
-
-import org.eclipse.net4j.channel.IChannelMultiplexer;
-import org.eclipse.net4j.connector.IConnector;
-import org.eclipse.net4j.signal.Indication;
-import org.eclipse.net4j.signal.Request;
-import org.eclipse.net4j.signal.SignalProtocol;
-import org.eclipse.net4j.signal.SignalReactor;
-import org.eclipse.net4j.util.WrappedException;
-import org.eclipse.net4j.util.concurrent.Timeouter;
-import org.eclipse.net4j.util.concurrent.TimerLifecycle;
-import org.eclipse.net4j.util.container.IElementProcessor;
-import org.eclipse.net4j.util.container.IManagedContainer;
-import org.eclipse.net4j.util.container.IPluginContainer;
-import org.eclipse.net4j.util.factory.ProductCreationException;
-import org.eclipse.net4j.util.io.ExtendedDataInputStream;
-import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.lifecycle.LifecycleUtil;
-import org.eclipse.net4j.util.om.log.OMLogger;
-
-import org.eclipse.internal.net4j.bundle.OM;
-
-import org.eclipse.spi.net4j.ServerProtocolFactory;
-
-import java.io.IOException;
-import java.util.Timer;
-import java.util.TimerTask;
-
-/**
- * @author Eike Stepper
- * @since 2.0
- */
-public class HeartBeatProtocol extends SignalProtocol<Object>
-{
- public static final String TYPE = "heartbeat"; //$NON-NLS-1$
-
- private static final short SIGNAL_START = 1;
-
- private static final short SIGNAL_HEART_BEAT = 2;
-
- private static final boolean HEART_BEAT = true;
-
- private Timeouter timeouter;
-
- private Timer timer;
-
- /**
- * @since 4.0
- */
- protected HeartBeatProtocol(String type, IConnector connector, Timer timer)
- {
- super(type);
- checkArg(timer, "timer"); //$NON-NLS-1$
- checkArg(connector, "connector"); //$NON-NLS-1$
- this.timer = timer;
- open(connector);
- }
-
- public HeartBeatProtocol(IConnector connector, Timer timer)
- {
- this(TYPE, connector, timer);
- }
-
- /**
- * @since 4.0
- */
- public HeartBeatProtocol(IConnector connector, IManagedContainer container)
- {
- this(connector, getDefaultTimer(container));
- }
-
- public HeartBeatProtocol(IConnector connector)
- {
- this(connector, IPluginContainer.INSTANCE);
- }
-
- public Timer getTimer()
- {
- return timer;
- }
-
- /**
- * Same as <code>start(rate, 2 * rate)</code>.
- *
- * @see #start(long, long)
- */
- public void start(final long rate)
- {
- start(rate, 2L * rate);
- }
-
- public void start(final long rate, long timeout)
- {
- checkActive();
- checkArg(rate > 0, "rate"); //$NON-NLS-1$
- checkArg(timeout >= rate, "timeout"); //$NON-NLS-1$
-
- try
- {
- new Request(this, SIGNAL_START, "Start") //$NON-NLS-1$
- {
- @Override
- protected void requesting(ExtendedDataOutputStream out) throws Exception
- {
- requestingStart(out, rate);
- }
- }.sendAsync();
- }
- catch (Exception ex)
- {
- throw WrappedException.wrap(ex);
- }
-
- if (timeouter == null)
- {
- timeouter = new Timeouter(getTimer(), timeout)
- {
- @Override
- protected void handleTimeout(long untouched)
- {
- HeartBeatProtocol.this.handleTimeout(untouched);
- }
- };
- }
- else
- {
- timeouter.setTimeout(timeout);
- timeouter.touch();
- }
- }
-
- @Override
- protected SignalReactor createSignalReactor(short signalID)
- {
- if (signalID == SIGNAL_HEART_BEAT)
- {
- return new Indication(HeartBeatProtocol.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$
- {
- @Override
- protected void indicating(ExtendedDataInputStream in) throws Exception
- {
- checkState(in.readBoolean() == HEART_BEAT, "Invalid heart beat"); //$NON-NLS-1$
- timeouter.touch();
- }
- };
- }
-
- return null;
- }
-
- protected void handleTimeout(long untouched)
- {
- IChannelMultiplexer multiplexer = getChannel().getMultiplexer();
- LifecycleUtil.deactivate(multiplexer, OMLogger.Level.DEBUG);
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- if (timeouter != null)
- {
- timeouter.dispose();
- timeouter = null;
- }
-
- super.doDeactivate();
- }
-
- /**
- * @since 4.0
- */
- protected void requestingStart(ExtendedDataOutputStream out, long rate) throws IOException
- {
- out.writeLong(rate);
- }
-
- public static Timer getDefaultTimer(IManagedContainer container)
- {
- return TimerLifecycle.DaemonFactory.getTimer(container, null);
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Server extends SignalProtocol<Object>
- {
- private long heartBeatRate;
-
- private Timer heartBeatTimer;
-
- private TimerTask heartBeatTimerTask;
-
- /**
- * @since 4.0
- */
- protected Server(String type)
- {
- super(type);
- }
-
- public Server()
- {
- this(TYPE);
- }
-
- public Timer getHeartBeatTimer()
- {
- return heartBeatTimer;
- }
-
- public void setHeartBeatTimer(Timer heartBeatTimer)
- {
- checkInactive();
- this.heartBeatTimer = heartBeatTimer;
- }
-
- @Override
- protected SignalReactor createSignalReactor(short signalID)
- {
- if (signalID == SIGNAL_START)
- {
- return new Indication(Server.this, SIGNAL_START, "Start") //$NON-NLS-1$
- {
- @Override
- protected void indicating(ExtendedDataInputStream in) throws Exception
- {
- indicatingStart(in);
- }
- };
- }
-
- return null;
- }
-
- @Override
- protected void doBeforeActivate() throws Exception
- {
- super.doBeforeActivate();
- checkState(heartBeatTimer, "heartBeatTimer"); //$NON-NLS-1$
- }
-
- @Override
- protected void doDeactivate() throws Exception
- {
- cancelHeartBeatTask();
- super.doDeactivate();
- }
-
- /**
- * @since 4.0
- */
- protected void indicatingStart(ExtendedDataInputStream in) throws IOException
- {
- heartBeatRate = in.readLong();
- cancelHeartBeatTask();
- scheduleHeartBeatTask();
- }
-
- private void scheduleHeartBeatTask()
- {
- heartBeatTimerTask = new TimerTask()
- {
- @Override
- public void run()
- {
- try
- {
- new Request(Server.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$
- {
- @Override
- protected void requesting(ExtendedDataOutputStream out) throws Exception
- {
- out.writeBoolean(HEART_BEAT);
- }
- }.sendAsync();
- }
- catch (Exception ex)
- {
- OM.LOG.error("HeartBeatProtocolTask failed", ex);
- }
- }
- };
-
- heartBeatTimer.schedule(heartBeatTimerTask, 0L, heartBeatRate);
- }
-
- private void cancelHeartBeatTask()
- {
- if (heartBeatTimerTask != null)
- {
- heartBeatTimerTask.cancel();
- heartBeatTimerTask = null;
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static class Factory extends ServerProtocolFactory
- {
- public Factory()
- {
- super(TYPE);
- }
-
- public Object create(String description) throws ProductCreationException
- {
- return new HeartBeatProtocol.Server();
- }
- }
-
- /**
- * @author Eike Stepper
- */
- public static class TimerInjector implements IElementProcessor
- {
- public TimerInjector()
- {
- }
-
- public Object process(IManagedContainer container, String productGroup, String factoryType, String description,
- Object element)
- {
- if (element instanceof Server)
- {
- Server server = (Server)element;
- if (server.getHeartBeatTimer() == null)
- {
- server.setHeartBeatTimer(getTimer(container));
- }
- }
-
- return element;
- }
-
- protected Timer getTimer(IManagedContainer container)
- {
- return getDefaultTimer(container);
- }
- }
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal.heartbeat; + +import org.eclipse.net4j.channel.IChannel; +import org.eclipse.net4j.channel.IChannelMultiplexer; +import org.eclipse.net4j.connector.IConnector; +import org.eclipse.net4j.signal.Indication; +import org.eclipse.net4j.signal.Request; +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.signal.SignalReactor; +import org.eclipse.net4j.util.WrappedException; +import org.eclipse.net4j.util.concurrent.Timeouter; +import org.eclipse.net4j.util.concurrent.TimerLifecycle; +import org.eclipse.net4j.util.container.IElementProcessor; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.container.IPluginContainer; +import org.eclipse.net4j.util.factory.ProductCreationException; +import org.eclipse.net4j.util.io.ExtendedDataInputStream; +import org.eclipse.net4j.util.io.ExtendedDataOutputStream; +import org.eclipse.net4j.util.lifecycle.LifecycleUtil; +import org.eclipse.net4j.util.om.log.OMLogger; + +import org.eclipse.internal.net4j.bundle.OM; + +import org.eclipse.spi.net4j.ServerProtocolFactory; + +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; + +/** + * A {@link SignalProtocol signal protocol} that keeps the {@link IConnector connector} of its {@link IChannel channel} open + * or provides early deactivation feedback by exchanging periodic heart beats. + * + * @author Eike Stepper + * @since 2.0 + */ +public class HeartBeatProtocol extends SignalProtocol<Object> +{ + public static final String TYPE = "heartbeat"; //$NON-NLS-1$ + + private static final short SIGNAL_START = 1; + + private static final short SIGNAL_HEART_BEAT = 2; + + private static final boolean HEART_BEAT = true; + + private Timeouter timeouter; + + private Timer timer; + + /** + * @since 4.0 + */ + protected HeartBeatProtocol(String type, IConnector connector, Timer timer) + { + super(type); + checkArg(timer, "timer"); //$NON-NLS-1$ + checkArg(connector, "connector"); //$NON-NLS-1$ + this.timer = timer; + open(connector); + } + + public HeartBeatProtocol(IConnector connector, Timer timer) + { + this(TYPE, connector, timer); + } + + /** + * @since 4.0 + */ + public HeartBeatProtocol(IConnector connector, IManagedContainer container) + { + this(connector, getDefaultTimer(container)); + } + + public HeartBeatProtocol(IConnector connector) + { + this(connector, IPluginContainer.INSTANCE); + } + + public Timer getTimer() + { + return timer; + } + + /** + * Same as <code>start(rate, 2 * rate)</code>. + * + * @see #start(long, long) + */ + public void start(final long rate) + { + start(rate, 2L * rate); + } + + public void start(final long rate, long timeout) + { + checkActive(); + checkArg(rate > 0, "rate"); //$NON-NLS-1$ + checkArg(timeout >= rate, "timeout"); //$NON-NLS-1$ + + try + { + new Request(this, SIGNAL_START, "Start") //$NON-NLS-1$ + { + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + requestingStart(out, rate); + } + }.sendAsync(); + } + catch (Exception ex) + { + throw WrappedException.wrap(ex); + } + + if (timeouter == null) + { + timeouter = new Timeouter(getTimer(), timeout) + { + @Override + protected void handleTimeout(long untouched) + { + HeartBeatProtocol.this.handleTimeout(untouched); + } + }; + } + else + { + timeouter.setTimeout(timeout); + timeouter.touch(); + } + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + if (signalID == SIGNAL_HEART_BEAT) + { + return new Indication(HeartBeatProtocol.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$ + { + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + checkState(in.readBoolean() == HEART_BEAT, "Invalid heart beat"); //$NON-NLS-1$ + timeouter.touch(); + } + }; + } + + return null; + } + + protected void handleTimeout(long untouched) + { + IChannelMultiplexer multiplexer = getChannel().getMultiplexer(); + LifecycleUtil.deactivate(multiplexer, OMLogger.Level.DEBUG); + } + + @Override + protected void doDeactivate() throws Exception + { + if (timeouter != null) + { + timeouter.dispose(); + timeouter = null; + } + + super.doDeactivate(); + } + + /** + * @since 4.0 + */ + protected void requestingStart(ExtendedDataOutputStream out, long rate) throws IOException + { + out.writeLong(rate); + } + + public static Timer getDefaultTimer(IManagedContainer container) + { + return TimerLifecycle.DaemonFactory.getTimer(container, null); + } + + /** + * The server-side implementation of a {@link HeartBeatProtocol heart beat protocol}. + * + * @author Eike Stepper + */ + public static class Server extends SignalProtocol<Object> + { + private long heartBeatRate; + + private Timer heartBeatTimer; + + private TimerTask heartBeatTimerTask; + + /** + * @since 4.0 + */ + protected Server(String type) + { + super(type); + } + + public Server() + { + this(TYPE); + } + + public Timer getHeartBeatTimer() + { + return heartBeatTimer; + } + + public void setHeartBeatTimer(Timer heartBeatTimer) + { + checkInactive(); + this.heartBeatTimer = heartBeatTimer; + } + + @Override + protected SignalReactor createSignalReactor(short signalID) + { + if (signalID == SIGNAL_START) + { + return new Indication(Server.this, SIGNAL_START, "Start") //$NON-NLS-1$ + { + @Override + protected void indicating(ExtendedDataInputStream in) throws Exception + { + indicatingStart(in); + } + }; + } + + return null; + } + + @Override + protected void doBeforeActivate() throws Exception + { + super.doBeforeActivate(); + checkState(heartBeatTimer, "heartBeatTimer"); //$NON-NLS-1$ + } + + @Override + protected void doDeactivate() throws Exception + { + cancelHeartBeatTask(); + super.doDeactivate(); + } + + /** + * @since 4.0 + */ + protected void indicatingStart(ExtendedDataInputStream in) throws IOException + { + heartBeatRate = in.readLong(); + cancelHeartBeatTask(); + scheduleHeartBeatTask(); + } + + private void scheduleHeartBeatTask() + { + heartBeatTimerTask = new TimerTask() + { + @Override + public void run() + { + try + { + new Request(Server.this, SIGNAL_HEART_BEAT, "HeartBeat") //$NON-NLS-1$ + { + @Override + protected void requesting(ExtendedDataOutputStream out) throws Exception + { + out.writeBoolean(HEART_BEAT); + } + }.sendAsync(); + } + catch (Exception ex) + { + OM.LOG.error("HeartBeatProtocolTask failed", ex); + } + } + }; + + heartBeatTimer.schedule(heartBeatTimerTask, 0L, heartBeatRate); + } + + private void cancelHeartBeatTask() + { + if (heartBeatTimerTask != null) + { + heartBeatTimerTask.cancel(); + heartBeatTimerTask = null; + } + } + + /** + * Creates server-side {@link Server heart beat protocol} instances. + * + * @author Eike Stepper + */ + public static class Factory extends ServerProtocolFactory + { + public Factory() + { + super(TYPE); + } + + public Object create(String description) throws ProductCreationException + { + return new HeartBeatProtocol.Server(); + } + } + + /** + * An {@link IElementProcessor element post processor} that injects a {@link #getTimer(IManagedContainer) timer} + * into server-side {@link Server heart beat protocol} instances. + * + * @author Eike Stepper + */ + public static class TimerInjector implements IElementProcessor + { + public TimerInjector() + { + } + + public Object process(IManagedContainer container, String productGroup, String factoryType, String description, + Object element) + { + if (element instanceof Server) + { + Server server = (Server)element; + if (server.getHeartBeatTimer() == null) + { + server.setHeartBeatTimer(getTimer(container)); + } + } + + return element; + } + + protected Timer getTimer(IManagedContainer container) + { + return getDefaultTimer(container); + } + } + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/package-info.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/package-info.java index 3aebcbe561..a8bd3c9e6e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/package-info.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/package-info.java @@ -1,16 +1,18 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-
-/**
- * A framework for request/response based communication on top of
- * the Net4j transport layer.
- */
-package org.eclipse.net4j.signal;
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ + +/** + * A framework for request/response based communication on top of the Net4j transport layer. + * <p> + * <img src="doc-files/signals.png" title="Diagram Signals" border="0"/> + */ +package org.eclipse.net4j.signal; + diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/GZIPStreamWrapperInjector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/GZIPStreamWrapperInjector.java index cd9757602f..d0e058cb1e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/GZIPStreamWrapperInjector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/GZIPStreamWrapperInjector.java @@ -1,26 +1,28 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal.wrapping;
-
-import org.eclipse.net4j.util.io.GZIPStreamWrapper;
-
-/**
- * @author Eike Stepper
- */
-public class GZIPStreamWrapperInjector extends StreamWrapperInjector
-{
- public static final GZIPStreamWrapper STREAM_WRAPPER = new GZIPStreamWrapper();
-
- public GZIPStreamWrapperInjector(String protocolID)
- {
- super(protocolID, STREAM_WRAPPER);
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal.wrapping; + +import org.eclipse.net4j.util.io.GZIPStreamWrapper; + +/** + * An {@link StreamWrapperInjector injector} that injects {@link GZIPStreamWrapper} instances. + * + * @author Eike Stepper + */ +public class GZIPStreamWrapperInjector extends StreamWrapperInjector +{ + public static final GZIPStreamWrapper STREAM_WRAPPER = new GZIPStreamWrapper(); + + public GZIPStreamWrapperInjector(String protocolID) + { + super(protocolID, STREAM_WRAPPER); + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/StreamWrapperInjector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/StreamWrapperInjector.java index 2b08f18829..b9488dd807 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/StreamWrapperInjector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/StreamWrapperInjector.java @@ -1,76 +1,79 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal.wrapping;
-
-import org.eclipse.net4j.signal.SignalProtocol;
-import org.eclipse.net4j.util.ObjectUtil;
-import org.eclipse.net4j.util.container.IElementProcessor;
-import org.eclipse.net4j.util.container.IManagedContainer;
-import org.eclipse.net4j.util.io.IStreamWrapper;
-
-/**
- * @author Eike Stepper
- */
-public class StreamWrapperInjector implements IElementProcessor
-{
- private String protocolID;
-
- private IStreamWrapper streamWrapper;
-
- public StreamWrapperInjector(String protocolID, IStreamWrapper streamWrapper)
- {
- this.protocolID = protocolID;
- this.streamWrapper = streamWrapper;
- }
-
- public String getProtocolID()
- {
- return protocolID;
- }
-
- public IStreamWrapper getStreamWrapper()
- {
- return streamWrapper;
- }
-
- public Object process(IManagedContainer container, String productGroup, String factoryType, String description,
- Object element)
- {
- if (element instanceof SignalProtocol<?>)
- {
- SignalProtocol<?> signalProtocol = (SignalProtocol<?>)element;
- if (shouldInject(container, productGroup, factoryType, description, signalProtocol))
- {
- element = inject(container, productGroup, factoryType, description, signalProtocol);
- }
- }
-
- return element;
- }
-
- protected boolean shouldInject(IManagedContainer container, String productGroup, String factoryType,
- String description, SignalProtocol<?> signalProtocol)
- {
- if (signalProtocol.getStreamWrapper() == streamWrapper)
- {
- return false;
- }
-
- return ObjectUtil.equals(signalProtocol.getType(), protocolID);
- }
-
- protected Object inject(IManagedContainer container, String productGroup, String factoryType, String description,
- SignalProtocol<?> signalProtocol)
- {
- signalProtocol.addStreamWrapper(streamWrapper);
- return signalProtocol;
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal.wrapping; + +import org.eclipse.net4j.signal.SignalProtocol; +import org.eclipse.net4j.util.ObjectUtil; +import org.eclipse.net4j.util.container.IElementProcessor; +import org.eclipse.net4j.util.container.IManagedContainer; +import org.eclipse.net4j.util.io.IStreamWrapper; + +/** + * An {@link IElementProcessor element post processor} that injects a {@link #getStreamWrapper() stream wrapper} + * into {@link SignalProtocol signal protocol} instances. + * + * @author Eike Stepper + */ +public class StreamWrapperInjector implements IElementProcessor +{ + private String protocolID; + + private IStreamWrapper streamWrapper; + + public StreamWrapperInjector(String protocolID, IStreamWrapper streamWrapper) + { + this.protocolID = protocolID; + this.streamWrapper = streamWrapper; + } + + public String getProtocolID() + { + return protocolID; + } + + public IStreamWrapper getStreamWrapper() + { + return streamWrapper; + } + + public Object process(IManagedContainer container, String productGroup, String factoryType, String description, + Object element) + { + if (element instanceof SignalProtocol<?>) + { + SignalProtocol<?> signalProtocol = (SignalProtocol<?>)element; + if (shouldInject(container, productGroup, factoryType, description, signalProtocol)) + { + element = inject(container, productGroup, factoryType, description, signalProtocol); + } + } + + return element; + } + + protected boolean shouldInject(IManagedContainer container, String productGroup, String factoryType, + String description, SignalProtocol<?> signalProtocol) + { + if (signalProtocol.getStreamWrapper() == streamWrapper) + { + return false; + } + + return ObjectUtil.equals(signalProtocol.getType(), protocolID); + } + + protected Object inject(IManagedContainer container, String productGroup, String factoryType, String description, + SignalProtocol<?> signalProtocol) + { + signalProtocol.addStreamWrapper(streamWrapper); + return signalProtocol; + } +} diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/XORStreamWrapperInjector.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/XORStreamWrapperInjector.java index e35b671d74..10fd8c131e 100644 --- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/XORStreamWrapperInjector.java +++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/wrapping/XORStreamWrapperInjector.java @@ -1,24 +1,26 @@ -/*
- * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * Eike Stepper - initial API and implementation
- */
-package org.eclipse.net4j.signal.wrapping;
-
-import org.eclipse.net4j.util.io.XORStreamWrapper;
-
-/**
- * @author Eike Stepper
- */
-public class XORStreamWrapperInjector extends StreamWrapperInjector
-{
- public XORStreamWrapperInjector(String protocolID, int[] key)
- {
- super(protocolID, new XORStreamWrapper(key));
- }
-}
+/* + * Copyright (c) 2004 - 2012 Eike Stepper (Berlin, Germany) and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + */ +package org.eclipse.net4j.signal.wrapping; + +import org.eclipse.net4j.util.io.XORStreamWrapper; + +/** + * An {@link StreamWrapperInjector injector} that injects {@link XORStreamWrapper} instances. + * + * @author Eike Stepper + */ +public class XORStreamWrapperInjector extends StreamWrapperInjector +{ + public XORStreamWrapperInjector(String protocolID, int[] key) + { + super(protocolID, new XORStreamWrapper(key)); + } +} |