From 446685a7ccac4415e59357ab74a96095b6f2d5b6 Mon Sep 17 00:00:00 2001 From: Michael Valenta Date: Tue, 9 Apr 2002 15:59:52 +0000 Subject: Patch for cancellation --- .../team/internal/ccvs/core/CVSProviderPlugin.java | 2 +- .../team/internal/ccvs/core/client/Session.java | 7 +- .../team/internal/ccvs/core/messages.properties | 7 +- .../ccvs/core/streams/PollingInputStream.java | 25 ++ .../ccvs/core/streams/PollingOutputStream.java | 22 +- .../core/streams/SizeConstrainedInputStream.java | 4 +- .../ccvs/core/streams/TimeoutInputStream.java | 273 +++++++++++-------- .../ccvs/core/streams/TimeoutOutputStream.java | 289 +++++++++++++-------- 8 files changed, 398 insertions(+), 231 deletions(-) (limited to 'bundles/org.eclipse.team.cvs.core/src') diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java index 9348e433a..766b25657 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java @@ -53,7 +53,7 @@ public class CVSProviderPlugin extends Plugin { // communication timeout with the server public static final int DEFAULT_TIMEOUT = 60; // file transfer compression level (0 - 9) - public static final int DEFAULT_COMPRESSION_LEVEL = 0; + public static final int DEFAULT_COMPRESSION_LEVEL = 6; // cvs plugin extension points and ids public static final String ID = "org.eclipse.team.cvs.core"; //$NON-NLS-1$ diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java index d7a48f3ae..0ef4862d9 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java @@ -21,7 +21,6 @@ import java.util.zip.GZIPOutputStream; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.NullProgressMonitor; import org.eclipse.team.ccvs.core.CVSProviderPlugin; -import org.eclipse.team.ccvs.core.CVSProviderPlugin; import org.eclipse.team.ccvs.core.ICVSFile; import org.eclipse.team.ccvs.core.ICVSFolder; import org.eclipse.team.ccvs.core.ICVSRepositoryLocation; @@ -559,6 +558,7 @@ public class Session { try { InputStream in = file.getContents(); try { + boolean compressed = false; byte[] buffer = new byte[TRANSFER_BUFFER_SIZE]; if (! isBinary && IS_CRLF_PLATFORM || compressionLevel != 0) { // this affects the file size, spool the converted copy to an in-memory buffer @@ -568,6 +568,7 @@ public class Session { if (compressionLevel != 0) { try { zout = new GZIPOutputStream(bout); // apparently does not support specifying compression level + compressed = true; } catch (IOException e) { throw CVSException.wrapException(e); } @@ -576,8 +577,8 @@ public class Session { } for (int count; (count = in.read(buffer)) != -1;) zout.write(buffer, 0, count); zout.close(); - byte[] contents = bout.toByteArray(); in.close(); + byte[] contents = bout.toByteArray(); in = new ByteArrayInputStream(contents); size = contents.length; } @@ -591,7 +592,7 @@ public class Session { }; // send the file String sizeLine = Long.toString(size); - if (compressionLevel != 0) sizeLine = "z" + sizeLine; + if (compressed) sizeLine = "z" + sizeLine; writeLine(sizeLine); for (int count; (count = in.read(buffer)) != -1;) out.write(buffer, 0, count); } finally { diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties index a1cece4e6..42ea269bf 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties @@ -242,5 +242,8 @@ EclipseSynchronizer.ErrorSettingIgnorePattern=Cannot set ignored pattern on {0} EclipseSynchronizer.ErrorDeletingFolderSync=Could not delete all CVS folders SyncFileChangeListener.errorSettingTeamPrivateFlag=Error setting team-private flag on resource -PollingInputStream.readTimeout=Timeout during read operation -PollingOutputStream.writeTimeout=Timeout during write operation +PollingInputStream.readTimeout=Timeout while reading from input stream +PollingInputStream.closeTimeout=Timeout while closing input stream +PollingOutputStream.writeTimeout=Timeout while writing to output stream +PollingOutputStream.closeTimeout=Timeout while closing output stream +TimeoutOutputStream.cannotWriteToStream=Cannot write to output stream diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java index 931b470c3..6e3f81e13 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java @@ -46,6 +46,31 @@ public class PollingInputStream extends FilterInputStream { this.monitor = monitor; } + /** + * Wraps the underlying stream's method. + * It may be important to wait for an input stream to be closed because it + * holds an implicit lock on a system resoure (such as a file) while it is + * open. Closing a stream may take time if the underlying stream is still + * servicing a previous request. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + int attempts = 0; + for (;;) { + try { + in.close(); + return; + } catch (InterruptedIOException e) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingInputStream.closeTimeout")); + if (DEBUG) System.out.println("close retry=" + attempts); + } + } + } + /** * Wraps the underlying stream's method. * @throws OperationCanceledException if the progress monitor is canceled diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java index c3597893b..5e6ce7ff5 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java @@ -141,10 +141,22 @@ public class PollingOutputStream extends FilterOutputStream { * @throws IOException if an i/o error occurs */ public void close() throws IOException { - try { - flush(); - } finally { - out.close(); - } + int attempts = numAttempts - 1; // fail fast if flush() does times out + try { + flush(); + attempts = 0; + } finally { + for (;;) { + try { + out.close(); + return; + } catch (InterruptedIOException e) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingOutputStream.closeTimeout")); + if (DEBUG) System.out.println("close retry=" + attempts); + } + } + } } } \ No newline at end of file diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java index 17498cc42..54489979f 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java @@ -90,9 +90,9 @@ public class SizeConstrainedInputStream extends FilterInputStream { * @throws IOException if an i/o error occurs */ public int read(byte[] buffer, int offset, int length) throws IOException { - if (length >= bytesRemaining) { + if (length > bytesRemaining) { + if (bytesRemaining == 0) return -1; length = (int) bytesRemaining; - if (length == 0) return -1; } try { int count = in.read(buffer, offset, length); diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java index 72c8eb495..744e50249 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java @@ -29,58 +29,72 @@ import java.io.InterruptedIOException; public class TimeoutInputStream extends FilterInputStream { private byte[] iobuffer; // circular buffer private int head = 0; // points to first unread byte - private int length = 0; // number of remaining unread bytes, -1 if closed - - private long readTimeout; - private long closeTimeout; - private Thread thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$ + private int length = 0; // number of remaining unread bytes + private boolean eof = false; // if true, EOF encountered + private boolean closeRequested = false; // if true, close requested private IOException ioe = null; private RuntimeException re = null; + private long readTimeout; // read() timeout in millis + private long closeTimeout; // close() timeout in millis, or -1 + private Thread thread; + /** * Creates a timeout wrapper for an input stream. * @param in the underlying input stream * @param bufferSize the buffer size in bytes; should be large enough to mitigate * Thread synchronization and context switching overhead * @param readTimeout the number of milliseconds to block for a read() or skip() before - * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * throwing an InterruptedIOException; 0 blocks indefinitely * @param closeTimeout the number of milliseconds to block for a close() before throwing - * an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background */ public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) { super(in); this.iobuffer = new byte[bufferSize]; this.readTimeout = readTimeout; this.closeTimeout = closeTimeout; + thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$ thread.setDaemon(true); thread.start(); } /** * Wraps the underlying stream's method. + * It may be important to wait for a stream to actually be closed because it + * holds an implicit lock on a system resoure (such as a file) while it is + * open. Closing a stream may take time if the underlying stream is still + * servicing a previous request. + * @throws InterruptedIOException if the timeout expired * @throws IOException if an i/o error occurs */ public void close() throws IOException { - if (thread == null) return; - Thread oldThread = thread; - thread = null; - oldThread.interrupt(); - if (closeTimeout != -1) { - try { - oldThread.join(closeTimeout); - } catch (InterruptedException e) { - } + Thread oldThread; + synchronized (this) { + if (thread == null) return; + oldThread = thread; + closeRequested = true; + thread.interrupt(); + checkError(); + } + if (closeTimeout == -1) return; + try { + oldThread.join(closeTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we weren't expecting to be interrupted } synchronized (this) { - if (ioe != null) throw ioe; - if (re != null) throw re; + checkError(); + if (thread != null) throw new InterruptedIOException(); } } /** * Returns the number of unread bytes in the buffer. + * @throws IOException if an i/o error occurs */ public synchronized int available() throws IOException { + if (length == 0) checkError(); return length > 0 ? length : 0; } @@ -91,12 +105,17 @@ public class TimeoutInputStream extends FilterInputStream { * @throws IOException if an i/o error occurs */ public synchronized int read() throws IOException { - syncfill(); - if (length == -1) return -1; - int b = iobuffer[head++]; + if (length == 0) checkError(); + syncFill(); + if (length == 0) { + checkError(); + if (eof) return -1; + throw new InterruptedIOException(); + } + int b = iobuffer[head++] & 255; if (head == iobuffer.length) head = 0; length--; - asyncfill(); + asyncFill(); return b; } @@ -107,9 +126,13 @@ public class TimeoutInputStream extends FilterInputStream { * @throws IOException if an i/o error occurs */ public synchronized int read(byte[] buffer, int off, int len) throws IOException { - if (len == 0) return 0; - syncfill(); - if (length == -1) return -1; + if (length == 0) checkError(); + syncFill(); + if (length == 0) { + checkError(); + if (eof) return -1; + throw new InterruptedIOException(); + } int pos = off; if (len > length) len = length; while (len-- > 0) { @@ -117,7 +140,7 @@ public class TimeoutInputStream extends FilterInputStream { if (head == iobuffer.length) head = 0; length--; } - asyncfill(); + asyncFill(); return pos - off; } @@ -128,16 +151,23 @@ public class TimeoutInputStream extends FilterInputStream { * @throws IOException if an i/o error occurs */ public synchronized long skip(long count) throws IOException { + if (length == 0) checkError(); long amount = 0; try { - while (count != 0 && length != -1) { + while (count != 0) { int skip = (count > length) ? length : (int) count; head = (head + skip) % iobuffer.length; length -= skip; amount += skip; count -= skip; - syncfill(); + syncFill(); + if (length == 0) { + checkError(); + if (eof) break; + throw new InterruptedIOException(); + } } + asyncFill(); return amount; } catch (InterruptedIOException e) { e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT @@ -152,102 +182,129 @@ public class TimeoutInputStream extends FilterInputStream { return false; } - private void syncfill() throws IOException { - if (length == 0) { - asyncfill(); - if (readTimeout != -1) { - try { - wait(readTimeout); - } catch (InterruptedException e) { - } - } - if (length == 0) { - throw new InterruptedIOException(); - } + /* + * Waits for the buffer to fill if it is empty and the stream has not reached EOF. + * The buffer might still be empty when this method returns if the operation timed out + * or EOF was encountered. + */ + private void syncFill() throws IOException { + if (length != 0 || eof) return; + notify(); + try { + wait(readTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we weren't expecting to be interrupted } } - private void asyncfill() throws IOException { - try { - if (ioe != null) { - IOException e = ioe; - ioe = null; - throw e; - } - if (re != null) { - RuntimeException e = re; - re = null; - throw e; - } - } finally { - if (length != -1 && length != iobuffer.length) { - notify(); - } + /* + * Notifies the background thread that some bytes were read so that it can fill the buffer + * asynchronously in the background. + */ + private void asyncFill() { + if ((length != iobuffer.length && ! eof) || closeRequested) notify(); + } + + /* + * Checks if exceptions are pending and throws the next one, if any. + */ + private void checkError() throws IOException { + if (ioe != null) { + IOException e = ioe; + ioe = null; + throw e; + } + if (re != null) { + RuntimeException e = re; + re = null; + throw e; } } private class FillBufferRunnable implements Runnable { + private final Object lock = TimeoutInputStream.this; + public void run() { - final Object lock = TimeoutInputStream.this; try { - boolean eof = false; - for (;;) { - int off, len; + readUntilDone(); + waitUntilClosed(); + } catch (IOException e) { + synchronized (lock) { ioe = e; } + waitUntilClosed(); + } catch (RuntimeException e) { + synchronized (lock) { re = e; } + waitUntilClosed(); + } finally { + /*** Closes the stream and sets thread to null when done ***/ + try { + in.close(); + } catch (IOException e) { + synchronized (lock) { ioe = e; } + } catch (RuntimeException e) { + synchronized (lock) { re = e; } + } finally { synchronized (lock) { + eof = true; + thread = null; + lock.notify(); + } + } + } + } + + /** + * Reads bytes into the buffer until EOF, closed, or error. + */ + private void readUntilDone() throws IOException { + boolean pause = false; + for (;;) { + int off, len; + synchronized (lock) { + for (;;) { + if (closeRequested || eof) return; // quit signal + if (length != iobuffer.length && ! pause) break; + pause = false; try { - while (thread != null && (length == iobuffer.length || eof || ioe != null || re != null)) { - lock.wait(); - } - if (thread == null) return; // quit signal + lock.wait(); } catch (InterruptedException e) { - return; // alternative quit signal - } - off = (head + length) % iobuffer.length; - len = ((head > off) ? head : iobuffer.length) - off; - } - try { - // the i/o operation might block without releasing the lock, - // so we do this outside of the synchronized block - int count = in.read(iobuffer, off, len); - if (count == -1) eof = true; - synchronized (lock) { - if (eof) { - if (length == 0) length = -1; - } else { - length += count; - } - if (count != 0) lock.notify(); - } - } catch (InterruptedIOException e) { - int count = e.bytesTransferred; // keep partial transfer - e.bytesTransferred = 0; // not relevant if rethrown - synchronized (lock) { - if (length != -1) length += count; - ioe = e; - lock.notify(); - } - } catch (IOException e) { - synchronized (lock) { - ioe = e; - lock.notify(); - } - } catch (RuntimeException e) { - synchronized (lock) { - re = e; - lock.notify(); + closeRequested = true; // alternate quit signal } } + off = (head + length) % iobuffer.length; + len = ((head > off) ? head : iobuffer.length) - off; } - } finally { + int count; try { - in.close(); - } catch (IOException e) { - synchronized (lock) { - ioe = e; - } - } catch (RuntimeException e) { - synchronized (lock) { - re = e; + // the i/o operation might block without releasing the lock, + // so we do this outside of the synchronized block + count = in.read(iobuffer, off, len); + } catch (InterruptedIOException e) { + // keep partial transfer + count = e.bytesTransferred; + e.bytesTransferred = 0; + synchronized (lock) { ioe = e; } + } + synchronized (lock) { + if (count == -1) return; + if (count == 0) pause = true; + length += count; + lock.notify(); + } + } + } + + /** + * Waits until we have been requested to close the stream. + */ + private void waitUntilClosed() { + synchronized (lock) { + eof = true; + lock.notify(); + while (! closeRequested) { + try { + lock.wait(); + } catch (InterruptedException e) { + closeRequested = true; // alternate quit signal } } } diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java index 201b804ba..11b085f5f 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java @@ -15,6 +15,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import org.eclipse.team.internal.ccvs.core.Policy; + /** * Wraps an output stream that blocks indefinitely to simulate timeouts on write(), * flush(), and close(). The resulting output stream is buffered and supports @@ -30,57 +32,65 @@ public class TimeoutOutputStream extends FilterOutputStream { private byte[] iobuffer; // circular buffer private int head = 0; // points to first unwritten byte private int length = 0; // number of remaining unwritten bytes - - private long writeTimeout; - private long closeTimeout; - private boolean flushRequested = false; - private Thread thread = new Thread(new CommitBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$ + private boolean cannotWrite = false; // if true, writes will not be honoured + private boolean closeRequested = false; // if true, close requested + private boolean flushRequested = false; // if true, flush requested private IOException ioe = null; private RuntimeException re = null; + private long writeTimeout; // write() timeout in millis + private long closeTimeout; // close() timeout in millis, or -1 + private Thread thread; + /** * Creates a timeout wrapper for an output stream. * @param out the underlying input stream * @param bufferSize the buffer size in bytes; should be large enough to mitigate * Thread synchronization and context switching overhead * @param writeTimeout the number of milliseconds to block for a write() or flush() before - * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * throwing an InterruptedIOException; 0 blocks indefinitely * @param closeTimeout the number of milliseconds to block for a close() before throwing - * an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background */ public TimeoutOutputStream(OutputStream out, int bufferSize, long writeTimeout, long closeTimeout) { super(out); this.iobuffer = new byte[bufferSize]; this.writeTimeout = writeTimeout; this.closeTimeout = closeTimeout; + thread = new Thread(new CommitBufferRunnable(), "TimeoutOutputStream");//$NON-NLS-1$ thread.setDaemon(true); thread.start(); } /** * Wraps the underlying stream's method. + * It may be important to wait for a stream to actually be closed because it + * holds an implicit lock on a system resoure (such as a file) while it is + * open. Closing a stream may take time if the underlying stream is still + * servicing a previous request. * @throws InterruptedIOException if the timeout expired, bytesTransferred will * reflect the number of bytes flushed from the buffer * @throws IOException if an i/o error occurs */ public void close() throws IOException { - if (thread == null) return; + Thread oldThread; + synchronized (this) { + if (thread == null) return; + oldThread = thread; + closeRequested = true; + flushRequested = true; + thread.interrupt(); + checkError(); + } + if (closeTimeout == -1) return; try { - flush(); - } finally { - Thread oldThread = thread; - thread = null; - oldThread.interrupt(); - if (closeTimeout != -1) { - try { - oldThread.join(closeTimeout); - } catch (InterruptedException e) { - } - } - synchronized (this) { - if (ioe != null) throw ioe; - if (re != null) throw re; - } + oldThread.join(closeTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we weren't expecting to be interrupted + } + synchronized (this) { + checkError(); + if (thread != null) throw new InterruptedIOException(); } } @@ -91,10 +101,15 @@ public class TimeoutOutputStream extends FilterOutputStream { * @throws IOException if an i/o error occurs */ public synchronized void write(int b) throws IOException { - if (length == iobuffer.length) synccommit(); + if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream")); + checkError(); + if (length == iobuffer.length) { + syncCommit(); + if (length == iobuffer.length) throw new InterruptedIOException(); + } iobuffer[(head + length) % iobuffer.length] = (byte) b; length++; - asynccommit(); + asyncCommit(); } /** @@ -104,19 +119,24 @@ public class TimeoutOutputStream extends FilterOutputStream { * @throws IOException if an i/o error occurs */ public synchronized void write(byte[] buffer, int off, int len) throws IOException { + if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream")); + checkError(); int amount = 0; try { while (len-- > 0) { - if (length == iobuffer.length) synccommit(); + if (length == iobuffer.length) { + syncCommit(); + if (length == iobuffer.length) throw new InterruptedIOException(); + } iobuffer[(head + length) % iobuffer.length] = buffer[off++]; length++; amount++; } - asynccommit(); } catch (InterruptedIOException e) { e.bytesTransferred = amount; throw e; } + asyncCommit(); } /** @@ -126,116 +146,165 @@ public class TimeoutOutputStream extends FilterOutputStream { * @throws IOException if an i/o error occurs */ public synchronized void flush() throws IOException { - int oldLength = length; + if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream")); + checkError(); flushRequested = true; - InterruptedIOException iioe = null; + int amount = 0; try { - synccommit(); - if (length == 0) return; - iioe = new InterruptedIOException(); + while (flushRequested && length != 0) { + int oldLength = length; + syncCommit(); + amount += oldLength - length; + if (length == oldLength) throw new InterruptedIOException(); + } } catch (InterruptedIOException e) { - iioe = e; + e.bytesTransferred = amount; + throw e; } - iioe.bytesTransferred = oldLength - length; - throw iioe; + asyncCommit(); } - private void synccommit() throws IOException { - asynccommit(); - if (length != 0) { - int oldLength = length; - if (writeTimeout != -1) { - try { - wait(writeTimeout); - } catch (InterruptedException e) { - } - } - if (length == oldLength) { - throw new InterruptedIOException(); - } + /* + * Waits for the buffer to drain. + * The buffer might still be at the same level when this method returns if the operation timed out. + */ + private void syncCommit() throws IOException { + notify(); + try { + wait(writeTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we weren't expecting to be interrupted } + checkError(); + if (cannotWrite) throw new IOException(Policy.bind("TimeoutOutputStream.cannotWriteToStream")); } - private void asynccommit() throws IOException { - try { - if (ioe != null) { - IOException e = ioe; - ioe = null; - throw e; - } - if (re != null) { - RuntimeException e = re; - re = null; - throw e; - } - } finally { - if (length != 0 || flushRequested) notify(); + /* + * Notifies the background thread that some bytes were written so that it can drain the buffer + * asynchronously in the background. + */ + private void asyncCommit() throws IOException { + if (length != 0 || flushRequested || closeRequested) notify(); + } + + /* + * Checks if exceptions are pending and throws the next one, if any. + */ + private void checkError() throws IOException { + if (ioe != null) { + IOException e = ioe; + ioe = null; + throw e; + } + if (re != null) { + RuntimeException e = re; + re = null; + throw e; } } private class CommitBufferRunnable implements Runnable { + private final Object lock = TimeoutOutputStream.this; + public void run() { - final Object lock = TimeoutOutputStream.this; - boolean running = true; try { - for (;;) { - int off, len; + writeUntilDone(); + waitUntilClosed(); + } catch (IOException e) { + synchronized (lock) { ioe = e; } + waitUntilClosed(); + } catch (RuntimeException e) { + synchronized (lock) { re = e; } + waitUntilClosed(); + } finally { + /*** Closes the stream and sets thread to null when done ***/ + try { + out.close(); + } catch (IOException e) { + synchronized (lock) { ioe = e; } + } catch (RuntimeException e) { + synchronized (lock) { re = e; } + } finally { synchronized (lock) { + cannotWrite = true; + thread = null; + lock.notify(); + } + } + } + } + + /** + * Writes bytes from the buffer until closed and buffer is empty + */ + private void writeUntilDone() throws IOException { + boolean pause = false; + boolean mustFlush = false; + for (;;) { + int off, len; + synchronized (lock) { + for (;;) { + if (closeRequested && length == 0) return; // quit signal + if ((mustFlush || flushRequested || length != 0) && ! pause) break; + pause = false; try { - while (running && thread != null && ((length == 0 && ! flushRequested) || ioe != null || re != null)) { - lock.wait(); - } - if (thread == null) running = false; // quit signal - if (! running && length == 0) return; + lock.wait(); } catch (InterruptedException e) { - running = false; // alternative quit signal + closeRequested = true; // alternate quit signal } - off = head; - len = iobuffer.length - head; - if (len > length) len = length; } + off = head; + len = iobuffer.length - head; + if (len > length) len = length; + if (flushRequested) { + mustFlush = true; + flushRequested = false; + } + } + if (len != 0) { try { // the i/o operation might block without releasing the lock, // so we do this outside of the synchronized block - if (len != 0) out.write(iobuffer, off, len); - if (flushRequested) out.flush(); - synchronized (lock) { - flushRequested = false; - head = (head + len) % iobuffer.length; - length -= len; - lock.notify(); - } + out.write(iobuffer, off, len); } catch (InterruptedIOException e) { - len = e.bytesTransferred; // keep partial transfer - e.bytesTransferred = 0; // not relevant if rethrown - synchronized (lock) { - head = (head + len) % iobuffer.length; - length -= len; - ioe = e; - lock.notify(); - } - } catch (IOException e) { - synchronized (lock) { - ioe = e; - lock.notify(); - } - } catch (RuntimeException e) { - synchronized (lock) { - re = e; - lock.notify(); - } + len = e.bytesTransferred; + e.bytesTransferred = 0; + if (len == 0) pause = true; + synchronized (lock) { ioe = e; } } - } - } finally { - try { - out.close(); - } catch (IOException e) { synchronized (lock) { - ioe = e; - } - } catch (RuntimeException e) { + head = (head + len) % iobuffer.length; + length -= len; + lock.notify(); + } + } else { + try { + out.flush(); + mustFlush = false; + } catch (InterruptedIOException e) { + if (e.bytesTransferred == 0) pause = true; + e.bytesTransferred = 0; + synchronized (lock) { ioe = e; } + } synchronized (lock) { - re = e; + lock.notify(); + } + } + } + } + + /** + * Waits until we have been requested to close the stream. + */ + private void waitUntilClosed() { + synchronized (lock) { + cannotWrite = true; + lock.notify(); + while (! closeRequested) { + try { + lock.wait(); + } catch (InterruptedException e) { + closeRequested = true; // alternate quit signal } } } -- cgit v1.2.3