diff options
author | Jean Michel-Lemieux | 2002-04-08 19:11:31 +0000 |
---|---|---|
committer | Jean Michel-Lemieux | 2002-04-08 19:11:31 +0000 |
commit | 8b4faf973400073c07a82c84027e540de80b4da5 (patch) | |
tree | aadefdf288175dd12fe6470e2ec660b8a6e3701f | |
parent | 930aa2343da4aa6716d80325283c7c8e15665065 (diff) | |
download | eclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.tar.gz eclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.tar.xz eclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.zip |
- improved handling of '.project' in EclipseFile.setContents() to avoid
accidental closure of input stream
- replaced ad-hoc CR/LF conversion code with stream filters
- replaced ad-hoc file transfer progress monitor code with stream filter
- added low-level progress monitor cancellation support
- added timeouts on socket writing and on pipe reading and writing
- removed redundant buffer in Connection
- fixed possible i/o stream leak in Ext connection method
- fixed i/o stream leak in PServer connection method
- fixed i/o stream and socket leak in ExtSSH connection method
after failed authentication or cancellation
- fixed an error reporting condition in ExtSSH connection method
17 files changed, 1563 insertions, 464 deletions
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java index 058c6b3ee..9348e433a 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java @@ -52,6 +52,8 @@ public class CVSProviderPlugin extends Plugin { public static final boolean DEFAULT_FETCH = true; // communication timeout with the server public static final int DEFAULT_TIMEOUT = 60; + // file transfer compression level (0 - 9) + public static final int DEFAULT_COMPRESSION_LEVEL = 0; // cvs plugin extension points and ids public static final String ID = "org.eclipse.team.cvs.core"; //$NON-NLS-1$ @@ -59,6 +61,7 @@ public class CVSProviderPlugin extends Plugin { public static final String PT_CONNECTIONMETHODS = "connectionmethods"; //$NON-NLS-1$ private QuietOption quietness; + private int compressionLevel = DEFAULT_COMPRESSION_LEVEL; private int communicationsTimeout = DEFAULT_TIMEOUT; private boolean pruneEmptyDirectories = DEFAULT_PRUNE; private boolean fetchAbsentDirectories = DEFAULT_FETCH; @@ -130,6 +133,21 @@ public class CVSProviderPlugin extends Plugin { public static String getTypeId() { return NATURE_ID; } + + /** + * Sets the file transfer compression level. (if supported) + * Valid levels are: 0 (disabled), 1 (worst/fastest) - 9 (best/slowest) + */ + public void setCompressionLevel(int level) { + compressionLevel = level; + } + + /** + * Gets the file transfer compression level. + */ + public int getCompressionLevel() { + return compressionLevel; + } /** * Should the CVS adapter prune empty directories diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java index 5b0b6ed5a..0a161173c 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java @@ -63,6 +63,8 @@ import org.eclipse.team.internal.ccvs.core.connection.CVSServerException; import org.eclipse.team.internal.ccvs.core.resources.CVSRemoteSyncElement; import org.eclipse.team.internal.ccvs.core.resources.CVSWorkspaceRoot; import org.eclipse.team.internal.ccvs.core.resources.EclipseSynchronizer; +import org.eclipse.team.internal.ccvs.core.streams.CRLFtoLFInputStream; +import org.eclipse.team.internal.ccvs.core.streams.LFtoCRLFInputStream; import org.eclipse.team.internal.ccvs.core.syncinfo.FolderSyncInfo; import org.eclipse.team.internal.ccvs.core.syncinfo.MutableResourceSyncInfo; import org.eclipse.team.internal.ccvs.core.syncinfo.ResourceSyncInfo; @@ -99,11 +101,8 @@ import org.eclipse.team.internal.ccvs.core.util.PrepareForReplaceVisitor; * have them appear in Eclipse. This may be changed in the future. */ public class CVSTeamProvider extends RepositoryProvider { - private static final int CR_BYTE = 0x0D; - private static final int LF_BYTE = 0x0A; private static final boolean IS_CRLF_PLATFORM = Arrays.equals( - System.getProperty("line.separator").getBytes(), //$NON-NLS-1$ - new byte[] { CR_BYTE, LF_BYTE }); + System.getProperty("line.separator").getBytes(), new byte[] { '\r', '\n' }); //$NON-NLS-1$ private CVSWorkspaceRoot workspaceRoot; private IProject project; @@ -1055,47 +1054,19 @@ public class CVSTeamProvider extends RepositoryProvider { throws CVSException { try { // convert delimiters in memory - boolean changed = false; - InputStream is = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); + InputStream is = new BufferedInputStream(file.getContents()); try { - is = new BufferedInputStream(file.getContents()); - boolean seenCR = false; - int c; - while ((c = is.read()) != -1) { - if (c == LF_BYTE) { - if (useCRLF) { - bos.write(CR_BYTE); - if (! seenCR) changed = true; // added CR - } else { - if (seenCR) changed = true; // stripped CR - } - bos.write(LF_BYTE); - seenCR = false; - } else { - if (seenCR) { - bos.write(CR_BYTE); // preserve orphaned carriage returns - seenCR = false; - } - if (c == CR_BYTE) { - seenCR = true; - } else { - bos.write(c); - } - } - } - if (seenCR) { - bos.write(CR_BYTE); // preserve orphaned carriage returns - } - } finally { - if (is != null) is.close(); + is = new CRLFtoLFInputStream(is); + if (useCRLF) is = new LFtoCRLFInputStream(is); + for (int b; (b = is.read()) != -1;) bos.write(b); bos.close(); + } finally { + is.close(); } - if (changed) { - // write file back to disk with corrected delimiters if changes were made - ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); - file.setContents(bis, false /*force*/, true /*keepHistory*/, progress); - } + // write file back to disk with corrected delimiters if changes were made + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + file.setContents(bis, false /*force*/, false /*keepHistory*/, progress); } catch (CoreException e) { throw CVSException.wrapException(file, Policy.bind("CVSTeamProvider.cleanLineDelimitersException"), e); //$NON-NLS-1$ } catch (IOException e) { diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java deleted file mode 100644 index c7aa32cc9..000000000 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java +++ /dev/null @@ -1,188 +0,0 @@ -package org.eclipse.team.internal.ccvs.core.client; - -/* - * (c) Copyright IBM Corp. 2000, 2002. - * All Rights Reserved. - */ - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.team.internal.ccvs.core.CVSException; -import org.eclipse.team.internal.ccvs.core.Policy; - -/** - * This class can be used to transfer a file from the CVS server to a local IFile - */ -public class FileInputStreamWrapper { - - // default file transfer buffer size (in bytes) - private static int TRANSFER_BUFFER_SIZE = 8192; - // update progress bar in increments of this size (in bytes) - // no incremental progress shown for files smaller than this size - private static int TRANSFER_PROGRESS_INCREMENT = 32768; - - // the platform's line termination sequence - private static final byte[] PLATFORM_NEWLINE_BYTES = - System.getProperty("line.separator").getBytes(); //$NON-NLS-1$ // at least one byte long - // the server's line termination sequence - private static final int SERVER_NEWLINE_BYTE = 0x0a; // exactly one byte long - private static final byte[] SERVER_NEWLINE_BYTES = new byte[] { SERVER_NEWLINE_BYTE }; - // true iff newlines must be converted between platform and server formats - private static boolean DONT_CONVERT_NEWLINES = PLATFORM_NEWLINE_BYTES.length == 1 - && PLATFORM_NEWLINE_BYTES[0] == SERVER_NEWLINE_BYTE; - - // VCM 1.0 comitted files using CR/LF as a delimiter - private static final int CARRIAGE_RETURN_BYTE = 0x0d; - - private InputStream input; - private long fileSize; - private int totalRead; - private boolean isBinary; - private IProgressMonitor monitor; - private byte[] buffer; - private int nextProgressThresh; - - private static final byte[] BUFFER = new byte[TRANSFER_BUFFER_SIZE / 2]; - private static final byte[] EXPANSION_BUFFER = new byte[TRANSFER_BUFFER_SIZE]; - - private int position; - private int bufferLength; - private String title; - - public FileInputStreamWrapper(InputStream input, long fileSize, boolean isBinary, String title, IProgressMonitor monitor) { - this.input = input; - this.fileSize = fileSize; - this.totalRead = 0; - this.isBinary = isBinary; - this.monitor = monitor; - this.buffer = BUFFER; - this.nextProgressThresh = TRANSFER_PROGRESS_INCREMENT; - this.title = title; - } - - public class InputStreamFromServer extends InputStream { - public int read() throws IOException { - if (position >= bufferLength) { - if (fill() == -1) - return -1; - } - return buffer[position++]; - } - public int read(byte[] bytes) throws IOException { - return read(bytes, 0, bytes.length); - } - public int read(byte[] bytes, int offset, int length) throws IOException { - if (position >= bufferLength) { - if (fill() == -1) - return -1; - } - length = Math.min(bufferLength - position, length); - System.arraycopy(buffer, position, bytes, offset, length); - position += length; - return length; - } - } - - /** - * Return a stream that can be passed to IFile#setContent() - * After the call to setContent, the receiver's input stream will be at the byte - * after the received file. - */ - public InputStream getInputStream() { - return new InputStreamFromServer(); - } - - /* - * Transfers a file to or from the remove CVS server, possibly expanding line delimiters. - * <p> - * Line termination sequences are only converted upon request by specifying an - * array containing the expected sequence of bytes representing an outbound newline, - * and a single byte representing an inbound newline. If null is passed for the - * former, the file is assumed to have binary contents, hence no translation is - * performed. - * </p><p> - * Translation is performed on-the-fly, so the file need not fit in available memory. - * </p> - * @param in the input stream - * @param out the output stream - * @param size the source file size - * @param newlineIn the single byte for a received newline, ignored if binary - * @param newlineOut the sequence of bytes for sent newline, or null if binary - * @param monitor the progress monitor - * @param title the name of the file being received (as shown in the monitor) - */ - private int fill() throws IOException { - - // Check if we've read the entire file - if (totalRead == fileSize) { - return -1; - } else if (position < bufferLength) { - return bufferLength - position; - } - - position = 0; - - // If we're not converting, use the big buffer to read - if (isBinary || DONT_CONVERT_NEWLINES) { - buffer = EXPANSION_BUFFER; - } else { - buffer = BUFFER; - } - - bufferLength = input.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalRead)); - if (bufferLength == -1) { - // Unexpected end of stream - throw new IOException(Policy.bind("Session.readError")); //$NON-NLS-1$ - } - totalRead += bufferLength; - - if (isBinary || DONT_CONVERT_NEWLINES) { - return bufferLength; - } - - bufferLength = convertNewLines(BUFFER, EXPANSION_BUFFER, bufferLength); - buffer = EXPANSION_BUFFER; - - // update progress monitor - if (totalRead > nextProgressThresh) { - monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$ - new Object[] { title, new Long(totalRead / 1024), new Long(fileSize / 1024)})); - nextProgressThresh = totalRead + TRANSFER_PROGRESS_INCREMENT; - } - - return bufferLength; - } - - /* - * Copy the bytes from the source to the target, converting any LF to the platform newline byte. - * - * There is special handling that will skip incoming CRs that precede LF. - */ - private int convertNewLines(byte[] source, byte[] target, int length) { - boolean seenCR = false; - int targetPosition = 0; - for (int sourcePosition = 0; sourcePosition < length; ++sourcePosition) { - final byte b = source[sourcePosition]; - if (b == CARRIAGE_RETURN_BYTE) { - // We keep track of CRs to perform autocorrection for improperly stored text files - seenCR = true; - } else { - if (b == SERVER_NEWLINE_BYTE) { - // if fixCRLF we ignore previous CR (if there was one) - // replace newlineIn with newlineOut - for (int x = 0; x < PLATFORM_NEWLINE_BYTES.length; ++x) target[targetPosition++] = PLATFORM_NEWLINE_BYTES[x]; - } else { - if (seenCR) target[targetPosition++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's - target[targetPosition++] = b; - } - seenCR = false; - } - } - if (seenCR) target[targetPosition++] = CARRIAGE_RETURN_BYTE; - - return targetPosition; - } -} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java index bc68b52d6..7eaf56722 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java @@ -6,15 +6,21 @@ package org.eclipse.team.internal.ccvs.core.client; */ import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.NullProgressMonitor; +import org.eclipse.team.ccvs.core.CVSProviderPlugin; import org.eclipse.team.ccvs.core.CVSProviderPlugin; import org.eclipse.team.ccvs.core.ICVSFile; import org.eclipse.team.ccvs.core.ICVSFolder; @@ -24,8 +30,11 @@ import org.eclipse.team.internal.ccvs.core.CVSException; import org.eclipse.team.internal.ccvs.core.Policy; import org.eclipse.team.internal.ccvs.core.connection.CVSRepositoryLocation; import org.eclipse.team.internal.ccvs.core.connection.Connection; +import org.eclipse.team.internal.ccvs.core.streams.CRLFtoLFInputStream; +import org.eclipse.team.internal.ccvs.core.streams.LFtoCRLFInputStream; +import org.eclipse.team.internal.ccvs.core.streams.ProgressMonitorInputStream; +import org.eclipse.team.internal.ccvs.core.streams.SizeConstrainedInputStream; import org.eclipse.team.internal.ccvs.core.syncinfo.ResourceSyncInfo; -import org.eclipse.team.internal.ccvs.core.util.Assert; import org.eclipse.team.internal.ccvs.core.util.Util; /** @@ -57,20 +66,10 @@ public class Session { private static int TRANSFER_BUFFER_SIZE = 8192; // update progress bar in increments of this size (in bytes) // no incremental progress shown for files smaller than this size - private static long TRANSFER_PROGRESS_INCREMENT = 32768; - - // the platform's line termination sequence - private static final byte[] PLATFORM_NEWLINE_BYTES = - System.getProperty("line.separator").getBytes(); //$NON-NLS-1$ // at least one byte long - // the server's line termination sequence - private static final int SERVER_NEWLINE_BYTE = 0x0a; // exactly one byte long - private static final byte[] SERVER_NEWLINE_BYTES = new byte[] { SERVER_NEWLINE_BYTE }; - // true iff newlines must be converted between platform and server formats - private static boolean MUST_CONVERT_NEWLINES = PLATFORM_NEWLINE_BYTES.length != 1 - && PLATFORM_NEWLINE_BYTES[0] != SERVER_NEWLINE_BYTE; - - // VCM 1.0 comitted files using CR/LF as a delimiter - private static final int CARRIAGE_RETURN_BYTE = 0x0d; + private static int TRANSFER_PROGRESS_INCREMENT = 32768; + + private static final boolean IS_CRLF_PLATFORM = Arrays.equals( + System.getProperty("line.separator").getBytes(), new byte[] { '\r', '\n' }); //$NON-NLS-1$ private CVSRepositoryLocation location; private ICVSFolder localRoot; @@ -80,13 +79,11 @@ public class Session { private Date modTime = null; private boolean noLocalChanges = false; private boolean createBackups = true; + private int compressionLevel = 0; private List expansions; private Collection /* of ICVSFile */ textTransferOverrideSet = null; private boolean hasBeenConnected = false; - // a shared buffer used for file transfers - private byte[] transferBuffer = null; - // The resource bundle key that provides the file sending message private String sendFileTitleKey; @@ -159,9 +156,23 @@ public class Session { // ask for the set of valid requests Request.VALID_REQUESTS.execute(this, Policy.subMonitorFor(monitor, 50)); - + // set the root directory on the server for this connection connection.writeLine("Root " + getRepositoryRoot()); //$NON-NLS-1$ + + // enable compression + compressionLevel = CVSProviderPlugin.getPlugin().getCompressionLevel(); + if (compressionLevel != 0 && isValidRequest("gzip-file-contents")) { + // Enable the use of CVS 1.8 per-file compression mechanism. + // The newer Gzip-stream request seems to be problematic due to Java's + // GZIPInputStream tendency to block on read() rather than to return a + // partially filled buffer. The latter option would be better since it + // can make more effective use of the code dictionary, if it can be made + // to work... + connection.writeLine("gzip-file-contents " + Integer.toString(compressionLevel)); + } else { + compressionLevel = 0; + } } catch (CVSException e) { // If there is a failure opening, make sure we're closed if (connection != null) { @@ -518,14 +529,6 @@ public class Session { } /** - * Gets the shared file transfer buffer. - */ - private byte[] getTransferBuffer() { - if (transferBuffer == null) transferBuffer = new byte[TRANSFER_BUFFER_SIZE]; - return transferBuffer; - } - - /** * Sends a file to the remote CVS server, possibly translating line delimiters. * <p> * Line termination sequences are automatically converted to linefeeds only @@ -538,87 +541,60 @@ public class Session { * @param isBinary is true if the file should be sent without translation * @param monitor the progress monitor */ - public void sendFile(ICVSFile file, boolean isBinary, IProgressMonitor monitor) - throws CVSException { + public void sendFile(ICVSFile file, boolean isBinary, IProgressMonitor monitor) throws CVSException { // check overrides if (textTransferOverrideSet != null && textTransferOverrideSet.contains(file)) isBinary = false; // update progress monitor - String title = Policy.bind(getSendFileTitleKey(), new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$ + final String title = Policy.bind(getSendFileTitleKey(), new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$ monitor.subTask(Policy.bind("Session.transferNoSize", title)); //$NON-NLS-1$ // obtain an input stream for the file and its size long size = file.getSize(); OutputStream out = connection.getOutputStream(); - InputStream in = file.getContents(); try { - if (isBinary || PLATFORM_NEWLINE_BYTES.length == 1) { - writeLine(Long.toString(size)); - if (! isBinary && MUST_CONVERT_NEWLINES) { - /*** convert newlines on-the-fly ***/ - transferWithProgress(in, out, size, - PLATFORM_NEWLINE_BYTES[0], SERVER_NEWLINE_BYTES, monitor, title); - } else { - /*** perform no conversion ***/ - transferWithProgress(in, out, size, 0, null, monitor, title); - } - } else { - // implies file is text, and we must convert newlines since size of platform newline - // sequence is not 1, but the server's is - /*** convert newlines in memory, since file size may change ***/ - Assert.isTrue(size < Integer.MAX_VALUE); - int fsize = (int) size; - byte[] fileContents; - if (fsize <= TRANSFER_BUFFER_SIZE) fileContents = getTransferBuffer(); - else fileContents = new byte[fsize]; - // translate the file from non-LF delimiters in memory and - // compute its reduced size - try { - // read exactly _size_ bytes - try { - for (int pos = 0, read; pos < fsize; pos += read) { - Policy.checkCanceled(monitor); - read = in.read(fileContents, pos, fsize - pos); - if (read == -1) { - // file ended prematurely - throw new IOException(Policy.bind("Session.readError"));//$NON-NLS-1$ - } + InputStream in = file.getContents(); + try { + byte[] buffer = new byte[TRANSFER_BUFFER_SIZE]; + if (! isBinary && IS_CRLF_PLATFORM || compressionLevel != 0) { + // this affects the file size, spool the converted copy to an in-memory buffer + if (! isBinary && IS_CRLF_PLATFORM) in = new CRLFtoLFInputStream(in); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + OutputStream zout; + if (compressionLevel != 0) { + try { + zout = new GZIPOutputStream(bout); // apparently does not support specifying compression level + } catch (IOException e) { + throw CVSException.wrapException(e); } - } finally { - in.close(); // remember to close the source file - in = null; - } - } catch (IOException e) { - throw CVSException.wrapException(e); - } - // convert platform line termination sequences - // conservative since it leaves any partial sequences alone (like stray CR's) - // assumes no prefix of a sequence - int cur = 0, match = 0; - for (int pos = 0; pos < fsize; ++pos) { - byte b = fileContents[pos]; - if (PLATFORM_NEWLINE_BYTES[match] == b) { - if (match == PLATFORM_NEWLINE_BYTES.length - 1) { - b = SERVER_NEWLINE_BYTE; - cur -= match; - match = 0; - } else match += 1; } else { - match = 0; + zout = bout; } - fileContents[cur++] = b; + for (int count; (count = in.read(buffer)) != -1;) zout.write(buffer, 0, count); + zout.close(); + byte[] contents = bout.toByteArray(); + in.close(); + in = new ByteArrayInputStream(contents); + size = contents.length; } - // send file - writeLine(Integer.toString(cur)); - in = new ByteArrayInputStream(fileContents, 0, cur); - transferWithProgress(in, out, cur, 0, null, monitor, title); - } - } finally { - try { - if (in != null) in.close(); - } catch (IOException e) { - throw CVSException.wrapException(e); + // setup progress monitoring + in = new ProgressMonitorInputStream(in, size, TRANSFER_PROGRESS_INCREMENT, monitor) { + protected void updateMonitor(long bytesRead, long bytesTotal, IProgressMonitor monitor) { + if (bytesRead == 0) return; + monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$ + new Object[] { title, Long.toString(bytesRead >> 10), Long.toString(bytesTotal >> 10) })); + } + }; + // send the file + String sizeLine = Long.toString(size); + if (compressionLevel != 0) sizeLine = "z" + sizeLine; + writeLine(sizeLine); + for (int count; (count = in.read(buffer)) != -1;) out.write(buffer, 0, count); + } finally { + in.close(); } + } catch (IOException e) { + throw CVSException.wrapException(e); } } @@ -644,108 +620,44 @@ public class Session { textTransferOverrideSet.contains(file)) isBinary = false; // update progress monitor - String title = Policy.bind("Session.receiving", new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$ + final String title = Policy.bind("Session.receiving", new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$ monitor.subTask(Policy.bind("Session.transferNoSize", title)); //$NON-NLS-1$ // get the file size from the server long size; + boolean compressed = false; try { - size = Long.parseLong(readLine(), 10); + String sizeLine = readLine(); + if (sizeLine.charAt(0) == 'z') { + compressed = true; + sizeLine = sizeLine.substring(1); + } + size = Long.parseLong(sizeLine, 10); } catch (NumberFormatException e) { throw new CVSException(Policy.bind("Session.badInt"), e); //$NON-NLS-1$ } - // Set the contents of the file using the stream wrapper - FileInputStreamWrapper wrapper = new FileInputStreamWrapper(connection.getInputStream(), size, isBinary, title, monitor); - file.setContents(wrapper.getInputStream(), responseType, true, monitor); - } - - /** - * Transfers a file to or from the remove CVS server, possibly expanding line delimiters. - * <p> - * Line termination sequences are only converted upon request by specifying an - * array containing the expected sequence of bytes representing an outbound newline, - * and a single byte representing an inbound newline. If null is passed for the - * former, the file is assumed to have binary contents, hence no translation is - * performed. - * </p><p> - * Translation is performed on-the-fly, so the file need not fit in available memory. - * </p> - * @param in the input stream - * @param out the output stream - * @param size the source file size - * @param newlineIn the single byte for a received newline, ignored if binary - * @param newlineOut the sequence of bytes for sent newline, or null if binary - * @param monitor the progress monitor - * @param title the name of the file being received (as shown in the monitor) - */ - private void transferWithProgress(InputStream in, OutputStream out, - long size, int newlineIn, byte[] newlineOut, IProgressMonitor monitor, String title) - throws CVSException { - long nextProgressThresh = TRANSFER_PROGRESS_INCREMENT; - Long ksize = new Long(size / 1024); - try { - byte[] buffer = getTransferBuffer(); - final int wfirst, wlast; - if (newlineOut != null) { - wfirst = buffer.length / 2; - wlast = buffer.length - newlineOut.length - 1; // reserve space for newline & stray CR - } else { - wfirst = buffer.length; - wlast = wfirst; + // create an input stream that spans the next 'size' bytes from the connection + InputStream in = new SizeConstrainedInputStream(connection.getInputStream(), size, true /*discardOnClose*/); + // setup progress monitoring + in = new ProgressMonitorInputStream(in, size, TRANSFER_PROGRESS_INCREMENT, monitor) { + protected void updateMonitor(long bytesRead, long bytesTotal, IProgressMonitor monitor) { + if (bytesRead == 0) return; + monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$ + new Object[] { title, Long.toString(bytesRead >> 10), Long.toString(bytesTotal >> 10) })); } - int wpos = wfirst; - // read exactly _size_ bytes - boolean fixCRLF = (newlineIn == SERVER_NEWLINE_BYTE); - boolean seenCR = false; // only true if fixCRLF and last byte was a CR - for (long totalRead = 0; totalRead < size;) { - Policy.checkCanceled(monitor); - int read = in.read(buffer, 0, (int) Math.min(wfirst, size - totalRead)); - if (read == -1) { - // file ended prematurely - throw new IOException(Policy.bind("Session.readError")); //$NON-NLS-1$ - } - totalRead += read; - if (newlineOut == null) { - // dump binary data - out.write(buffer, 0, read); - } else { - // filter newline sequences in memory from first half of buffer into second half - // then dump to output stream - for (int p = 0; p < read; ++p) { - final byte b = buffer[p]; - if (b == CARRIAGE_RETURN_BYTE && fixCRLF) { - seenCR = true; - } else { - if (b == newlineIn) { - // if fixCRLF we ignore previous CR (if there was one) - // replace newlineIn with newlineOut - for (int x = 0; x < newlineOut.length; ++x) buffer[wpos++] = newlineOut[x]; - } else { - if (seenCR) buffer[wpos++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's - buffer[wpos++] = b; - } - seenCR = false; - } - if (wpos >= wlast) { - // flush output buffer - out.write(buffer, wfirst, wpos - wfirst); - wpos = wfirst; - } - } - } - // update progress monitor - if (totalRead > nextProgressThresh) { - monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$ - new Object[] { title, new Long(totalRead / 1024), ksize})); - nextProgressThresh = totalRead + TRANSFER_PROGRESS_INCREMENT; - } + }; + // if compression enabled, decompress on the fly + if (compressed) { + try { + in = new GZIPInputStream(in); + } catch (IOException e) { + throw CVSException.wrapException(e); } - // flush pending buffered output - if (seenCR) buffer[wpos++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's - if (wpos != wfirst) out.write(buffer, wfirst, wpos - wfirst); - } catch (IOException e) { - throw CVSException.wrapException(e); } - } + // if not binary, translate line delimiters on the fly + if (! isBinary && IS_CRLF_PLATFORM) in = new LFtoCRLFInputStream(in); + // write the file locally + file.setContents(in, responseType, true, new NullProgressMonitor()); + } /** * Stores the value of the last Mod-time response encountered. diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java index d97ea7540..351edce30 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java @@ -5,7 +5,6 @@ package org.eclipse.team.internal.ccvs.core.connection; * All Rights Reserved. */ -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -40,7 +39,7 @@ public class Connection { private ICVSRepositoryLocation fCVSRoot; private String fCVSRootDirectory; private boolean fIsEstablished; - private BufferedInputStream fResponseStream; + private InputStream fResponseStream; private byte[] readLineBuffer = new byte[256]; public Connection(ICVSRepositoryLocation cvsroot, IServerConnection serverConnection) { @@ -104,7 +103,7 @@ public class Connection { if (!isEstablished()) return null; if (fResponseStream == null) - fResponseStream = new BufferedInputStream(serverConnection.getInputStream()); + fResponseStream = serverConnection.getInputStream(); return fResponseStream; } diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java index b3de2471a..e0a506880 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java @@ -5,6 +5,7 @@ package org.eclipse.team.internal.ccvs.core.connection; * All Rights Reserved. */ +import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -14,6 +15,7 @@ import org.eclipse.team.ccvs.core.CVSProviderPlugin; import org.eclipse.team.ccvs.core.ICVSRepositoryLocation; import org.eclipse.team.ccvs.core.IServerConnection; import org.eclipse.team.internal.ccvs.core.Policy; +import org.eclipse.team.internal.ccvs.core.streams.*; /** * Implements a connection method which invokes an external tool to @@ -50,9 +52,17 @@ public class ExtConnection implements IServerConnection { * Closes the connection. */ public void close() throws IOException { - inputStream.close(); - outputStream.close(); - process.destroy(); + try { + if (inputStream != null) inputStream.close(); + } finally { + inputStream = null; + try { + if (outputStream != null) outputStream.close(); + } finally { + outputStream = null; + process.destroy(); + } + } } /** @@ -87,30 +97,52 @@ public class ExtConnection implements IServerConnection { int port = location.getPort(); if (port == location.USE_DEFAULT_PORT) port = DEFAULT_PORT; - try { - // The command line doesn't support the use of a port - if (port != DEFAULT_PORT) - throw new IOException(Policy.bind("EXTServerConnection.invalidPort")); //$NON-NLS-1$ - - if(CVS_RSH == null || CVS_SERVER == null) { - throw new IOException(Policy.bind("EXTServerConnection.varsNotSet")); //$NON-NLS-1$ - } + + // The command line doesn't support the use of a port + if (port != DEFAULT_PORT) + throw new IOException(Policy.bind("EXTServerConnection.invalidPort")); //$NON-NLS-1$ - process = Runtime.getRuntime().exec(command); + if(CVS_RSH == null || CVS_SERVER == null) { + throw new IOException(Policy.bind("EXTServerConnection.varsNotSet")); //$NON-NLS-1$ + } - inputStream = process.getInputStream(); - outputStream = process.getOutputStream(); - } catch (IOException ex) { - try { - close(); - } finally { - throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$ + boolean connected = false; + try { + process = Runtime.getRuntime().exec(command); + + inputStream = new PollingInputStream(new TimeoutInputStream(process.getInputStream(), + 8192 /*bufferSize*/, 1000 /*readTimeout*/, -1 /*closeTimeout*/), location.getTimeout(), monitor); + outputStream = new PollingOutputStream(new TimeoutOutputStream(process.getOutputStream(), + 8192 /*buffersize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/), location.getTimeout(), monitor); + + // XXX need to do something more useful with stderr + // discard the input to prevent the process from hanging due to a full pipe + Thread thread = new DiscardInputThread(process.getErrorStream()); + connected = true; + } finally { + if (! connected) { + try { + close(); + } finally { + throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$ + } } - } catch (RuntimeException e) { + } + } + + private static class DiscardInputThread extends Thread { + private InputStream in; + public DiscardInputThread(InputStream in) { + this.in = in; + } + public void run() { try { - close(); - } finally { - throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$ + try { + while (in.read() != -1); + } finally { + in.close(); + } + } catch (IOException e) { } } } diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java index c6c8461b2..bbd4e51fa 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java @@ -13,10 +13,11 @@ import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.Socket; -import org.eclipse.team.internal.ccvs.core.Policy; import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.team.ccvs.core.*; -import org.eclipse.team.ccvs.core.*; +import org.eclipse.team.ccvs.core.ICVSRepositoryLocation; +import org.eclipse.team.ccvs.core.IServerConnection; +import org.eclipse.team.internal.ccvs.core.Policy; +import org.eclipse.team.internal.ccvs.core.streams.*; /** * A connection used to talk to an cvs pserver. @@ -73,8 +74,21 @@ public class PServerConnection implements IServerConnection { * @see Connection#doClose() */ public void close() throws IOException { - fSocket.close(); - fSocket= null; + try { + if (inputStream != null) inputStream.close(); + } finally { + inputStream = null; + try { + if (outputStream != null) outputStream.close(); + } finally { + outputStream = null; + try { + if (fSocket != null) fSocket.close(); + } finally { + fSocket = null; + } + } + } } /** @@ -91,16 +105,17 @@ public class PServerConnection implements IServerConnection { monitor.worked(1); fSocket = createSocket(); + boolean connected = false; try { - this.inputStream = new BufferedInputStream(fSocket.getInputStream()); - this.outputStream = new BufferedOutputStream(fSocket.getOutputStream()); + this.inputStream = new BufferedInputStream(new PollingInputStream(fSocket.getInputStream(), + cvsroot.getTimeout(), monitor)); + this.outputStream = new PollingOutputStream(new TimeoutOutputStream( + fSocket.getOutputStream(), 8192 /*bufferSize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/), + cvsroot.getTimeout(), monitor); authenticate(); - } catch (IOException e) { - cleanUpAfterFailedConnection(); - throw e; - } catch (CVSAuthenticationException e) { - cleanUpAfterFailedConnection(); - throw e; + connected = true; + } finally { + if (! connected) cleanUpAfterFailedConnection(); } } @@ -212,7 +227,7 @@ public class PServerConnection implements IServerConnection { // If we get this exception, chances are the host is not responding throw new InterruptedIOException(Policy.bind("PServerConnection.socket", new Object[] {cvsroot.getHost()}));//$NON-NLS-1$ } - result.setSoTimeout(cvsroot.getTimeout() * 1000); + result.setSoTimeout(1000); // 1 second between timeouts return result; } diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties index c8e5a130e..a1cece4e6 100644 --- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties @@ -242,3 +242,5 @@ EclipseSynchronizer.ErrorSettingIgnorePattern=Cannot set ignored pattern on {0} EclipseSynchronizer.ErrorDeletingFolderSync=Could not delete all CVS folders SyncFileChangeListener.errorSettingTeamPrivateFlag=Error setting team-private flag on resource +PollingInputStream.readTimeout=Timeout during read operation +PollingOutputStream.writeTimeout=Timeout during write operation diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java new file mode 100644 index 000000000..ecf856bb5 --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java @@ -0,0 +1,155 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Converts CR/LFs in the underlying input stream to LF. + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public class CRLFtoLFInputStream extends FilterInputStream { + private boolean pendingByte = false; + private int lastByte = -1; + + /** + * Creates a new filtered input stream. + * @param in the underlying input stream + */ + public CRLFtoLFInputStream(InputStream in) { + super(in); + } + + /** + * Wraps the underlying stream's method. + * Translates CR/LF sequences to LFs transparently. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read() throws IOException { + if (! pendingByte) { + lastByte = in.read(); // ok if this throws + pendingByte = true; // remember the byte in case we throw an exception later on + } + if (lastByte == '\r') { + lastByte = in.read(); // ok if this throws + if (lastByte != '\n') { + if (lastByte == -1) pendingByte = false; + return '\r'; // leaves the byte pending for later + } + } + pendingByte = false; + return lastByte; + } + + /** + * Wraps the underlying stream's method. + * Translates CR/LF sequences to LFs transparently. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public int read(byte[] buffer, int off, int len) throws IOException { + // handle boundary cases cleanly + if (len == 0) { + return 0; + } else if (len == 1) { + int b = read(); + if (b == -1) return -1; + buffer[off] = (byte) b; + return 1; + } + // read some bytes from the stream + // prefix with pending byte from last read if any + int count = 0; + if (pendingByte) { + buffer[off] = (byte) lastByte; + pendingByte = false; + count = 1; + } + InterruptedIOException iioe = null; + try { + len = in.read(buffer, off + count, len - count); + if (len == -1) { + return (count == 0) ? -1 : count; + } + } catch (InterruptedIOException e) { + len = e.bytesTransferred; + iioe = e; + } + count += len; + // strip out CR's in CR/LF pairs + // pendingByte will be true iff previous byte was a CR + int j = 0; + for (int i = 0; i < count; ++i) { // invariant: j <= i + lastByte = buffer[i]; + if (lastByte == '\r') { + if (pendingByte) { + buffer[j++] = '\r'; // write out orphan CR + } else { + pendingByte = true; + } + } else { + if (pendingByte) { + if (lastByte != '\n') buffer[j++] = '\r'; // if LF, don't write the CR + pendingByte = false; + } + buffer[j++] = (byte) lastByte; + } + } + if (iioe != null) { + iioe.bytesTransferred = j; + throw iioe; + } + return j; + } + + /** + * Calls read() to skip the specified number of bytes + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public long skip(long count) throws IOException { + int actualCount = 0; // assumes count < Integer.MAX_INT + try { + while (count-- > 0 && read() != -1) actualCount++; // skip the specified number of bytes + return actualCount; + } catch (InterruptedIOException e) { + e.bytesTransferred = actualCount; + throw e; + } + } + + /** + * Wraps the underlying stream's method. + * Returns the number of bytes that can be read without blocking; accounts for + * possible translation of CR/LF sequences to LFs in these bytes. + * @throws IOException if an i/o error occurs + */ + public int available() throws IOException { + return in.available() / 2; // we can guarantee at least this amount after contraction + } + + /** + * Mark is not supported by the wrapper even if the underlying stream does, returns false. + */ + public boolean markSupported() { + return false; + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java new file mode 100644 index 000000000..6c4d20e4a --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java @@ -0,0 +1,146 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Converts LFs in the underlying input stream to CR/LF. + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public class LFtoCRLFInputStream extends FilterInputStream { + private boolean mustReturnLF = false; + + /** + * Creates a new filtered input stream. + * @param in the underlying input stream + */ + public LFtoCRLFInputStream(InputStream in) { + super(in); + } + + /** + * Wraps the underlying stream's method. + * Translates LFs to CR/LF sequences transparently. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read() throws IOException { + if (mustReturnLF) { + mustReturnLF = false; + return '\n'; + } + int b = in.read(); // ok if this throws + if (b == '\n') { + mustReturnLF = true; + b = '\r'; + } + return b; + } + + /** + * Wraps the underlying stream's method. + * Translates LFs to CR/LF sequences transparently. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public int read(byte[] buffer, int off, int len) throws IOException { + // handle boundary cases cleanly + if (len == 0) { + return 0; + } else if (len == 1) { + int b = read(); + if (b == -1) return -1; + buffer[off] = (byte) b; + return 1; + } + // prefix with remembered \n from last read, but don't expand it a second time + int count = 0; + if (mustReturnLF) { + mustReturnLF = false; + buffer[off++] = '\n'; + --len; + count = 1; + if (len < 2) return count; // is there still enough room to expand more? + } + // read some bytes from the stream into the back half of the buffer + // this guarantees that there is always room to expand + len /= 2; + int j = off + len; + InterruptedIOException iioe = null; + try { + len = in.read(buffer, j, len); + if (len == -1) { + return (count == 0) ? -1 : count; + } + } catch (InterruptedIOException e) { + len = e.bytesTransferred; + iioe = e; + } + count += len; + // copy bytes from the middle to the front of the array, expanding LF->CR/LF + while (len-- > 0) { + byte b = buffer[j++]; + if (b == '\n') { + buffer[off++] = '\r'; + count++; + } + buffer[off++] = b; + } + if (iioe != null) { + iioe.bytesTransferred = count; + throw iioe; + } + return count; + } + + /** + * Calls read() to skip the specified number of bytes + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public long skip(long count) throws IOException { + int actualCount = 0; // assumes count < Integer.MAX_INT + try { + while (count-- > 0 && read() != -1) actualCount++; // skip the specified number of bytes + return actualCount; + } catch (InterruptedIOException e) { + e.bytesTransferred = actualCount; + throw e; + } + } + + /** + * Wraps the underlying stream's method. + * Returns the number of bytes that can be read without blocking; accounts for + * possible translation of LFs to CR/LF sequences in these bytes. + * @throws IOException if an i/o error occurs + */ + public int available() throws IOException { + return in.available(); // we can guarantee at least this amount after expansion + } + + /** + * Mark is not supported by the wrapper even if the underlying stream does, returns false. + */ + public boolean markSupported() { + return false; + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java new file mode 100644 index 000000000..931b470c3 --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java @@ -0,0 +1,113 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.OperationCanceledException; +import org.eclipse.team.internal.ccvs.core.Policy; + +/** + * Polls a progress monitor periodically and handles timeouts over extended durations. + * For this class to be effective, a high numAttempts should be specified, and the + * underlying stream should time out frequently on reads (every second or so). + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public class PollingInputStream extends FilterInputStream { + private static final boolean DEBUG = false; + private int numAttempts; + private IProgressMonitor monitor; + + /** + * Creates a new polling input stream. + * @param in the underlying input stream + * @param numAttempts the number of attempts before issuing an InterruptedIOException, + * if 0, retries indefinitely until canceled + * @param monitor the progress monitor to be polled for cancellation + */ + public PollingInputStream(InputStream in, int numAttempts, IProgressMonitor monitor) { + super(in); + this.numAttempts = numAttempts; + this.monitor = monitor; + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times + * and no data was received, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read() throws IOException { + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + return in.read(); + } catch (InterruptedIOException e) { + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout")); + if (DEBUG) System.out.println("read retry=" + attempts); + } + } + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times + * and no data was received, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read(byte[] buffer, int off, int len) throws IOException { + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + return in.read(buffer, off, len); + } catch (InterruptedIOException e) { + if (e.bytesTransferred != 0) return e.bytesTransferred; // keep partial transfer + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout")); + if (DEBUG) System.out.println("read retry=" + attempts); + } + } + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times + * and no data was received, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public long skip(long count) throws IOException { + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + return in.skip(count); + } catch (InterruptedIOException e) { + if (e.bytesTransferred != 0) return e.bytesTransferred; // keep partial transfer + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout")); + if (DEBUG) System.out.println("read retry=" + attempts); + } + } + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java new file mode 100644 index 000000000..c3597893b --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java @@ -0,0 +1,150 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.OperationCanceledException; +import org.eclipse.team.internal.ccvs.core.Policy; + +/** + * Polls a progress monitor periodically and handles timeouts over extended durations. + * For this class to be effective, a high numAttempts should be specified, and the + * underlying stream should time out frequently on writes (every second or so). + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public class PollingOutputStream extends FilterOutputStream { + private static final boolean DEBUG = false; + private int numAttempts; + private IProgressMonitor monitor; + + /** + * Creates a new polling output stream. + * @param in the underlying output stream + * @param numAttempts the number of attempts before issuing an InterruptedIOException, + * if 0, retries indefinitely until canceled + * @param monitor the progress monitor to be polled for cancellation + */ + public PollingOutputStream(OutputStream out, int numAttempts, IProgressMonitor monitor) { + super(out); + this.numAttempts = numAttempts; + this.monitor = monitor; + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times + * and no data was sent, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public void write(int b) throws IOException { + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + out.write(b); + return; + } catch (InterruptedIOException e) { + if (++attempts == numAttempts) + throw new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout")); + if (DEBUG) System.out.println("write retry=" + attempts); + } + } + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times, + * bytesTransferred will reflect the number of bytes sent + * @throws IOException if an i/o error occurs + */ + public void write(byte[] buffer, int off, int len) throws IOException { + int count = 0; + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + out.write(buffer, off, len); + return; + } catch (InterruptedIOException e) { + int amount = e.bytesTransferred; + if (amount != 0) { // keep partial transfer + len -= amount; + if (len <= 0) return; + off += amount; + count += amount; + attempts = 0; // made some progress, don't time out quite yet + } + if (++attempts == numAttempts) { + e = new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout")); + e.bytesTransferred = count; + throw e; + } + if (DEBUG) System.out.println("write retry=" + attempts); + } + } + } + + /** + * Wraps the underlying stream's method. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times, + * bytesTransferred will reflect the number of bytes sent + * @throws IOException if an i/o error occurs + */ + public void flush() throws IOException { + int count = 0; + int attempts = 0; + for (;;) { + if (monitor.isCanceled()) throw new OperationCanceledException(); + try { + out.flush(); + return; + } catch (InterruptedIOException e) { + int amount = e.bytesTransferred; + if (amount != 0) { // keep partial transfer + count += amount; + attempts = 0; // made some progress, don't time out quite yet + } + if (++attempts == numAttempts) { + e = new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout")); + e.bytesTransferred = count; + throw e; + } + if (DEBUG) System.out.println("write retry=" + attempts); + } + } + } + + /** + * Calls flush() then close() on the underlying stream. + * @throws OperationCanceledException if the progress monitor is canceled + * @throws InterruptedIOException if the underlying operation times out numAttempts times, + * bytesTransferred will reflect the number of bytes sent during the flush() + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + try { + flush(); + } finally { + out.close(); + } + } +}
\ No newline at end of file diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java new file mode 100644 index 000000000..9ee8e93cb --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java @@ -0,0 +1,140 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.OperationCanceledException; + +/** + * Updates a progress monitor as bytes are read from the input stream. + * Also starts a background thread to provide responsive cancellation on read(). + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public abstract class ProgressMonitorInputStream extends FilterInputStream { + private IProgressMonitor monitor; + private int updateIncrement; + private long bytesTotal; + private long bytesRead = 0; + private long lastUpdate = -1; + private long nextUpdate = 0; + + /** + * Creates a progress monitoring input stream. + * @param in the underlying input stream + * @param bytesTotal the number of bytes to read in total (passed to updateMonitor()) + * @param updateIncrement the number of bytes read between updates + * @param monitor the progress monitor + */ + public ProgressMonitorInputStream(InputStream in, long bytesTotal, int updateIncrement, IProgressMonitor monitor) { + super(in); + this.bytesTotal = bytesTotal; + this.updateIncrement = updateIncrement; + this.monitor = monitor; + update(true); + } + + protected abstract void updateMonitor(long bytesRead, long size, IProgressMonitor monitor); + + /** + * Wraps the underlying stream's method. + * Updates the progress monitor to the final number of bytes read. + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + try { + in.close(); + } finally { + update(true); + } + } + + /** + * Wraps the underlying stream's method. + * Updates the progress monitor if the next update increment has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read() throws IOException { + int b = in.read(); + if (b != -1) { + bytesRead += 1; + update(false); + } + return b; + } + + /** + * Wraps the underlying stream's method. + * Updates the progress monitor if the next update increment has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public int read(byte[] buffer, int offset, int length) throws IOException { + try { + int count = in.read(buffer, offset, length); + if (count != -1) { + bytesRead += count; + update(false); + } + return count; + } catch (InterruptedIOException e) { + bytesRead += e.bytesTransferred; + update(false); + throw e; + } + } + + /** + * Wraps the underlying stream's method. + * Updates the progress monitor if the next update increment has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public long skip(long amount) throws IOException { + try { + long count = in.skip(amount); + bytesRead += count; + update(false); + return count; + } catch (InterruptedIOException e) { + bytesRead += e.bytesTransferred; + update(false); + throw e; + } + } + + /** + * Mark is not supported by the wrapper even if the underlying stream does, returns false. + */ + public boolean markSupported() { + return false; + } + + private void update(boolean now) { + if (bytesRead >= nextUpdate || now) { + nextUpdate = bytesRead - (bytesRead % updateIncrement); + if (nextUpdate != lastUpdate) updateMonitor(nextUpdate, bytesTotal, monitor); + lastUpdate = nextUpdate; + nextUpdate += updateIncrement; + } + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java new file mode 100644 index 000000000..17498cc42 --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Simulates a stream that represents only a portion of the underlying stream. + * Will report EOF when this portion has been fully read and prevent further reads. + * The underlying stream is not closed on close(), but the remaining unread input + * may optionally be skip()'d. + * + * Supports resuming partially completed operations after an InterruptedIOException + * if the underlying stream does. Check the bytesTransferred field to determine how + * much of the operation completed; conversely, at what point to resume. + */ +public class SizeConstrainedInputStream extends FilterInputStream { + private boolean discardOnClose; + private long bytesRemaining; + + /** + * Creates a size contrained input stream. + * @param in the underlying input stream, never actually closed by this filter + * @param size the maximum number of bytes of the underlying input stream that + * can be read through this filter + * @param discardOnClose if true, discards remaining unread bytes on close() + */ + public SizeConstrainedInputStream(InputStream in, long size, boolean discardOnClose) { + super(in); + this.bytesRemaining = size; + this.discardOnClose = discardOnClose; + } + + /** + * Prevents further reading from the stream but does not close the underlying stream. + * If discardOnClose, skip()'s over any remaining unread bytes in the constrained region. + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + try { + if (discardOnClose) { + while (bytesRemaining != 0 && skip(bytesRemaining) != 0); + } + } finally { + bytesRemaining = 0; + } + } + + /** + * Wraps the underlying stream's method. + * Simulates an end-of-file condition if the end of the constrained region has been reached. + * @throws IOException if an i/o error occurs + */ + public int available() throws IOException { + int amount = in.available(); + if (amount > bytesRemaining) amount = (int) bytesRemaining; + return amount; + } + + /** + * Wraps the underlying stream's method. + * Simulates an end-of-file condition if the end of the constrained region has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public int read() throws IOException { + if (bytesRemaining == 0) return -1; + int b = in.read(); + if (b != -1) bytesRemaining -= 1; + return b; + } + + /** + * Wraps the underlying stream's method. + * Simulates an end-of-file condition if the end of the constrained region has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public int read(byte[] buffer, int offset, int length) throws IOException { + if (length >= bytesRemaining) { + length = (int) bytesRemaining; + if (length == 0) return -1; + } + try { + int count = in.read(buffer, offset, length); + if (count != -1) bytesRemaining -= count; + return count; + } catch (InterruptedIOException e) { + bytesRemaining -= e.bytesTransferred; + throw e; + } + } + + /** + * Wraps the underlying stream's method. + * Simulates an end-of-file condition if the end of the constrained region has been reached. + * @throws InterruptedIOException if the operation was interrupted before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public long skip(long amount) throws IOException { + if (amount > bytesRemaining) amount = bytesRemaining; + try { + long count = in.skip(amount); + bytesRemaining -= count; + return count; + } catch (InterruptedIOException e) { + bytesRemaining -= e.bytesTransferred; + throw e; + } + } + + /** + * Mark is not supported by the wrapper even if the underlying stream does, returns false. + */ + public boolean markSupported() { + return false; + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java new file mode 100644 index 000000000..72c8eb495 --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java @@ -0,0 +1,256 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; + +/** + * Wraps an input stream that blocks indefinitely to simulate timeouts on read(), + * skip(), and close(). The resulting input stream is buffered and supports + * retrying operations that failed due to an InterruptedIOException. + * + * Supports resuming partially completed operations after an InterruptedIOException + * REGARDLESS of whether the underlying stream does unless the underlying stream itself + * generates InterruptedIOExceptions in which case it must also support resuming. + * Check the bytesTransferred field to determine how much of the operation completed; + * conversely, at what point to resume. + */ +public class TimeoutInputStream extends FilterInputStream { + private byte[] iobuffer; // circular buffer + private int head = 0; // points to first unread byte + private int length = 0; // number of remaining unread bytes, -1 if closed + + private long readTimeout; + private long closeTimeout; + private Thread thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$ + private IOException ioe = null; + private RuntimeException re = null; + + /** + * Creates a timeout wrapper for an input stream. + * @param in the underlying input stream + * @param bufferSize the buffer size in bytes; should be large enough to mitigate + * Thread synchronization and context switching overhead + * @param readTimeout the number of milliseconds to block for a read() or skip() before + * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * @param closeTimeout the number of milliseconds to block for a close() before throwing + * an InterruptedIOException; 0 blocks indefinitely, -1 does not block + */ + public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) { + super(in); + this.iobuffer = new byte[bufferSize]; + this.readTimeout = readTimeout; + this.closeTimeout = closeTimeout; + thread.setDaemon(true); + thread.start(); + } + + /** + * Wraps the underlying stream's method. + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + if (thread == null) return; + Thread oldThread = thread; + thread = null; + oldThread.interrupt(); + if (closeTimeout != -1) { + try { + oldThread.join(closeTimeout); + } catch (InterruptedException e) { + } + } + synchronized (this) { + if (ioe != null) throw ioe; + if (re != null) throw re; + } + } + + /** + * Returns the number of unread bytes in the buffer. + */ + public synchronized int available() throws IOException { + return length > 0 ? length : 0; + } + + /** + * Reads a byte from the stream. + * @throws InterruptedIOException if the timeout expired and no data was received, + * bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public synchronized int read() throws IOException { + syncfill(); + if (length == -1) return -1; + int b = iobuffer[head++]; + if (head == iobuffer.length) head = 0; + length--; + asyncfill(); + return b; + } + + /** + * Reads multiple bytes from the stream. + * @throws InterruptedIOException if the timeout expired and no data was received, + * bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public synchronized int read(byte[] buffer, int off, int len) throws IOException { + if (len == 0) return 0; + syncfill(); + if (length == -1) return -1; + int pos = off; + if (len > length) len = length; + while (len-- > 0) { + buffer[pos++] = iobuffer[head++]; + if (head == iobuffer.length) head = 0; + length--; + } + asyncfill(); + return pos - off; + } + + /** + * Skips multiple bytes in the stream. + * @throws InterruptedIOException if the timeout expired before all of the + * bytes specified have been skipped, bytesTransferred may be non-zero + * @throws IOException if an i/o error occurs + */ + public synchronized long skip(long count) throws IOException { + long amount = 0; + try { + while (count != 0 && length != -1) { + int skip = (count > length) ? length : (int) count; + head = (head + skip) % iobuffer.length; + length -= skip; + amount += skip; + count -= skip; + syncfill(); + } + return amount; + } catch (InterruptedIOException e) { + e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT + throw e; + } + } + + /** + * Mark is not supported by the wrapper even if the underlying stream does, returns false. + */ + public boolean markSupported() { + return false; + } + + private void syncfill() throws IOException { + if (length == 0) { + asyncfill(); + if (readTimeout != -1) { + try { + wait(readTimeout); + } catch (InterruptedException e) { + } + } + if (length == 0) { + throw new InterruptedIOException(); + } + } + } + + private void asyncfill() throws IOException { + try { + if (ioe != null) { + IOException e = ioe; + ioe = null; + throw e; + } + if (re != null) { + RuntimeException e = re; + re = null; + throw e; + } + } finally { + if (length != -1 && length != iobuffer.length) { + notify(); + } + } + } + + private class FillBufferRunnable implements Runnable { + public void run() { + final Object lock = TimeoutInputStream.this; + try { + boolean eof = false; + for (;;) { + int off, len; + synchronized (lock) { + try { + while (thread != null && (length == iobuffer.length || eof || ioe != null || re != null)) { + lock.wait(); + } + if (thread == null) return; // quit signal + } catch (InterruptedException e) { + return; // alternative quit signal + } + off = (head + length) % iobuffer.length; + len = ((head > off) ? head : iobuffer.length) - off; + } + try { + // the i/o operation might block without releasing the lock, + // so we do this outside of the synchronized block + int count = in.read(iobuffer, off, len); + if (count == -1) eof = true; + synchronized (lock) { + if (eof) { + if (length == 0) length = -1; + } else { + length += count; + } + if (count != 0) lock.notify(); + } + } catch (InterruptedIOException e) { + int count = e.bytesTransferred; // keep partial transfer + e.bytesTransferred = 0; // not relevant if rethrown + synchronized (lock) { + if (length != -1) length += count; + ioe = e; + lock.notify(); + } + } catch (IOException e) { + synchronized (lock) { + ioe = e; + lock.notify(); + } + } catch (RuntimeException e) { + synchronized (lock) { + re = e; + lock.notify(); + } + } + } + } finally { + try { + in.close(); + } catch (IOException e) { + synchronized (lock) { + ioe = e; + } + } catch (RuntimeException e) { + synchronized (lock) { + re = e; + } + } + } + } + } +} diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java new file mode 100644 index 000000000..201b804ba --- /dev/null +++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java @@ -0,0 +1,244 @@ +/******************************************************************************* + * Copyright (c) 2002 IBM Corporation and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Common Public License v0.5 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/cpl-v05.html + * + * Contributors: + * IBM - Initial API and implementation + ******************************************************************************/ +package org.eclipse.team.internal.ccvs.core.streams; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; + +/** + * Wraps an output stream that blocks indefinitely to simulate timeouts on write(), + * flush(), and close(). The resulting output stream is buffered and supports + * retrying operations that failed due to an InterruptedIOException. + * + * Supports resuming partially completed operations after an InterruptedIOException + * REGARDLESS of whether the underlying stream does unless the underlying stream itself + * generates InterruptedIOExceptions in which case it must also support resuming. + * Check the bytesTransferred field to determine how much of the operation completed; + * conversely, at what point to resume. + */ +public class TimeoutOutputStream extends FilterOutputStream { + private byte[] iobuffer; // circular buffer + private int head = 0; // points to first unwritten byte + private int length = 0; // number of remaining unwritten bytes + + private long writeTimeout; + private long closeTimeout; + private boolean flushRequested = false; + private Thread thread = new Thread(new CommitBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$ + private IOException ioe = null; + private RuntimeException re = null; + + /** + * Creates a timeout wrapper for an output stream. + * @param out the underlying input stream + * @param bufferSize the buffer size in bytes; should be large enough to mitigate + * Thread synchronization and context switching overhead + * @param writeTimeout the number of milliseconds to block for a write() or flush() before + * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block + * @param closeTimeout the number of milliseconds to block for a close() before throwing + * an InterruptedIOException; 0 blocks indefinitely, -1 does not block + */ + public TimeoutOutputStream(OutputStream out, int bufferSize, long writeTimeout, long closeTimeout) { + super(out); + this.iobuffer = new byte[bufferSize]; + this.writeTimeout = writeTimeout; + this.closeTimeout = closeTimeout; + thread.setDaemon(true); + thread.start(); + } + + /** + * Wraps the underlying stream's method. + * @throws InterruptedIOException if the timeout expired, bytesTransferred will + * reflect the number of bytes flushed from the buffer + * @throws IOException if an i/o error occurs + */ + public void close() throws IOException { + if (thread == null) return; + try { + flush(); + } finally { + Thread oldThread = thread; + thread = null; + oldThread.interrupt(); + if (closeTimeout != -1) { + try { + oldThread.join(closeTimeout); + } catch (InterruptedException e) { + } + } + synchronized (this) { + if (ioe != null) throw ioe; + if (re != null) throw re; + } + } + } + + /** + * Writes a byte to the stream. + * @throws InterruptedIOException if the timeout expired and no data was sent, + * bytesTransferred will be zero + * @throws IOException if an i/o error occurs + */ + public synchronized void write(int b) throws IOException { + if (length == iobuffer.length) synccommit(); + iobuffer[(head + length) % iobuffer.length] = (byte) b; + length++; + asynccommit(); + } + + /** + * Writes multiple bytes to the stream. + * @throws InterruptedIOException if the timeout expired, bytesTransferred will + * reflect the number of bytes sent + * @throws IOException if an i/o error occurs + */ + public synchronized void write(byte[] buffer, int off, int len) throws IOException { + int amount = 0; + try { + while (len-- > 0) { + if (length == iobuffer.length) synccommit(); + iobuffer[(head + length) % iobuffer.length] = buffer[off++]; + length++; + amount++; + } + asynccommit(); + } catch (InterruptedIOException e) { + e.bytesTransferred = amount; + throw e; + } + } + + /** + * Flushes the stream. + * @throws InterruptedIOException if the timeout expired, bytesTransferred will + * reflect the number of bytes flushed from the buffer + * @throws IOException if an i/o error occurs + */ + public synchronized void flush() throws IOException { + int oldLength = length; + flushRequested = true; + InterruptedIOException iioe = null; + try { + synccommit(); + if (length == 0) return; + iioe = new InterruptedIOException(); + } catch (InterruptedIOException e) { + iioe = e; + } + iioe.bytesTransferred = oldLength - length; + throw iioe; + } + + private void synccommit() throws IOException { + asynccommit(); + if (length != 0) { + int oldLength = length; + if (writeTimeout != -1) { + try { + wait(writeTimeout); + } catch (InterruptedException e) { + } + } + if (length == oldLength) { + throw new InterruptedIOException(); + } + } + } + + private void asynccommit() throws IOException { + try { + if (ioe != null) { + IOException e = ioe; + ioe = null; + throw e; + } + if (re != null) { + RuntimeException e = re; + re = null; + throw e; + } + } finally { + if (length != 0 || flushRequested) notify(); + } + } + + private class CommitBufferRunnable implements Runnable { + public void run() { + final Object lock = TimeoutOutputStream.this; + boolean running = true; + try { + for (;;) { + int off, len; + synchronized (lock) { + try { + while (running && thread != null && ((length == 0 && ! flushRequested) || ioe != null || re != null)) { + lock.wait(); + } + if (thread == null) running = false; // quit signal + if (! running && length == 0) return; + } catch (InterruptedException e) { + running = false; // alternative quit signal + } + off = head; + len = iobuffer.length - head; + if (len > length) len = length; + } + try { + // the i/o operation might block without releasing the lock, + // so we do this outside of the synchronized block + if (len != 0) out.write(iobuffer, off, len); + if (flushRequested) out.flush(); + synchronized (lock) { + flushRequested = false; + head = (head + len) % iobuffer.length; + length -= len; + lock.notify(); + } + } catch (InterruptedIOException e) { + len = e.bytesTransferred; // keep partial transfer + e.bytesTransferred = 0; // not relevant if rethrown + synchronized (lock) { + head = (head + len) % iobuffer.length; + length -= len; + ioe = e; + lock.notify(); + } + } catch (IOException e) { + synchronized (lock) { + ioe = e; + lock.notify(); + } + } catch (RuntimeException e) { + synchronized (lock) { + re = e; + lock.notify(); + } + } + } + } finally { + try { + out.close(); + } catch (IOException e) { + synchronized (lock) { + ioe = e; + } + } catch (RuntimeException e) { + synchronized (lock) { + re = e; + } + } + } + } + } +} diff --git a/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java b/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java index 97dd1e5d3..7cf832116 100644 --- a/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java +++ b/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java @@ -19,6 +19,9 @@ import java.net.Socket; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.team.internal.ccvs.core.connection.CVSAuthenticationException; +import org.eclipse.team.internal.ccvs.core.streams.PollingInputStream; +import org.eclipse.team.internal.ccvs.core.streams.PollingOutputStream; +import org.eclipse.team.internal.ccvs.core.streams.TimeoutOutputStream; public class Client { // client identification string @@ -335,10 +338,13 @@ public void connect(IProgressMonitor monitor) throws IOException, CVSAuthenticat } if (timeout >= 0) { - socket.setSoTimeout(timeout * 1000); + socket.setSoTimeout(1000); } - socketIn = new BufferedInputStream(socket.getInputStream()); - socketOut = new BufferedOutputStream(socket.getOutputStream()); + socketIn = new BufferedInputStream(new PollingInputStream(socket.getInputStream(), + timeout > 0 ? timeout : 1, monitor)); + socketOut = new PollingOutputStream(new TimeoutOutputStream( + socket.getOutputStream(), 8192 /*bufferSize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/), + timeout > 0 ? timeout : 1, monitor); } // read the ssh server id. The socket creation may of failed if the @@ -371,13 +377,9 @@ public void connect(IProgressMonitor monitor) throws IOException, CVSAuthenticat os = new StandardOutputStream(); connected = true; - } catch (IOException e) { - // If an exception occurs while connected, make sure we disconnect before passing the exception on - try { - cleanup(); - } finally { - throw e; - } + // If an exception occurs while connected, make sure we disconnect before passing the exception on + } finally { + if (! connected) cleanup(); } } /** |