Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEike Stepper2008-11-13 19:39:25 +0000
committerEike Stepper2008-11-13 19:39:25 +0000
commit0256e30cbcaf878a10c28fa40577fc14d2db47f9 (patch)
treec39b82347df6070282e3ee0ce46e930f7559da30 /plugins/org.eclipse.net4j/src
parentb0f890250078e2596203bcefddaeb49d4cd7a8ba (diff)
downloadcdo-0256e30cbcaf878a10c28fa40577fc14d2db47f9.tar.gz
cdo-0256e30cbcaf878a10c28fa40577fc14d2db47f9.tar.xz
cdo-0256e30cbcaf878a10c28fa40577fc14d2db47f9.zip
[251751] Provide progress monitoring for commit operations
https://bugs.eclipse.org/bugs/show_bug.cgi?id=251751
Diffstat (limited to 'plugins/org.eclipse.net4j/src')
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java35
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java36
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java36
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java53
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java42
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java102
-rw-r--r--plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/RetryFailOverStrategy.java49
7 files changed, 185 insertions, 168 deletions
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
index efee74ca17..6e6d5033df 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Request.java
@@ -12,11 +12,8 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import java.io.IOException;
-
/**
* @author Eike Stepper
*/
@@ -56,37 +53,9 @@ public abstract class Request extends SignalActor
}
@Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- IChannel channel = null;
-
- for (;;)
- {
- try
- {
- channel = getProtocol().getChannel();
- doOutput(out);
- break;
- }
- catch (IOException ex)
- {
- if (getProtocol().handleFailOver(this, channel))
- {
- resetting();
- }
- else
- {
- throw ex;
- }
- }
- }
- }
-
- /**
- * @since 2.0
- */
- protected void resetting()
+ void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
{
+ doOutput(out);
}
protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
index ce91b3e60c..cd3633ba1a 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithConfirmation.java
@@ -12,11 +12,9 @@ package org.eclipse.net4j.signal;
import org.eclipse.net4j.buffer.BufferInputStream;
import org.eclipse.net4j.buffer.BufferOutputStream;
-import org.eclipse.net4j.channel.IChannel;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -94,38 +92,10 @@ public abstract class RequestWithConfirmation<RESULT> extends SignalActor
}
@Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- IChannel channel = null;
-
- for (;;)
- {
- try
- {
- channel = getProtocol().getChannel();
- doOutput(out);
- doInput(in);
- break;
- }
- catch (IOException ex)
- {
- if (getProtocol().handleFailOver(this, channel))
- {
- resetting();
- }
- else
- {
- throw ex;
- }
- }
- }
- }
-
- /**
- * @since 2.0
- */
- protected void resetting()
+ void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
{
+ doOutput(out);
+ doInput(in);
}
protected abstract void requesting(ExtendedDataOutputStream out) throws Exception;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
index 82cafef25d..4e17375f5e 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/RequestWithMonitoring.java
@@ -16,8 +16,8 @@ import org.eclipse.net4j.util.ImplementationError;
import org.eclipse.net4j.util.concurrent.ConcurrencyUtil;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.net4j.util.om.monitor.Monitor;
+import org.eclipse.net4j.util.om.monitor.OMMonitor;
import org.eclipse.internal.net4j.bundle.OM;
@@ -105,23 +105,6 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
}
@Override
- protected void execute(BufferInputStream in, BufferOutputStream out) throws Exception
- {
- try
- {
- super.execute(in, out);
- }
- finally
- {
- remoteMonitor.done();
- remoteMonitor = null;
-
- mainMonitor.done();
- mainMonitor = null;
- }
- }
-
- @Override
protected final void requesting(ExtendedDataOutputStream out) throws Exception
{
int remoteWork = 100 - getRequestingWorkPercent() - getConfirmingWorkPercent();
@@ -219,6 +202,23 @@ public abstract class RequestWithMonitoring<RESULT> extends RequestWithConfirmat
return 25;
}
+ @Override
+ void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ try
+ {
+ super.doExecute(in, out);
+ }
+ finally
+ {
+ remoteMonitor.done();
+ remoteMonitor = null;
+
+ mainMonitor.done();
+ mainMonitor = null;
+ }
+ }
+
void setMonitorProgress(int totalWork, int work)
{
getBufferInputStream().restartTimeout();
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
index ddb271a1c1..5d135ce017 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/Signal.java
@@ -15,7 +15,6 @@ import org.eclipse.net4j.buffer.BufferOutputStream;
import org.eclipse.net4j.util.ReflectUtil;
import org.eclipse.net4j.util.io.ExtendedDataInputStream;
import org.eclipse.net4j.util.io.ExtendedDataOutputStream;
-import org.eclipse.net4j.util.io.IORuntimeException;
import org.eclipse.net4j.util.om.trace.ContextTracer;
import org.eclipse.internal.net4j.bundle.OM;
@@ -205,56 +204,28 @@ public abstract class Signal implements Runnable
return null;
}
- protected InputStream wrapInputStream(InputStream in)
+ protected InputStream wrapInputStream(InputStream in) throws IOException
{
- try
- {
- currentStream = protocol.wrapInputStream(in);
- return (InputStream)currentStream;
- }
- catch (IOException ex)
- {
- throw new IORuntimeException(ex);
- }
+ currentStream = protocol.wrapInputStream(in);
+ return (InputStream)currentStream;
}
- protected OutputStream wrapOutputStream(OutputStream out)
+ protected OutputStream wrapOutputStream(OutputStream out) throws IOException
{
- try
- {
- currentStream = protocol.wrapOutputStream(out);
- return (OutputStream)currentStream;
- }
- catch (IOException ex)
- {
- throw new IORuntimeException(ex);
- }
+ currentStream = protocol.wrapOutputStream(out);
+ return (OutputStream)currentStream;
}
- protected void finishInputStream(InputStream in)
+ protected void finishInputStream(InputStream in) throws IOException
{
- try
- {
- currentStream = null;
- protocol.finishInputStream(in);
- }
- catch (IOException ex)
- {
- throw new IORuntimeException(ex);
- }
+ currentStream = null;
+ protocol.finishInputStream(in);
}
- protected void finishOutputStream(OutputStream out)
+ protected void finishOutputStream(OutputStream out) throws IOException
{
- try
- {
- currentStream = null;
- protocol.finishOutputStream(out);
- }
- catch (IOException ex)
- {
- throw new IORuntimeException(ex);
- }
+ currentStream = null;
+ protocol.finishOutputStream(out);
}
protected abstract void execute(BufferInputStream in, BufferOutputStream out) throws Exception;
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
index a46333efd9..d1ee5b9bf3 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalActor.java
@@ -10,6 +10,12 @@
**************************************************************************/
package org.eclipse.net4j.signal;
+import org.eclipse.net4j.buffer.BufferInputStream;
+import org.eclipse.net4j.buffer.BufferOutputStream;
+import org.eclipse.net4j.channel.IChannel;
+
+import java.io.IOException;
+
/**
* @author Eike Stepper
*/
@@ -42,6 +48,42 @@ public abstract class SignalActor extends Signal
setCorrelationID(protocol.getNextCorrelationID());
}
+ /**
+ * @since 2.0
+ */
+ protected void resetting()
+ {
+ }
+
+ @Override
+ protected final void execute(BufferInputStream in, BufferOutputStream out) throws Exception
+ {
+ IChannel channel = null;
+
+ for (;;)
+ {
+ try
+ {
+ channel = getProtocol().getChannel();
+ doExecute(in, out);
+ break;
+ }
+ catch (IOException ex)
+ {
+ if (getProtocol().handleFailOver(this, channel))
+ {
+ resetting();
+ }
+ else
+ {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ abstract void doExecute(BufferInputStream in, BufferOutputStream out) throws Exception;
+
@Override
String getInputMeaning()
{
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
index d21022579f..3bc88800ad 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/SignalProtocol.java
@@ -190,42 +190,6 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
return true;
}
- public InputStream wrapInputStream(InputStream in) throws IOException
- {
- if (streamWrapper != null)
- {
- in = streamWrapper.wrapInputStream(in);
- }
-
- return in;
- }
-
- public OutputStream wrapOutputStream(OutputStream out) throws IOException
- {
- if (streamWrapper != null)
- {
- out = streamWrapper.wrapOutputStream(out);
- }
-
- return out;
- }
-
- public void finishInputStream(InputStream in) throws IOException
- {
- if (streamWrapper != null)
- {
- streamWrapper.finishInputStream(in);
- }
- }
-
- public void finishOutputStream(OutputStream out) throws IOException
- {
- if (streamWrapper != null)
- {
- streamWrapper.finishOutputStream(out);
- }
- }
-
public void handleBuffer(IBuffer buffer)
{
ByteBuffer byteBuffer = buffer.getByteBuffer();
@@ -247,13 +211,17 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
short signalID = byteBuffer.getShort();
if (TRACER.isEnabled())
{
- TRACER.trace("Got signal id " + signalID); //$NON-NLS-1$
+ TRACER.trace("Got signalID: " + signalID); //$NON-NLS-1$
}
signal = provideSignalReactor(signalID);
signal.setCorrelationID(-correlationID);
signal.setBufferInputStream(new SignalInputStream(getTimeout()));
- signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false));
+ if (signal instanceof IndicationWithResponse)
+ {
+ signal.setBufferOutputStream(new SignalOutputStream(-correlationID, signalID, false));
+ }
+
signals.put(-correlationID, signal);
getExecutorService().execute(signal);
}
@@ -369,13 +337,53 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
return correlationID;
}
+ InputStream wrapInputStream(InputStream in) throws IOException
+ {
+ if (streamWrapper != null)
+ {
+ in = streamWrapper.wrapInputStream(in);
+ }
+
+ return in;
+ }
+
+ OutputStream wrapOutputStream(OutputStream out) throws IOException
+ {
+ if (streamWrapper != null)
+ {
+ out = streamWrapper.wrapOutputStream(out);
+ }
+
+ return out;
+ }
+
+ void finishInputStream(InputStream in) throws IOException
+ {
+ if (streamWrapper != null)
+ {
+ streamWrapper.finishInputStream(in);
+ }
+ }
+
+ void finishOutputStream(OutputStream out) throws IOException
+ {
+ if (streamWrapper != null)
+ {
+ streamWrapper.finishOutputStream(out);
+ }
+ }
+
void startSignal(SignalActor signalActor, long timeout) throws Exception
{
checkArg(signalActor.getProtocol() == this, "Wrong protocol");
short signalID = signalActor.getID();
int correlationID = signalActor.getCorrelationID();
signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true));
- signalActor.setBufferInputStream(new SignalInputStream(timeout));
+ if (signalActor instanceof RequestWithConfirmation)
+ {
+ signalActor.setBufferInputStream(new SignalInputStream(timeout));
+ }
+
synchronized (signals)
{
signals.put(correlationID, signalActor);
@@ -402,12 +410,24 @@ public abstract class SignalProtocol<INFRA_STRUCTURE> extends Protocol<INFRA_STR
{
synchronized (failOverStrategy)
{
- if (!failingOver && originalChannel == getChannel())
+ failingOver = true;
+ if (originalChannel == getChannel())
{
- failingOver = true;
failOverStrategy.handleFailOver(this);
}
+ // Set new OutputStream
+ int correlationID = signalActor.getCorrelationID();
+ short signalID = signalActor.getID();
+ signalActor.setBufferOutputStream(new SignalOutputStream(correlationID, signalID, true));
+
+ // Set new InputStream
+ if (signalActor instanceof RequestWithConfirmation)
+ {
+ long timeout = signalActor.getBufferInputStream().getMillisBeforeTimeout();
+ signalActor.setBufferInputStream(new SignalInputStream(timeout));
+ }
+
return true;
}
}
diff --git a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/RetryFailOverStrategy.java b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/RetryFailOverStrategy.java
index dced8a9073..97252768b8 100644
--- a/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/RetryFailOverStrategy.java
+++ b/plugins/org.eclipse.net4j/src/org/eclipse/net4j/signal/failover/RetryFailOverStrategy.java
@@ -12,20 +12,65 @@ package org.eclipse.net4j.signal.failover;
import org.eclipse.net4j.connector.IConnector;
import org.eclipse.net4j.signal.ISignalProtocol;
+import org.eclipse.net4j.util.WrappedException;
+
+import org.eclipse.internal.net4j.bundle.OM;
/**
* @author Eike Stepper
*/
public class RetryFailOverStrategy extends NOOPFailOverStrategy
{
- public RetryFailOverStrategy(IConnector connector)
+ /**
+ * @since 2.0
+ */
+ public static final int RETRY_FOREVER = Integer.MAX_VALUE;
+
+ private int retries;
+
+ /**
+ * @since 2.0
+ */
+ public RetryFailOverStrategy(IConnector connector, int retries)
{
super(connector);
+ this.retries = retries;
+ }
+
+ public RetryFailOverStrategy(IConnector connector)
+ {
+ this(connector, RETRY_FOREVER);
+ }
+
+ /**
+ * @since 2.0
+ */
+ public int getRetries()
+ {
+ return retries;
}
@Override
public void handleFailOver(ISignalProtocol<?> protocol)
{
- handleOpen(protocol);
+ Exception exception = null;
+ for (int i = 0; i < retries; i++)
+ {
+ try
+ {
+ handleOpen(protocol);
+ return;
+ }
+ catch (Exception ex)
+ {
+ OM.LOG.error(ex);
+ exception = ex;
+ }
+ }
+
+ if (exception != null)
+ {
+ throw WrappedException.wrap(exception);
+ }
}
}

Back to the top