Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Valenta2002-04-09 15:59:52 +0000
committerMichael Valenta2002-04-09 15:59:52 +0000
commit446685a7ccac4415e59357ab74a96095b6f2d5b6 (patch)
treecf2dc916a40825275286b24c09de1eb004eb2f36
parentffbdefe7719766fdf48d0eb56e0a6c32f4c2a6f8 (diff)
downloadeclipse.platform.team-446685a7ccac4415e59357ab74a96095b6f2d5b6.tar.gz
eclipse.platform.team-446685a7ccac4415e59357ab74a96095b6f2d5b6.tar.xz
eclipse.platform.team-446685a7ccac4415e59357ab74a96095b6f2d5b6.zip
Patch for cancellation
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java2
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java7
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties7
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java25
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java22
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java4
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java273
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java289
8 files changed, 398 insertions, 231 deletions
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
index 9348e433a..766b25657 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
@@ -53,7 +53,7 @@ public class CVSProviderPlugin extends Plugin {
// communication timeout with the server
public static final int DEFAULT_TIMEOUT = 60;
// file transfer compression level (0 - 9)
- public static final int DEFAULT_COMPRESSION_LEVEL = 0;
+ public static final int DEFAULT_COMPRESSION_LEVEL = 6;
// cvs plugin extension points and ids
public static final String ID = "org.eclipse.team.cvs.core"; //$NON-NLS-1$
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
index d7a48f3ae..0ef4862d9 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
@@ -21,7 +21,6 @@ import java.util.zip.GZIPOutputStream;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.NullProgressMonitor;
import org.eclipse.team.ccvs.core.CVSProviderPlugin;
-import org.eclipse.team.ccvs.core.CVSProviderPlugin;
import org.eclipse.team.ccvs.core.ICVSFile;
import org.eclipse.team.ccvs.core.ICVSFolder;
import org.eclipse.team.ccvs.core.ICVSRepositoryLocation;
@@ -559,6 +558,7 @@ public class Session {
try {
InputStream in = file.getContents();
try {
+ boolean compressed = false;
byte[] buffer = new byte[TRANSFER_BUFFER_SIZE];
if (! isBinary && IS_CRLF_PLATFORM || compressionLevel != 0) {
// this affects the file size, spool the converted copy to an in-memory buffer
@@ -568,6 +568,7 @@ public class Session {
if (compressionLevel != 0) {
try {
zout = new GZIPOutputStream(bout); // apparently does not support specifying compression level
+ compressed = true;
} catch (IOException e) {
throw CVSException.wrapException(e);
}
@@ -576,8 +577,8 @@ public class Session {
}
for (int count; (count = in.read(buffer)) != -1;) zout.write(buffer, 0, count);
zout.close();
- byte[] contents = bout.toByteArray();
in.close();
+ byte[] contents = bout.toByteArray();
in = new ByteArrayInputStream(contents);
size = contents.length;
}
@@ -591,7 +592,7 @@ public class Session {
};
// send the file
String sizeLine = Long.toString(size);
- if (compressionLevel != 0) sizeLine = "z" + sizeLine;
+ if (compressed) sizeLine = "z" + sizeLine;
writeLine(sizeLine);
for (int count; (count = in.read(buffer)) != -1;) out.write(buffer, 0, count);
} finally {
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
index a1cece4e6..42ea269bf 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
@@ -242,5 +242,8 @@ EclipseSynchronizer.ErrorSettingIgnorePattern=Cannot set ignored pattern on {0}
EclipseSynchronizer.ErrorDeletingFolderSync=Could not delete all CVS folders
SyncFileChangeListener.errorSettingTeamPrivateFlag=Error setting team-private flag on resource
-PollingInputStream.readTimeout=Timeout during read operation
-PollingOutputStream.writeTimeout=Timeout during write operation
+PollingInputStream.readTimeout=Timeout while reading from input stream
+PollingInputStream.closeTimeout=Timeout while closing input stream
+PollingOutputStream.writeTimeout=Timeout while writing to output stream
+PollingOutputStream.closeTimeout=Timeout while closing output stream
+TimeoutOutputStream.cannotWriteToStream=Cannot write to output stream
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java
index 931b470c3..6e3f81e13 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java
@@ -48,6 +48,31 @@ public class PollingInputStream extends FilterInputStream {
/**
* Wraps the underlying stream's method.
+ * It may be important to wait for an input stream to be closed because it
+ * holds an implicit lock on a system resoure (such as a file) while it is
+ * open. Closing a stream may take time if the underlying stream is still
+ * servicing a previous request.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ int attempts = 0;
+ for (;;) {
+ try {
+ in.close();
+ return;
+ } catch (InterruptedIOException e) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingInputStream.closeTimeout"));
+ if (DEBUG) System.out.println("close retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
* @throws OperationCanceledException if the progress monitor is canceled
* @throws InterruptedIOException if the underlying operation times out numAttempts times
* and no data was received, bytesTransferred will be zero
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java
index c3597893b..5e6ce7ff5 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java
@@ -141,10 +141,22 @@ public class PollingOutputStream extends FilterOutputStream {
* @throws IOException if an i/o error occurs
*/
public void close() throws IOException {
- try {
- flush();
- } finally {
- out.close();
- }
+ int attempts = numAttempts - 1; // fail fast if flush() does times out
+ try {
+ flush();
+ attempts = 0;
+ } finally {
+ for (;;) {
+ try {
+ out.close();
+ return;
+ } catch (InterruptedIOException e) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingOutputStream.closeTimeout"));
+ if (DEBUG) System.out.println("close retry=" + attempts);
+ }
+ }
+ }
}
} \ No newline at end of file
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java
index 17498cc42..54489979f 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java
@@ -90,9 +90,9 @@ public class SizeConstrainedInputStream extends FilterInputStream {
* @throws IOException if an i/o error occurs
*/
public int read(byte[] buffer, int offset, int length) throws IOException {
- if (length >= bytesRemaining) {
+ if (length > bytesRemaining) {
+ if (bytesRemaining == 0) return -1;
length = (int) bytesRemaining;
- if (length == 0) return -1;
}
try {
int count = in.read(buffer, offset, length);
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java
index 72c8eb495..744e50249 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java
@@ -29,58 +29,72 @@ import java.io.InterruptedIOException;
public class TimeoutInputStream extends FilterInputStream {
private byte[] iobuffer; // circular buffer
private int head = 0; // points to first unread byte
- private int length = 0; // number of remaining unread bytes, -1 if closed
-
- private long readTimeout;
- private long closeTimeout;
- private Thread thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$
+ private int length = 0; // number of remaining unread bytes
+ private boolean eof = false; // if true, EOF encountered
+ private boolean closeRequested = false; // if true, close requested
private IOException ioe = null;
private RuntimeException re = null;
+ private long readTimeout; // read() timeout in millis
+ private long closeTimeout; // close() timeout in millis, or -1
+ private Thread thread;
+
/**
* Creates a timeout wrapper for an input stream.
* @param in the underlying input stream
* @param bufferSize the buffer size in bytes; should be large enough to mitigate
* Thread synchronization and context switching overhead
* @param readTimeout the number of milliseconds to block for a read() or skip() before
- * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * throwing an InterruptedIOException; 0 blocks indefinitely
* @param closeTimeout the number of milliseconds to block for a close() before throwing
- * an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
*/
public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) {
super(in);
this.iobuffer = new byte[bufferSize];
this.readTimeout = readTimeout;
this.closeTimeout = closeTimeout;
+ thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$
thread.setDaemon(true);
thread.start();
}
/**
* Wraps the underlying stream's method.
+ * It may be important to wait for a stream to actually be closed because it
+ * holds an implicit lock on a system resoure (such as a file) while it is
+ * open. Closing a stream may take time if the underlying stream is still
+ * servicing a previous request.
+ * @throws InterruptedIOException if the timeout expired
* @throws IOException if an i/o error occurs
*/
public void close() throws IOException {
- if (thread == null) return;
- Thread oldThread = thread;
- thread = null;
- oldThread.interrupt();
- if (closeTimeout != -1) {
- try {
- oldThread.join(closeTimeout);
- } catch (InterruptedException e) {
- }
+ Thread oldThread;
+ synchronized (this) {
+ if (thread == null) return;
+ oldThread = thread;
+ closeRequested = true;
+ thread.interrupt();
+ checkError();
+ }
+ if (closeTimeout == -1) return;
+ try {
+ oldThread.join(closeTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
}
synchronized (this) {
- if (ioe != null) throw ioe;
- if (re != null) throw re;
+ checkError();
+ if (thread != null) throw new InterruptedIOException();
}
}
/**
* Returns the number of unread bytes in the buffer.
+ * @throws IOException if an i/o error occurs
*/
public synchronized int available() throws IOException {
+ if (length == 0) checkError();
return length > 0 ? length : 0;
}
@@ -91,12 +105,17 @@ public class TimeoutInputStream extends FilterInputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized int read() throws IOException {
- syncfill();
- if (length == -1) return -1;
- int b = iobuffer[head++];
+ if (length == 0) checkError();
+ syncFill();
+ if (length == 0) {
+ checkError();
+ if (eof) return -1;
+ throw new InterruptedIOException();
+ }
+ int b = iobuffer[head++] & 255;
if (head == iobuffer.length) head = 0;
length--;
- asyncfill();
+ asyncFill();
return b;
}
@@ -107,9 +126,13 @@ public class TimeoutInputStream extends FilterInputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized int read(byte[] buffer, int off, int len) throws IOException {
- if (len == 0) return 0;
- syncfill();
- if (length == -1) return -1;
+ if (length == 0) checkError();
+ syncFill();
+ if (length == 0) {
+ checkError();
+ if (eof) return -1;
+ throw new InterruptedIOException();
+ }
int pos = off;
if (len > length) len = length;
while (len-- > 0) {
@@ -117,7 +140,7 @@ public class TimeoutInputStream extends FilterInputStream {
if (head == iobuffer.length) head = 0;
length--;
}
- asyncfill();
+ asyncFill();
return pos - off;
}
@@ -128,16 +151,23 @@ public class TimeoutInputStream extends FilterInputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized long skip(long count) throws IOException {
+ if (length == 0) checkError();
long amount = 0;
try {
- while (count != 0 && length != -1) {
+ while (count != 0) {
int skip = (count > length) ? length : (int) count;
head = (head + skip) % iobuffer.length;
length -= skip;
amount += skip;
count -= skip;
- syncfill();
+ syncFill();
+ if (length == 0) {
+ checkError();
+ if (eof) break;
+ throw new InterruptedIOException();
+ }
}
+ asyncFill();
return amount;
} catch (InterruptedIOException e) {
e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT
@@ -152,102 +182,129 @@ public class TimeoutInputStream extends FilterInputStream {
return false;
}
- private void syncfill() throws IOException {
- if (length == 0) {
- asyncfill();
- if (readTimeout != -1) {
- try {
- wait(readTimeout);
- } catch (InterruptedException e) {
- }
- }
- if (length == 0) {
- throw new InterruptedIOException();
- }
+ /*
+ * Waits for the buffer to fill if it is empty and the stream has not reached EOF.
+ * The buffer might still be empty when this method returns if the operation timed out
+ * or EOF was encountered.
+ */
+ private void syncFill() throws IOException {
+ if (length != 0 || eof) return;
+ notify();
+ try {
+ wait(readTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
}
}
- private void asyncfill() throws IOException {
- try {
- if (ioe != null) {
- IOException e = ioe;
- ioe = null;
- throw e;
- }
- if (re != null) {
- RuntimeException e = re;
- re = null;
- throw e;
- }
- } finally {
- if (length != -1 && length != iobuffer.length) {
- notify();
- }
+ /*
+ * Notifies the background thread that some bytes were read so that it can fill the buffer
+ * asynchronously in the background.
+ */
+ private void asyncFill() {
+ if ((length != iobuffer.length && ! eof) || closeRequested) notify();
+ }
+
+ /*
+ * Checks if exceptions are pending and throws the next one, if any.
+ */
+ private void checkError() throws IOException {
+ if (ioe != null) {
+ IOException e = ioe;
+ ioe = null;
+ throw e;
+ }
+ if (re != null) {
+ RuntimeException e = re;
+ re = null;
+ throw e;
}
}
private class FillBufferRunnable implements Runnable {
+ private final Object lock = TimeoutInputStream.this;
+
public void run() {
- final Object lock = TimeoutInputStream.this;
try {
- boolean eof = false;
- for (;;) {
- int off, len;
+ readUntilDone();
+ waitUntilClosed();
+ } catch (IOException e) {
+ synchronized (lock) { ioe = e; }
+ waitUntilClosed();
+ } catch (RuntimeException e) {
+ synchronized (lock) { re = e; }
+ waitUntilClosed();
+ } finally {
+ /*** Closes the stream and sets thread to null when done ***/
+ try {
+ in.close();
+ } catch (IOException e) {
+ synchronized (lock) { ioe = e; }
+ } catch (RuntimeException e) {
+ synchronized (lock) { re = e; }
+ } finally {
synchronized (lock) {
+ eof = true;
+ thread = null;
+ lock.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * Reads bytes into the buffer until EOF, closed, or error.
+ */
+ private void readUntilDone() throws IOException {
+ boolean pause = false;
+ for (;;) {
+ int off, len;
+ synchronized (lock) {
+ for (;;) {
+ if (closeRequested || eof) return; // quit signal
+ if (length != iobuffer.length && ! pause) break;
+ pause = false;
try {
- while (thread != null && (length == iobuffer.length || eof || ioe != null || re != null)) {
- lock.wait();
- }
- if (thread == null) return; // quit signal
+ lock.wait();
} catch (InterruptedException e) {
- return; // alternative quit signal
- }
- off = (head + length) % iobuffer.length;
- len = ((head > off) ? head : iobuffer.length) - off;
- }
- try {
- // the i/o operation might block without releasing the lock,
- // so we do this outside of the synchronized block
- int count = in.read(iobuffer, off, len);
- if (count == -1) eof = true;
- synchronized (lock) {
- if (eof) {
- if (length == 0) length = -1;
- } else {
- length += count;
- }
- if (count != 0) lock.notify();
- }
- } catch (InterruptedIOException e) {
- int count = e.bytesTransferred; // keep partial transfer
- e.bytesTransferred = 0; // not relevant if rethrown
- synchronized (lock) {
- if (length != -1) length += count;
- ioe = e;
- lock.notify();
- }
- } catch (IOException e) {
- synchronized (lock) {
- ioe = e;
- lock.notify();
- }
- } catch (RuntimeException e) {
- synchronized (lock) {
- re = e;
- lock.notify();
+ closeRequested = true; // alternate quit signal
}
}
+ off = (head + length) % iobuffer.length;
+ len = ((head > off) ? head : iobuffer.length) - off;
}
- } finally {
+ int count;
try {
- in.close();
- } catch (IOException e) {
- synchronized (lock) {
- ioe = e;
- }
- } catch (RuntimeException e) {
- synchronized (lock) {
- re = e;
+ // the i/o operation might block without releasing the lock,
+ // so we do this outside of the synchronized block
+ count = in.read(iobuffer, off, len);
+ } catch (InterruptedIOException e) {
+ // keep partial transfer
+ count = e.bytesTransferred;
+ e.bytesTransferred = 0;
+ synchronized (lock) { ioe = e; }
+ }
+ synchronized (lock) {
+ if (count == -1) return;
+ if (count == 0) pause = true;
+ length += count;
+ lock.notify();
+ }
+ }
+ }
+
+ /**
+ * Waits until we have been requested to close the stream.
+ */
+ private void waitUntilClosed() {
+ synchronized (lock) {
+ eof = true;
+ lock.notify();
+ while (! closeRequested) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ closeRequested = true; // alternate quit signal
}
}
}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java
index 201b804ba..11b085f5f 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java
@@ -15,6 +15,8 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import org.eclipse.team.internal.ccvs.core.Policy;
+
/**
* Wraps an output stream that blocks indefinitely to simulate timeouts on write(),
* flush(), and close(). The resulting output stream is buffered and supports
@@ -30,57 +32,65 @@ public class TimeoutOutputStream extends FilterOutputStream {
private byte[] iobuffer; // circular buffer
private int head = 0; // points to first unwritten byte
private int length = 0; // number of remaining unwritten bytes
-
- private long writeTimeout;
- private long closeTimeout;
- private boolean flushRequested = false;
- private Thread thread = new Thread(new CommitBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$
+ private boolean cannotWrite = false; // if true, writes will not be honoured
+ private boolean closeRequested = false; // if true, close requested
+ private boolean flushRequested = false; // if true, flush requested
private IOException ioe = null;
private RuntimeException re = null;
+ private long writeTimeout; // write() timeout in millis
+ private long closeTimeout; // close() timeout in millis, or -1
+ private Thread thread;
+
/**
* Creates a timeout wrapper for an output stream.
* @param out the underlying input stream
* @param bufferSize the buffer size in bytes; should be large enough to mitigate
* Thread synchronization and context switching overhead
* @param writeTimeout the number of milliseconds to block for a write() or flush() before
- * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * throwing an InterruptedIOException; 0 blocks indefinitely
* @param closeTimeout the number of milliseconds to block for a close() before throwing
- * an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
*/
public TimeoutOutputStream(OutputStream out, int bufferSize, long writeTimeout, long closeTimeout) {
super(out);
this.iobuffer = new byte[bufferSize];
this.writeTimeout = writeTimeout;
this.closeTimeout = closeTimeout;
+ thread = new Thread(new CommitBufferRunnable(), "TimeoutOutputStream");//$NON-NLS-1$
thread.setDaemon(true);
thread.start();
}
/**
* Wraps the underlying stream's method.
+ * It may be important to wait for a stream to actually be closed because it
+ * holds an implicit lock on a system resoure (such as a file) while it is
+ * open. Closing a stream may take time if the underlying stream is still
+ * servicing a previous request.
* @throws InterruptedIOException if the timeout expired, bytesTransferred will
* reflect the number of bytes flushed from the buffer
* @throws IOException if an i/o error occurs
*/
public void close() throws IOException {
- if (thread == null) return;
+ Thread oldThread;
+ synchronized (this) {
+ if (thread == null) return;
+ oldThread = thread;
+ closeRequested = true;
+ flushRequested = true;
+ thread.interrupt();
+ checkError();
+ }
+ if (closeTimeout == -1) return;
try {
- flush();
- } finally {
- Thread oldThread = thread;
- thread = null;
- oldThread.interrupt();
- if (closeTimeout != -1) {
- try {
- oldThread.join(closeTimeout);
- } catch (InterruptedException e) {
- }
- }
- synchronized (this) {
- if (ioe != null) throw ioe;
- if (re != null) throw re;
- }
+ oldThread.join(closeTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
+ }
+ synchronized (this) {
+ checkError();
+ if (thread != null) throw new InterruptedIOException();
}
}
@@ -91,10 +101,15 @@ public class TimeoutOutputStream extends FilterOutputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized void write(int b) throws IOException {
- if (length == iobuffer.length) synccommit();
+ if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream"));
+ checkError();
+ if (length == iobuffer.length) {
+ syncCommit();
+ if (length == iobuffer.length) throw new InterruptedIOException();
+ }
iobuffer[(head + length) % iobuffer.length] = (byte) b;
length++;
- asynccommit();
+ asyncCommit();
}
/**
@@ -104,19 +119,24 @@ public class TimeoutOutputStream extends FilterOutputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized void write(byte[] buffer, int off, int len) throws IOException {
+ if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream"));
+ checkError();
int amount = 0;
try {
while (len-- > 0) {
- if (length == iobuffer.length) synccommit();
+ if (length == iobuffer.length) {
+ syncCommit();
+ if (length == iobuffer.length) throw new InterruptedIOException();
+ }
iobuffer[(head + length) % iobuffer.length] = buffer[off++];
length++;
amount++;
}
- asynccommit();
} catch (InterruptedIOException e) {
e.bytesTransferred = amount;
throw e;
}
+ asyncCommit();
}
/**
@@ -126,116 +146,165 @@ public class TimeoutOutputStream extends FilterOutputStream {
* @throws IOException if an i/o error occurs
*/
public synchronized void flush() throws IOException {
- int oldLength = length;
+ if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream"));
+ checkError();
flushRequested = true;
- InterruptedIOException iioe = null;
+ int amount = 0;
try {
- synccommit();
- if (length == 0) return;
- iioe = new InterruptedIOException();
+ while (flushRequested && length != 0) {
+ int oldLength = length;
+ syncCommit();
+ amount += oldLength - length;
+ if (length == oldLength) throw new InterruptedIOException();
+ }
} catch (InterruptedIOException e) {
- iioe = e;
+ e.bytesTransferred = amount;
+ throw e;
}
- iioe.bytesTransferred = oldLength - length;
- throw iioe;
+ asyncCommit();
}
- private void synccommit() throws IOException {
- asynccommit();
- if (length != 0) {
- int oldLength = length;
- if (writeTimeout != -1) {
- try {
- wait(writeTimeout);
- } catch (InterruptedException e) {
- }
- }
- if (length == oldLength) {
- throw new InterruptedIOException();
- }
+ /*
+ * Waits for the buffer to drain.
+ * The buffer might still be at the same level when this method returns if the operation timed out.
+ */
+ private void syncCommit() throws IOException {
+ notify();
+ try {
+ wait(writeTimeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
}
+ checkError();
+ if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream"));
}
- private void asynccommit() throws IOException {
- try {
- if (ioe != null) {
- IOException e = ioe;
- ioe = null;
- throw e;
- }
- if (re != null) {
- RuntimeException e = re;
- re = null;
- throw e;
- }
- } finally {
- if (length != 0 || flushRequested) notify();
+ /*
+ * Notifies the background thread that some bytes were written so that it can drain the buffer
+ * asynchronously in the background.
+ */
+ private void asyncCommit() throws IOException {
+ if (length != 0 || flushRequested || closeRequested) notify();
+ }
+
+ /*
+ * Checks if exceptions are pending and throws the next one, if any.
+ */
+ private void checkError() throws IOException {
+ if (ioe != null) {
+ IOException e = ioe;
+ ioe = null;
+ throw e;
+ }
+ if (re != null) {
+ RuntimeException e = re;
+ re = null;
+ throw e;
}
}
private class CommitBufferRunnable implements Runnable {
+ private final Object lock = TimeoutOutputStream.this;
+
public void run() {
- final Object lock = TimeoutOutputStream.this;
- boolean running = true;
try {
- for (;;) {
- int off, len;
+ writeUntilDone();
+ waitUntilClosed();
+ } catch (IOException e) {
+ synchronized (lock) { ioe = e; }
+ waitUntilClosed();
+ } catch (RuntimeException e) {
+ synchronized (lock) { re = e; }
+ waitUntilClosed();
+ } finally {
+ /*** Closes the stream and sets thread to null when done ***/
+ try {
+ out.close();
+ } catch (IOException e) {
+ synchronized (lock) { ioe = e; }
+ } catch (RuntimeException e) {
+ synchronized (lock) { re = e; }
+ } finally {
synchronized (lock) {
+ cannotWrite = true;
+ thread = null;
+ lock.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * Writes bytes from the buffer until closed and buffer is empty
+ */
+ private void writeUntilDone() throws IOException {
+ boolean pause = false;
+ boolean mustFlush = false;
+ for (;;) {
+ int off, len;
+ synchronized (lock) {
+ for (;;) {
+ if (closeRequested && length == 0) return; // quit signal
+ if ((mustFlush || flushRequested || length != 0) && ! pause) break;
+ pause = false;
try {
- while (running && thread != null && ((length == 0 && ! flushRequested) || ioe != null || re != null)) {
- lock.wait();
- }
- if (thread == null) running = false; // quit signal
- if (! running && length == 0) return;
+ lock.wait();
} catch (InterruptedException e) {
- running = false; // alternative quit signal
+ closeRequested = true; // alternate quit signal
}
- off = head;
- len = iobuffer.length - head;
- if (len > length) len = length;
}
+ off = head;
+ len = iobuffer.length - head;
+ if (len > length) len = length;
+ if (flushRequested) {
+ mustFlush = true;
+ flushRequested = false;
+ }
+ }
+ if (len != 0) {
try {
// the i/o operation might block without releasing the lock,
// so we do this outside of the synchronized block
- if (len != 0) out.write(iobuffer, off, len);
- if (flushRequested) out.flush();
- synchronized (lock) {
- flushRequested = false;
- head = (head + len) % iobuffer.length;
- length -= len;
- lock.notify();
- }
+ out.write(iobuffer, off, len);
} catch (InterruptedIOException e) {
- len = e.bytesTransferred; // keep partial transfer
- e.bytesTransferred = 0; // not relevant if rethrown
- synchronized (lock) {
- head = (head + len) % iobuffer.length;
- length -= len;
- ioe = e;
- lock.notify();
- }
- } catch (IOException e) {
- synchronized (lock) {
- ioe = e;
- lock.notify();
- }
- } catch (RuntimeException e) {
- synchronized (lock) {
- re = e;
- lock.notify();
- }
+ len = e.bytesTransferred;
+ e.bytesTransferred = 0;
+ if (len == 0) pause = true;
+ synchronized (lock) { ioe = e; }
}
- }
- } finally {
- try {
- out.close();
- } catch (IOException e) {
synchronized (lock) {
- ioe = e;
- }
- } catch (RuntimeException e) {
+ head = (head + len) % iobuffer.length;
+ length -= len;
+ lock.notify();
+ }
+ } else {
+ try {
+ out.flush();
+ mustFlush = false;
+ } catch (InterruptedIOException e) {
+ if (e.bytesTransferred == 0) pause = true;
+ e.bytesTransferred = 0;
+ synchronized (lock) { ioe = e; }
+ }
synchronized (lock) {
- re = e;
+ lock.notify();
+ }
+ }
+ }
+ }
+
+ /**
+ * Waits until we have been requested to close the stream.
+ */
+ private void waitUntilClosed() {
+ synchronized (lock) {
+ cannotWrite = true;
+ lock.notify();
+ while (! closeRequested) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ closeRequested = true; // alternate quit signal
}
}
}

Back to the top