Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJean Michel-Lemieux2002-04-08 15:11:31 -0400
committerJean Michel-Lemieux2002-04-08 15:11:31 -0400
commit8b4faf973400073c07a82c84027e540de80b4da5 (patch)
treeaadefdf288175dd12fe6470e2ec660b8a6e3701f
parent930aa2343da4aa6716d80325283c7c8e15665065 (diff)
downloadeclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.tar.gz
eclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.tar.xz
eclipse.platform.team-8b4faf973400073c07a82c84027e540de80b4da5.zip
- improved handling of '.project' in EclipseFile.setContents() to avoid
accidental closure of input stream - replaced ad-hoc CR/LF conversion code with stream filters - replaced ad-hoc file transfer progress monitor code with stream filter - added low-level progress monitor cancellation support - added timeouts on socket writing and on pipe reading and writing - removed redundant buffer in Connection - fixed possible i/o stream leak in Ext connection method - fixed i/o stream leak in PServer connection method - fixed i/o stream and socket leak in ExtSSH connection method after failed authentication or cancellation - fixed an error reporting condition in ExtSSH connection method
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java18
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java53
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java188
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java282
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java5
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java78
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java43
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties2
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java155
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java146
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java113
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java150
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java140
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java132
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java256
-rw-r--r--bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java244
-rw-r--r--bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java22
17 files changed, 1563 insertions, 464 deletions
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
index 058c6b3ee..9348e433a 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSProviderPlugin.java
@@ -52,6 +52,8 @@ public class CVSProviderPlugin extends Plugin {
public static final boolean DEFAULT_FETCH = true;
// communication timeout with the server
public static final int DEFAULT_TIMEOUT = 60;
+ // file transfer compression level (0 - 9)
+ public static final int DEFAULT_COMPRESSION_LEVEL = 0;
// cvs plugin extension points and ids
public static final String ID = "org.eclipse.team.cvs.core"; //$NON-NLS-1$
@@ -59,6 +61,7 @@ public class CVSProviderPlugin extends Plugin {
public static final String PT_CONNECTIONMETHODS = "connectionmethods"; //$NON-NLS-1$
private QuietOption quietness;
+ private int compressionLevel = DEFAULT_COMPRESSION_LEVEL;
private int communicationsTimeout = DEFAULT_TIMEOUT;
private boolean pruneEmptyDirectories = DEFAULT_PRUNE;
private boolean fetchAbsentDirectories = DEFAULT_FETCH;
@@ -130,6 +133,21 @@ public class CVSProviderPlugin extends Plugin {
public static String getTypeId() {
return NATURE_ID;
}
+
+ /**
+ * Sets the file transfer compression level. (if supported)
+ * Valid levels are: 0 (disabled), 1 (worst/fastest) - 9 (best/slowest)
+ */
+ public void setCompressionLevel(int level) {
+ compressionLevel = level;
+ }
+
+ /**
+ * Gets the file transfer compression level.
+ */
+ public int getCompressionLevel() {
+ return compressionLevel;
+ }
/**
* Should the CVS adapter prune empty directories
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java
index 5b0b6ed5a..0a161173c 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/CVSTeamProvider.java
@@ -63,6 +63,8 @@ import org.eclipse.team.internal.ccvs.core.connection.CVSServerException;
import org.eclipse.team.internal.ccvs.core.resources.CVSRemoteSyncElement;
import org.eclipse.team.internal.ccvs.core.resources.CVSWorkspaceRoot;
import org.eclipse.team.internal.ccvs.core.resources.EclipseSynchronizer;
+import org.eclipse.team.internal.ccvs.core.streams.CRLFtoLFInputStream;
+import org.eclipse.team.internal.ccvs.core.streams.LFtoCRLFInputStream;
import org.eclipse.team.internal.ccvs.core.syncinfo.FolderSyncInfo;
import org.eclipse.team.internal.ccvs.core.syncinfo.MutableResourceSyncInfo;
import org.eclipse.team.internal.ccvs.core.syncinfo.ResourceSyncInfo;
@@ -99,11 +101,8 @@ import org.eclipse.team.internal.ccvs.core.util.PrepareForReplaceVisitor;
* have them appear in Eclipse. This may be changed in the future.
*/
public class CVSTeamProvider extends RepositoryProvider {
- private static final int CR_BYTE = 0x0D;
- private static final int LF_BYTE = 0x0A;
private static final boolean IS_CRLF_PLATFORM = Arrays.equals(
- System.getProperty("line.separator").getBytes(), //$NON-NLS-1$
- new byte[] { CR_BYTE, LF_BYTE });
+ System.getProperty("line.separator").getBytes(), new byte[] { '\r', '\n' }); //$NON-NLS-1$
private CVSWorkspaceRoot workspaceRoot;
private IProject project;
@@ -1055,47 +1054,19 @@ public class CVSTeamProvider extends RepositoryProvider {
throws CVSException {
try {
// convert delimiters in memory
- boolean changed = false;
- InputStream is = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ InputStream is = new BufferedInputStream(file.getContents());
try {
- is = new BufferedInputStream(file.getContents());
- boolean seenCR = false;
- int c;
- while ((c = is.read()) != -1) {
- if (c == LF_BYTE) {
- if (useCRLF) {
- bos.write(CR_BYTE);
- if (! seenCR) changed = true; // added CR
- } else {
- if (seenCR) changed = true; // stripped CR
- }
- bos.write(LF_BYTE);
- seenCR = false;
- } else {
- if (seenCR) {
- bos.write(CR_BYTE); // preserve orphaned carriage returns
- seenCR = false;
- }
- if (c == CR_BYTE) {
- seenCR = true;
- } else {
- bos.write(c);
- }
- }
- }
- if (seenCR) {
- bos.write(CR_BYTE); // preserve orphaned carriage returns
- }
- } finally {
- if (is != null) is.close();
+ is = new CRLFtoLFInputStream(is);
+ if (useCRLF) is = new LFtoCRLFInputStream(is);
+ for (int b; (b = is.read()) != -1;) bos.write(b);
bos.close();
+ } finally {
+ is.close();
}
- if (changed) {
- // write file back to disk with corrected delimiters if changes were made
- ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
- file.setContents(bis, false /*force*/, true /*keepHistory*/, progress);
- }
+ // write file back to disk with corrected delimiters if changes were made
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ file.setContents(bis, false /*force*/, false /*keepHistory*/, progress);
} catch (CoreException e) {
throw CVSException.wrapException(file, Policy.bind("CVSTeamProvider.cleanLineDelimitersException"), e); //$NON-NLS-1$
} catch (IOException e) {
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java
deleted file mode 100644
index c7aa32cc9..000000000
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/FileInputStreamWrapper.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package org.eclipse.team.internal.ccvs.core.client;
-
-/*
- * (c) Copyright IBM Corp. 2000, 2002.
- * All Rights Reserved.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.eclipse.core.runtime.IProgressMonitor;
-import org.eclipse.team.internal.ccvs.core.CVSException;
-import org.eclipse.team.internal.ccvs.core.Policy;
-
-/**
- * This class can be used to transfer a file from the CVS server to a local IFile
- */
-public class FileInputStreamWrapper {
-
- // default file transfer buffer size (in bytes)
- private static int TRANSFER_BUFFER_SIZE = 8192;
- // update progress bar in increments of this size (in bytes)
- // no incremental progress shown for files smaller than this size
- private static int TRANSFER_PROGRESS_INCREMENT = 32768;
-
- // the platform's line termination sequence
- private static final byte[] PLATFORM_NEWLINE_BYTES =
- System.getProperty("line.separator").getBytes(); //$NON-NLS-1$ // at least one byte long
- // the server's line termination sequence
- private static final int SERVER_NEWLINE_BYTE = 0x0a; // exactly one byte long
- private static final byte[] SERVER_NEWLINE_BYTES = new byte[] { SERVER_NEWLINE_BYTE };
- // true iff newlines must be converted between platform and server formats
- private static boolean DONT_CONVERT_NEWLINES = PLATFORM_NEWLINE_BYTES.length == 1
- && PLATFORM_NEWLINE_BYTES[0] == SERVER_NEWLINE_BYTE;
-
- // VCM 1.0 comitted files using CR/LF as a delimiter
- private static final int CARRIAGE_RETURN_BYTE = 0x0d;
-
- private InputStream input;
- private long fileSize;
- private int totalRead;
- private boolean isBinary;
- private IProgressMonitor monitor;
- private byte[] buffer;
- private int nextProgressThresh;
-
- private static final byte[] BUFFER = new byte[TRANSFER_BUFFER_SIZE / 2];
- private static final byte[] EXPANSION_BUFFER = new byte[TRANSFER_BUFFER_SIZE];
-
- private int position;
- private int bufferLength;
- private String title;
-
- public FileInputStreamWrapper(InputStream input, long fileSize, boolean isBinary, String title, IProgressMonitor monitor) {
- this.input = input;
- this.fileSize = fileSize;
- this.totalRead = 0;
- this.isBinary = isBinary;
- this.monitor = monitor;
- this.buffer = BUFFER;
- this.nextProgressThresh = TRANSFER_PROGRESS_INCREMENT;
- this.title = title;
- }
-
- public class InputStreamFromServer extends InputStream {
- public int read() throws IOException {
- if (position >= bufferLength) {
- if (fill() == -1)
- return -1;
- }
- return buffer[position++];
- }
- public int read(byte[] bytes) throws IOException {
- return read(bytes, 0, bytes.length);
- }
- public int read(byte[] bytes, int offset, int length) throws IOException {
- if (position >= bufferLength) {
- if (fill() == -1)
- return -1;
- }
- length = Math.min(bufferLength - position, length);
- System.arraycopy(buffer, position, bytes, offset, length);
- position += length;
- return length;
- }
- }
-
- /**
- * Return a stream that can be passed to IFile#setContent()
- * After the call to setContent, the receiver's input stream will be at the byte
- * after the received file.
- */
- public InputStream getInputStream() {
- return new InputStreamFromServer();
- }
-
- /*
- * Transfers a file to or from the remove CVS server, possibly expanding line delimiters.
- * <p>
- * Line termination sequences are only converted upon request by specifying an
- * array containing the expected sequence of bytes representing an outbound newline,
- * and a single byte representing an inbound newline. If null is passed for the
- * former, the file is assumed to have binary contents, hence no translation is
- * performed.
- * </p><p>
- * Translation is performed on-the-fly, so the file need not fit in available memory.
- * </p>
- * @param in the input stream
- * @param out the output stream
- * @param size the source file size
- * @param newlineIn the single byte for a received newline, ignored if binary
- * @param newlineOut the sequence of bytes for sent newline, or null if binary
- * @param monitor the progress monitor
- * @param title the name of the file being received (as shown in the monitor)
- */
- private int fill() throws IOException {
-
- // Check if we've read the entire file
- if (totalRead == fileSize) {
- return -1;
- } else if (position < bufferLength) {
- return bufferLength - position;
- }
-
- position = 0;
-
- // If we're not converting, use the big buffer to read
- if (isBinary || DONT_CONVERT_NEWLINES) {
- buffer = EXPANSION_BUFFER;
- } else {
- buffer = BUFFER;
- }
-
- bufferLength = input.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalRead));
- if (bufferLength == -1) {
- // Unexpected end of stream
- throw new IOException(Policy.bind("Session.readError")); //$NON-NLS-1$
- }
- totalRead += bufferLength;
-
- if (isBinary || DONT_CONVERT_NEWLINES) {
- return bufferLength;
- }
-
- bufferLength = convertNewLines(BUFFER, EXPANSION_BUFFER, bufferLength);
- buffer = EXPANSION_BUFFER;
-
- // update progress monitor
- if (totalRead > nextProgressThresh) {
- monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$
- new Object[] { title, new Long(totalRead / 1024), new Long(fileSize / 1024)}));
- nextProgressThresh = totalRead + TRANSFER_PROGRESS_INCREMENT;
- }
-
- return bufferLength;
- }
-
- /*
- * Copy the bytes from the source to the target, converting any LF to the platform newline byte.
- *
- * There is special handling that will skip incoming CRs that precede LF.
- */
- private int convertNewLines(byte[] source, byte[] target, int length) {
- boolean seenCR = false;
- int targetPosition = 0;
- for (int sourcePosition = 0; sourcePosition < length; ++sourcePosition) {
- final byte b = source[sourcePosition];
- if (b == CARRIAGE_RETURN_BYTE) {
- // We keep track of CRs to perform autocorrection for improperly stored text files
- seenCR = true;
- } else {
- if (b == SERVER_NEWLINE_BYTE) {
- // if fixCRLF we ignore previous CR (if there was one)
- // replace newlineIn with newlineOut
- for (int x = 0; x < PLATFORM_NEWLINE_BYTES.length; ++x) target[targetPosition++] = PLATFORM_NEWLINE_BYTES[x];
- } else {
- if (seenCR) target[targetPosition++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's
- target[targetPosition++] = b;
- }
- seenCR = false;
- }
- }
- if (seenCR) target[targetPosition++] = CARRIAGE_RETURN_BYTE;
-
- return targetPosition;
- }
-}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
index bc68b52d6..7eaf56722 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/client/Session.java
@@ -6,15 +6,21 @@ package org.eclipse.team.internal.ccvs.core.client;
*/
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.NullProgressMonitor;
+import org.eclipse.team.ccvs.core.CVSProviderPlugin;
import org.eclipse.team.ccvs.core.CVSProviderPlugin;
import org.eclipse.team.ccvs.core.ICVSFile;
import org.eclipse.team.ccvs.core.ICVSFolder;
@@ -24,8 +30,11 @@ import org.eclipse.team.internal.ccvs.core.CVSException;
import org.eclipse.team.internal.ccvs.core.Policy;
import org.eclipse.team.internal.ccvs.core.connection.CVSRepositoryLocation;
import org.eclipse.team.internal.ccvs.core.connection.Connection;
+import org.eclipse.team.internal.ccvs.core.streams.CRLFtoLFInputStream;
+import org.eclipse.team.internal.ccvs.core.streams.LFtoCRLFInputStream;
+import org.eclipse.team.internal.ccvs.core.streams.ProgressMonitorInputStream;
+import org.eclipse.team.internal.ccvs.core.streams.SizeConstrainedInputStream;
import org.eclipse.team.internal.ccvs.core.syncinfo.ResourceSyncInfo;
-import org.eclipse.team.internal.ccvs.core.util.Assert;
import org.eclipse.team.internal.ccvs.core.util.Util;
/**
@@ -57,20 +66,10 @@ public class Session {
private static int TRANSFER_BUFFER_SIZE = 8192;
// update progress bar in increments of this size (in bytes)
// no incremental progress shown for files smaller than this size
- private static long TRANSFER_PROGRESS_INCREMENT = 32768;
-
- // the platform's line termination sequence
- private static final byte[] PLATFORM_NEWLINE_BYTES =
- System.getProperty("line.separator").getBytes(); //$NON-NLS-1$ // at least one byte long
- // the server's line termination sequence
- private static final int SERVER_NEWLINE_BYTE = 0x0a; // exactly one byte long
- private static final byte[] SERVER_NEWLINE_BYTES = new byte[] { SERVER_NEWLINE_BYTE };
- // true iff newlines must be converted between platform and server formats
- private static boolean MUST_CONVERT_NEWLINES = PLATFORM_NEWLINE_BYTES.length != 1
- && PLATFORM_NEWLINE_BYTES[0] != SERVER_NEWLINE_BYTE;
-
- // VCM 1.0 comitted files using CR/LF as a delimiter
- private static final int CARRIAGE_RETURN_BYTE = 0x0d;
+ private static int TRANSFER_PROGRESS_INCREMENT = 32768;
+
+ private static final boolean IS_CRLF_PLATFORM = Arrays.equals(
+ System.getProperty("line.separator").getBytes(), new byte[] { '\r', '\n' }); //$NON-NLS-1$
private CVSRepositoryLocation location;
private ICVSFolder localRoot;
@@ -80,13 +79,11 @@ public class Session {
private Date modTime = null;
private boolean noLocalChanges = false;
private boolean createBackups = true;
+ private int compressionLevel = 0;
private List expansions;
private Collection /* of ICVSFile */ textTransferOverrideSet = null;
private boolean hasBeenConnected = false;
- // a shared buffer used for file transfers
- private byte[] transferBuffer = null;
-
// The resource bundle key that provides the file sending message
private String sendFileTitleKey;
@@ -159,9 +156,23 @@ public class Session {
// ask for the set of valid requests
Request.VALID_REQUESTS.execute(this, Policy.subMonitorFor(monitor, 50));
-
+
// set the root directory on the server for this connection
connection.writeLine("Root " + getRepositoryRoot()); //$NON-NLS-1$
+
+ // enable compression
+ compressionLevel = CVSProviderPlugin.getPlugin().getCompressionLevel();
+ if (compressionLevel != 0 && isValidRequest("gzip-file-contents")) {
+ // Enable the use of CVS 1.8 per-file compression mechanism.
+ // The newer Gzip-stream request seems to be problematic due to Java's
+ // GZIPInputStream tendency to block on read() rather than to return a
+ // partially filled buffer. The latter option would be better since it
+ // can make more effective use of the code dictionary, if it can be made
+ // to work...
+ connection.writeLine("gzip-file-contents " + Integer.toString(compressionLevel));
+ } else {
+ compressionLevel = 0;
+ }
} catch (CVSException e) {
// If there is a failure opening, make sure we're closed
if (connection != null) {
@@ -518,14 +529,6 @@ public class Session {
}
/**
- * Gets the shared file transfer buffer.
- */
- private byte[] getTransferBuffer() {
- if (transferBuffer == null) transferBuffer = new byte[TRANSFER_BUFFER_SIZE];
- return transferBuffer;
- }
-
- /**
* Sends a file to the remote CVS server, possibly translating line delimiters.
* <p>
* Line termination sequences are automatically converted to linefeeds only
@@ -538,87 +541,60 @@ public class Session {
* @param isBinary is true if the file should be sent without translation
* @param monitor the progress monitor
*/
- public void sendFile(ICVSFile file, boolean isBinary, IProgressMonitor monitor)
- throws CVSException {
+ public void sendFile(ICVSFile file, boolean isBinary, IProgressMonitor monitor) throws CVSException {
// check overrides
if (textTransferOverrideSet != null &&
textTransferOverrideSet.contains(file)) isBinary = false;
// update progress monitor
- String title = Policy.bind(getSendFileTitleKey(), new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$
+ final String title = Policy.bind(getSendFileTitleKey(), new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$
monitor.subTask(Policy.bind("Session.transferNoSize", title)); //$NON-NLS-1$
// obtain an input stream for the file and its size
long size = file.getSize();
OutputStream out = connection.getOutputStream();
- InputStream in = file.getContents();
try {
- if (isBinary || PLATFORM_NEWLINE_BYTES.length == 1) {
- writeLine(Long.toString(size));
- if (! isBinary && MUST_CONVERT_NEWLINES) {
- /*** convert newlines on-the-fly ***/
- transferWithProgress(in, out, size,
- PLATFORM_NEWLINE_BYTES[0], SERVER_NEWLINE_BYTES, monitor, title);
- } else {
- /*** perform no conversion ***/
- transferWithProgress(in, out, size, 0, null, monitor, title);
- }
- } else {
- // implies file is text, and we must convert newlines since size of platform newline
- // sequence is not 1, but the server's is
- /*** convert newlines in memory, since file size may change ***/
- Assert.isTrue(size < Integer.MAX_VALUE);
- int fsize = (int) size;
- byte[] fileContents;
- if (fsize <= TRANSFER_BUFFER_SIZE) fileContents = getTransferBuffer();
- else fileContents = new byte[fsize];
- // translate the file from non-LF delimiters in memory and
- // compute its reduced size
- try {
- // read exactly _size_ bytes
- try {
- for (int pos = 0, read; pos < fsize; pos += read) {
- Policy.checkCanceled(monitor);
- read = in.read(fileContents, pos, fsize - pos);
- if (read == -1) {
- // file ended prematurely
- throw new IOException(Policy.bind("Session.readError"));//$NON-NLS-1$
- }
+ InputStream in = file.getContents();
+ try {
+ byte[] buffer = new byte[TRANSFER_BUFFER_SIZE];
+ if (! isBinary && IS_CRLF_PLATFORM || compressionLevel != 0) {
+ // this affects the file size, spool the converted copy to an in-memory buffer
+ if (! isBinary && IS_CRLF_PLATFORM) in = new CRLFtoLFInputStream(in);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ OutputStream zout;
+ if (compressionLevel != 0) {
+ try {
+ zout = new GZIPOutputStream(bout); // apparently does not support specifying compression level
+ } catch (IOException e) {
+ throw CVSException.wrapException(e);
}
- } finally {
- in.close(); // remember to close the source file
- in = null;
- }
- } catch (IOException e) {
- throw CVSException.wrapException(e);
- }
- // convert platform line termination sequences
- // conservative since it leaves any partial sequences alone (like stray CR's)
- // assumes no prefix of a sequence
- int cur = 0, match = 0;
- for (int pos = 0; pos < fsize; ++pos) {
- byte b = fileContents[pos];
- if (PLATFORM_NEWLINE_BYTES[match] == b) {
- if (match == PLATFORM_NEWLINE_BYTES.length - 1) {
- b = SERVER_NEWLINE_BYTE;
- cur -= match;
- match = 0;
- } else match += 1;
} else {
- match = 0;
+ zout = bout;
}
- fileContents[cur++] = b;
+ for (int count; (count = in.read(buffer)) != -1;) zout.write(buffer, 0, count);
+ zout.close();
+ byte[] contents = bout.toByteArray();
+ in.close();
+ in = new ByteArrayInputStream(contents);
+ size = contents.length;
}
- // send file
- writeLine(Integer.toString(cur));
- in = new ByteArrayInputStream(fileContents, 0, cur);
- transferWithProgress(in, out, cur, 0, null, monitor, title);
- }
- } finally {
- try {
- if (in != null) in.close();
- } catch (IOException e) {
- throw CVSException.wrapException(e);
+ // setup progress monitoring
+ in = new ProgressMonitorInputStream(in, size, TRANSFER_PROGRESS_INCREMENT, monitor) {
+ protected void updateMonitor(long bytesRead, long bytesTotal, IProgressMonitor monitor) {
+ if (bytesRead == 0) return;
+ monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$
+ new Object[] { title, Long.toString(bytesRead >> 10), Long.toString(bytesTotal >> 10) }));
+ }
+ };
+ // send the file
+ String sizeLine = Long.toString(size);
+ if (compressionLevel != 0) sizeLine = "z" + sizeLine;
+ writeLine(sizeLine);
+ for (int count; (count = in.read(buffer)) != -1;) out.write(buffer, 0, count);
+ } finally {
+ in.close();
}
+ } catch (IOException e) {
+ throw CVSException.wrapException(e);
}
}
@@ -644,108 +620,44 @@ public class Session {
textTransferOverrideSet.contains(file)) isBinary = false;
// update progress monitor
- String title = Policy.bind("Session.receiving", new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$
+ final String title = Policy.bind("Session.receiving", new Object[]{ Util.toTruncatedPath(file, localRoot, 3) }); //$NON-NLS-1$
monitor.subTask(Policy.bind("Session.transferNoSize", title)); //$NON-NLS-1$
// get the file size from the server
long size;
+ boolean compressed = false;
try {
- size = Long.parseLong(readLine(), 10);
+ String sizeLine = readLine();
+ if (sizeLine.charAt(0) == 'z') {
+ compressed = true;
+ sizeLine = sizeLine.substring(1);
+ }
+ size = Long.parseLong(sizeLine, 10);
} catch (NumberFormatException e) {
throw new CVSException(Policy.bind("Session.badInt"), e); //$NON-NLS-1$
}
- // Set the contents of the file using the stream wrapper
- FileInputStreamWrapper wrapper = new FileInputStreamWrapper(connection.getInputStream(), size, isBinary, title, monitor);
- file.setContents(wrapper.getInputStream(), responseType, true, monitor);
- }
-
- /**
- * Transfers a file to or from the remove CVS server, possibly expanding line delimiters.
- * <p>
- * Line termination sequences are only converted upon request by specifying an
- * array containing the expected sequence of bytes representing an outbound newline,
- * and a single byte representing an inbound newline. If null is passed for the
- * former, the file is assumed to have binary contents, hence no translation is
- * performed.
- * </p><p>
- * Translation is performed on-the-fly, so the file need not fit in available memory.
- * </p>
- * @param in the input stream
- * @param out the output stream
- * @param size the source file size
- * @param newlineIn the single byte for a received newline, ignored if binary
- * @param newlineOut the sequence of bytes for sent newline, or null if binary
- * @param monitor the progress monitor
- * @param title the name of the file being received (as shown in the monitor)
- */
- private void transferWithProgress(InputStream in, OutputStream out,
- long size, int newlineIn, byte[] newlineOut, IProgressMonitor monitor, String title)
- throws CVSException {
- long nextProgressThresh = TRANSFER_PROGRESS_INCREMENT;
- Long ksize = new Long(size / 1024);
- try {
- byte[] buffer = getTransferBuffer();
- final int wfirst, wlast;
- if (newlineOut != null) {
- wfirst = buffer.length / 2;
- wlast = buffer.length - newlineOut.length - 1; // reserve space for newline & stray CR
- } else {
- wfirst = buffer.length;
- wlast = wfirst;
+ // create an input stream that spans the next 'size' bytes from the connection
+ InputStream in = new SizeConstrainedInputStream(connection.getInputStream(), size, true /*discardOnClose*/);
+ // setup progress monitoring
+ in = new ProgressMonitorInputStream(in, size, TRANSFER_PROGRESS_INCREMENT, monitor) {
+ protected void updateMonitor(long bytesRead, long bytesTotal, IProgressMonitor monitor) {
+ if (bytesRead == 0) return;
+ monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$
+ new Object[] { title, Long.toString(bytesRead >> 10), Long.toString(bytesTotal >> 10) }));
}
- int wpos = wfirst;
- // read exactly _size_ bytes
- boolean fixCRLF = (newlineIn == SERVER_NEWLINE_BYTE);
- boolean seenCR = false; // only true if fixCRLF and last byte was a CR
- for (long totalRead = 0; totalRead < size;) {
- Policy.checkCanceled(monitor);
- int read = in.read(buffer, 0, (int) Math.min(wfirst, size - totalRead));
- if (read == -1) {
- // file ended prematurely
- throw new IOException(Policy.bind("Session.readError")); //$NON-NLS-1$
- }
- totalRead += read;
- if (newlineOut == null) {
- // dump binary data
- out.write(buffer, 0, read);
- } else {
- // filter newline sequences in memory from first half of buffer into second half
- // then dump to output stream
- for (int p = 0; p < read; ++p) {
- final byte b = buffer[p];
- if (b == CARRIAGE_RETURN_BYTE && fixCRLF) {
- seenCR = true;
- } else {
- if (b == newlineIn) {
- // if fixCRLF we ignore previous CR (if there was one)
- // replace newlineIn with newlineOut
- for (int x = 0; x < newlineOut.length; ++x) buffer[wpos++] = newlineOut[x];
- } else {
- if (seenCR) buffer[wpos++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's
- buffer[wpos++] = b;
- }
- seenCR = false;
- }
- if (wpos >= wlast) {
- // flush output buffer
- out.write(buffer, wfirst, wpos - wfirst);
- wpos = wfirst;
- }
- }
- }
- // update progress monitor
- if (totalRead > nextProgressThresh) {
- monitor.subTask(Policy.bind("Session.transfer", //$NON-NLS-1$
- new Object[] { title, new Long(totalRead / 1024), ksize}));
- nextProgressThresh = totalRead + TRANSFER_PROGRESS_INCREMENT;
- }
+ };
+ // if compression enabled, decompress on the fly
+ if (compressed) {
+ try {
+ in = new GZIPInputStream(in);
+ } catch (IOException e) {
+ throw CVSException.wrapException(e);
}
- // flush pending buffered output
- if (seenCR) buffer[wpos++] = CARRIAGE_RETURN_BYTE; // preserve stray CR's
- if (wpos != wfirst) out.write(buffer, wfirst, wpos - wfirst);
- } catch (IOException e) {
- throw CVSException.wrapException(e);
}
- }
+ // if not binary, translate line delimiters on the fly
+ if (! isBinary && IS_CRLF_PLATFORM) in = new LFtoCRLFInputStream(in);
+ // write the file locally
+ file.setContents(in, responseType, true, new NullProgressMonitor());
+ }
/**
* Stores the value of the last Mod-time response encountered.
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java
index d97ea7540..351edce30 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/Connection.java
@@ -5,7 +5,6 @@ package org.eclipse.team.internal.ccvs.core.connection;
* All Rights Reserved.
*/
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -40,7 +39,7 @@ public class Connection {
private ICVSRepositoryLocation fCVSRoot;
private String fCVSRootDirectory;
private boolean fIsEstablished;
- private BufferedInputStream fResponseStream;
+ private InputStream fResponseStream;
private byte[] readLineBuffer = new byte[256];
public Connection(ICVSRepositoryLocation cvsroot, IServerConnection serverConnection) {
@@ -104,7 +103,7 @@ public class Connection {
if (!isEstablished())
return null;
if (fResponseStream == null)
- fResponseStream = new BufferedInputStream(serverConnection.getInputStream());
+ fResponseStream = serverConnection.getInputStream();
return fResponseStream;
}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java
index b3de2471a..e0a506880 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/ExtConnection.java
@@ -5,6 +5,7 @@ package org.eclipse.team.internal.ccvs.core.connection;
* All Rights Reserved.
*/
+import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -14,6 +15,7 @@ import org.eclipse.team.ccvs.core.CVSProviderPlugin;
import org.eclipse.team.ccvs.core.ICVSRepositoryLocation;
import org.eclipse.team.ccvs.core.IServerConnection;
import org.eclipse.team.internal.ccvs.core.Policy;
+import org.eclipse.team.internal.ccvs.core.streams.*;
/**
* Implements a connection method which invokes an external tool to
@@ -50,9 +52,17 @@ public class ExtConnection implements IServerConnection {
* Closes the connection.
*/
public void close() throws IOException {
- inputStream.close();
- outputStream.close();
- process.destroy();
+ try {
+ if (inputStream != null) inputStream.close();
+ } finally {
+ inputStream = null;
+ try {
+ if (outputStream != null) outputStream.close();
+ } finally {
+ outputStream = null;
+ process.destroy();
+ }
+ }
}
/**
@@ -87,30 +97,52 @@ public class ExtConnection implements IServerConnection {
int port = location.getPort();
if (port == location.USE_DEFAULT_PORT)
port = DEFAULT_PORT;
- try {
- // The command line doesn't support the use of a port
- if (port != DEFAULT_PORT)
- throw new IOException(Policy.bind("EXTServerConnection.invalidPort")); //$NON-NLS-1$
-
- if(CVS_RSH == null || CVS_SERVER == null) {
- throw new IOException(Policy.bind("EXTServerConnection.varsNotSet")); //$NON-NLS-1$
- }
+
+ // The command line doesn't support the use of a port
+ if (port != DEFAULT_PORT)
+ throw new IOException(Policy.bind("EXTServerConnection.invalidPort")); //$NON-NLS-1$
- process = Runtime.getRuntime().exec(command);
+ if(CVS_RSH == null || CVS_SERVER == null) {
+ throw new IOException(Policy.bind("EXTServerConnection.varsNotSet")); //$NON-NLS-1$
+ }
- inputStream = process.getInputStream();
- outputStream = process.getOutputStream();
- } catch (IOException ex) {
- try {
- close();
- } finally {
- throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$
+ boolean connected = false;
+ try {
+ process = Runtime.getRuntime().exec(command);
+
+ inputStream = new PollingInputStream(new TimeoutInputStream(process.getInputStream(),
+ 8192 /*bufferSize*/, 1000 /*readTimeout*/, -1 /*closeTimeout*/), location.getTimeout(), monitor);
+ outputStream = new PollingOutputStream(new TimeoutOutputStream(process.getOutputStream(),
+ 8192 /*buffersize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/), location.getTimeout(), monitor);
+
+ // XXX need to do something more useful with stderr
+ // discard the input to prevent the process from hanging due to a full pipe
+ Thread thread = new DiscardInputThread(process.getErrorStream());
+ connected = true;
+ } finally {
+ if (! connected) {
+ try {
+ close();
+ } finally {
+ throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$
+ }
}
- } catch (RuntimeException e) {
+ }
+ }
+
+ private static class DiscardInputThread extends Thread {
+ private InputStream in;
+ public DiscardInputThread(InputStream in) {
+ this.in = in;
+ }
+ public void run() {
try {
- close();
- } finally {
- throw new IOException(Policy.bind("EXTServerConnection.ioError", CVS_RSH)); //$NON-NLS-1$
+ try {
+ while (in.read() != -1);
+ } finally {
+ in.close();
+ }
+ } catch (IOException e) {
}
}
}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java
index c6c8461b2..bbd4e51fa 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/connection/PServerConnection.java
@@ -13,10 +13,11 @@ import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
-import org.eclipse.team.internal.ccvs.core.Policy;
import org.eclipse.core.runtime.IProgressMonitor;
-import org.eclipse.team.ccvs.core.*;
-import org.eclipse.team.ccvs.core.*;
+import org.eclipse.team.ccvs.core.ICVSRepositoryLocation;
+import org.eclipse.team.ccvs.core.IServerConnection;
+import org.eclipse.team.internal.ccvs.core.Policy;
+import org.eclipse.team.internal.ccvs.core.streams.*;
/**
* A connection used to talk to an cvs pserver.
@@ -73,8 +74,21 @@ public class PServerConnection implements IServerConnection {
* @see Connection#doClose()
*/
public void close() throws IOException {
- fSocket.close();
- fSocket= null;
+ try {
+ if (inputStream != null) inputStream.close();
+ } finally {
+ inputStream = null;
+ try {
+ if (outputStream != null) outputStream.close();
+ } finally {
+ outputStream = null;
+ try {
+ if (fSocket != null) fSocket.close();
+ } finally {
+ fSocket = null;
+ }
+ }
+ }
}
/**
@@ -91,16 +105,17 @@ public class PServerConnection implements IServerConnection {
monitor.worked(1);
fSocket = createSocket();
+ boolean connected = false;
try {
- this.inputStream = new BufferedInputStream(fSocket.getInputStream());
- this.outputStream = new BufferedOutputStream(fSocket.getOutputStream());
+ this.inputStream = new BufferedInputStream(new PollingInputStream(fSocket.getInputStream(),
+ cvsroot.getTimeout(), monitor));
+ this.outputStream = new PollingOutputStream(new TimeoutOutputStream(
+ fSocket.getOutputStream(), 8192 /*bufferSize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/),
+ cvsroot.getTimeout(), monitor);
authenticate();
- } catch (IOException e) {
- cleanUpAfterFailedConnection();
- throw e;
- } catch (CVSAuthenticationException e) {
- cleanUpAfterFailedConnection();
- throw e;
+ connected = true;
+ } finally {
+ if (! connected) cleanUpAfterFailedConnection();
}
}
@@ -212,7 +227,7 @@ public class PServerConnection implements IServerConnection {
// If we get this exception, chances are the host is not responding
throw new InterruptedIOException(Policy.bind("PServerConnection.socket", new Object[] {cvsroot.getHost()}));//$NON-NLS-1$
}
- result.setSoTimeout(cvsroot.getTimeout() * 1000);
+ result.setSoTimeout(1000); // 1 second between timeouts
return result;
}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
index c8e5a130e..a1cece4e6 100644
--- a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/messages.properties
@@ -242,3 +242,5 @@ EclipseSynchronizer.ErrorSettingIgnorePattern=Cannot set ignored pattern on {0}
EclipseSynchronizer.ErrorDeletingFolderSync=Could not delete all CVS folders
SyncFileChangeListener.errorSettingTeamPrivateFlag=Error setting team-private flag on resource
+PollingInputStream.readTimeout=Timeout during read operation
+PollingOutputStream.writeTimeout=Timeout during write operation
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java
new file mode 100644
index 000000000..ecf856bb5
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/CRLFtoLFInputStream.java
@@ -0,0 +1,155 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/**
+ * Converts CR/LFs in the underlying input stream to LF.
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public class CRLFtoLFInputStream extends FilterInputStream {
+ private boolean pendingByte = false;
+ private int lastByte = -1;
+
+ /**
+ * Creates a new filtered input stream.
+ * @param in the underlying input stream
+ */
+ public CRLFtoLFInputStream(InputStream in) {
+ super(in);
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Translates CR/LF sequences to LFs transparently.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read() throws IOException {
+ if (! pendingByte) {
+ lastByte = in.read(); // ok if this throws
+ pendingByte = true; // remember the byte in case we throw an exception later on
+ }
+ if (lastByte == '\r') {
+ lastByte = in.read(); // ok if this throws
+ if (lastByte != '\n') {
+ if (lastByte == -1) pendingByte = false;
+ return '\r'; // leaves the byte pending for later
+ }
+ }
+ pendingByte = false;
+ return lastByte;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Translates CR/LF sequences to LFs transparently.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ // handle boundary cases cleanly
+ if (len == 0) {
+ return 0;
+ } else if (len == 1) {
+ int b = read();
+ if (b == -1) return -1;
+ buffer[off] = (byte) b;
+ return 1;
+ }
+ // read some bytes from the stream
+ // prefix with pending byte from last read if any
+ int count = 0;
+ if (pendingByte) {
+ buffer[off] = (byte) lastByte;
+ pendingByte = false;
+ count = 1;
+ }
+ InterruptedIOException iioe = null;
+ try {
+ len = in.read(buffer, off + count, len - count);
+ if (len == -1) {
+ return (count == 0) ? -1 : count;
+ }
+ } catch (InterruptedIOException e) {
+ len = e.bytesTransferred;
+ iioe = e;
+ }
+ count += len;
+ // strip out CR's in CR/LF pairs
+ // pendingByte will be true iff previous byte was a CR
+ int j = 0;
+ for (int i = 0; i < count; ++i) { // invariant: j <= i
+ lastByte = buffer[i];
+ if (lastByte == '\r') {
+ if (pendingByte) {
+ buffer[j++] = '\r'; // write out orphan CR
+ } else {
+ pendingByte = true;
+ }
+ } else {
+ if (pendingByte) {
+ if (lastByte != '\n') buffer[j++] = '\r'; // if LF, don't write the CR
+ pendingByte = false;
+ }
+ buffer[j++] = (byte) lastByte;
+ }
+ }
+ if (iioe != null) {
+ iioe.bytesTransferred = j;
+ throw iioe;
+ }
+ return j;
+ }
+
+ /**
+ * Calls read() to skip the specified number of bytes
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public long skip(long count) throws IOException {
+ int actualCount = 0; // assumes count < Integer.MAX_INT
+ try {
+ while (count-- > 0 && read() != -1) actualCount++; // skip the specified number of bytes
+ return actualCount;
+ } catch (InterruptedIOException e) {
+ e.bytesTransferred = actualCount;
+ throw e;
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Returns the number of bytes that can be read without blocking; accounts for
+ * possible translation of CR/LF sequences to LFs in these bytes.
+ * @throws IOException if an i/o error occurs
+ */
+ public int available() throws IOException {
+ return in.available() / 2; // we can guarantee at least this amount after contraction
+ }
+
+ /**
+ * Mark is not supported by the wrapper even if the underlying stream does, returns false.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java
new file mode 100644
index 000000000..6c4d20e4a
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/LFtoCRLFInputStream.java
@@ -0,0 +1,146 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/**
+ * Converts LFs in the underlying input stream to CR/LF.
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public class LFtoCRLFInputStream extends FilterInputStream {
+ private boolean mustReturnLF = false;
+
+ /**
+ * Creates a new filtered input stream.
+ * @param in the underlying input stream
+ */
+ public LFtoCRLFInputStream(InputStream in) {
+ super(in);
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Translates LFs to CR/LF sequences transparently.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read() throws IOException {
+ if (mustReturnLF) {
+ mustReturnLF = false;
+ return '\n';
+ }
+ int b = in.read(); // ok if this throws
+ if (b == '\n') {
+ mustReturnLF = true;
+ b = '\r';
+ }
+ return b;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Translates LFs to CR/LF sequences transparently.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ // handle boundary cases cleanly
+ if (len == 0) {
+ return 0;
+ } else if (len == 1) {
+ int b = read();
+ if (b == -1) return -1;
+ buffer[off] = (byte) b;
+ return 1;
+ }
+ // prefix with remembered \n from last read, but don't expand it a second time
+ int count = 0;
+ if (mustReturnLF) {
+ mustReturnLF = false;
+ buffer[off++] = '\n';
+ --len;
+ count = 1;
+ if (len < 2) return count; // is there still enough room to expand more?
+ }
+ // read some bytes from the stream into the back half of the buffer
+ // this guarantees that there is always room to expand
+ len /= 2;
+ int j = off + len;
+ InterruptedIOException iioe = null;
+ try {
+ len = in.read(buffer, j, len);
+ if (len == -1) {
+ return (count == 0) ? -1 : count;
+ }
+ } catch (InterruptedIOException e) {
+ len = e.bytesTransferred;
+ iioe = e;
+ }
+ count += len;
+ // copy bytes from the middle to the front of the array, expanding LF->CR/LF
+ while (len-- > 0) {
+ byte b = buffer[j++];
+ if (b == '\n') {
+ buffer[off++] = '\r';
+ count++;
+ }
+ buffer[off++] = b;
+ }
+ if (iioe != null) {
+ iioe.bytesTransferred = count;
+ throw iioe;
+ }
+ return count;
+ }
+
+ /**
+ * Calls read() to skip the specified number of bytes
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public long skip(long count) throws IOException {
+ int actualCount = 0; // assumes count < Integer.MAX_INT
+ try {
+ while (count-- > 0 && read() != -1) actualCount++; // skip the specified number of bytes
+ return actualCount;
+ } catch (InterruptedIOException e) {
+ e.bytesTransferred = actualCount;
+ throw e;
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Returns the number of bytes that can be read without blocking; accounts for
+ * possible translation of LFs to CR/LF sequences in these bytes.
+ * @throws IOException if an i/o error occurs
+ */
+ public int available() throws IOException {
+ return in.available(); // we can guarantee at least this amount after expansion
+ }
+
+ /**
+ * Mark is not supported by the wrapper even if the underlying stream does, returns false.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java
new file mode 100644
index 000000000..931b470c3
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingInputStream.java
@@ -0,0 +1,113 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.OperationCanceledException;
+import org.eclipse.team.internal.ccvs.core.Policy;
+
+/**
+ * Polls a progress monitor periodically and handles timeouts over extended durations.
+ * For this class to be effective, a high numAttempts should be specified, and the
+ * underlying stream should time out frequently on reads (every second or so).
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public class PollingInputStream extends FilterInputStream {
+ private static final boolean DEBUG = false;
+ private int numAttempts;
+ private IProgressMonitor monitor;
+
+ /**
+ * Creates a new polling input stream.
+ * @param in the underlying input stream
+ * @param numAttempts the number of attempts before issuing an InterruptedIOException,
+ * if 0, retries indefinitely until canceled
+ * @param monitor the progress monitor to be polled for cancellation
+ */
+ public PollingInputStream(InputStream in, int numAttempts, IProgressMonitor monitor) {
+ super(in);
+ this.numAttempts = numAttempts;
+ this.monitor = monitor;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times
+ * and no data was received, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read() throws IOException {
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ return in.read();
+ } catch (InterruptedIOException e) {
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout"));
+ if (DEBUG) System.out.println("read retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times
+ * and no data was received, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ return in.read(buffer, off, len);
+ } catch (InterruptedIOException e) {
+ if (e.bytesTransferred != 0) return e.bytesTransferred; // keep partial transfer
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout"));
+ if (DEBUG) System.out.println("read retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times
+ * and no data was received, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public long skip(long count) throws IOException {
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ return in.skip(count);
+ } catch (InterruptedIOException e) {
+ if (e.bytesTransferred != 0) return e.bytesTransferred; // keep partial transfer
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingInputStream.readTimeout"));
+ if (DEBUG) System.out.println("read retry=" + attempts);
+ }
+ }
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java
new file mode 100644
index 000000000..c3597893b
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/PollingOutputStream.java
@@ -0,0 +1,150 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.OperationCanceledException;
+import org.eclipse.team.internal.ccvs.core.Policy;
+
+/**
+ * Polls a progress monitor periodically and handles timeouts over extended durations.
+ * For this class to be effective, a high numAttempts should be specified, and the
+ * underlying stream should time out frequently on writes (every second or so).
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public class PollingOutputStream extends FilterOutputStream {
+ private static final boolean DEBUG = false;
+ private int numAttempts;
+ private IProgressMonitor monitor;
+
+ /**
+ * Creates a new polling output stream.
+ * @param in the underlying output stream
+ * @param numAttempts the number of attempts before issuing an InterruptedIOException,
+ * if 0, retries indefinitely until canceled
+ * @param monitor the progress monitor to be polled for cancellation
+ */
+ public PollingOutputStream(OutputStream out, int numAttempts, IProgressMonitor monitor) {
+ super(out);
+ this.numAttempts = numAttempts;
+ this.monitor = monitor;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times
+ * and no data was sent, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public void write(int b) throws IOException {
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ out.write(b);
+ return;
+ } catch (InterruptedIOException e) {
+ if (++attempts == numAttempts)
+ throw new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout"));
+ if (DEBUG) System.out.println("write retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times,
+ * bytesTransferred will reflect the number of bytes sent
+ * @throws IOException if an i/o error occurs
+ */
+ public void write(byte[] buffer, int off, int len) throws IOException {
+ int count = 0;
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ out.write(buffer, off, len);
+ return;
+ } catch (InterruptedIOException e) {
+ int amount = e.bytesTransferred;
+ if (amount != 0) { // keep partial transfer
+ len -= amount;
+ if (len <= 0) return;
+ off += amount;
+ count += amount;
+ attempts = 0; // made some progress, don't time out quite yet
+ }
+ if (++attempts == numAttempts) {
+ e = new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout"));
+ e.bytesTransferred = count;
+ throw e;
+ }
+ if (DEBUG) System.out.println("write retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times,
+ * bytesTransferred will reflect the number of bytes sent
+ * @throws IOException if an i/o error occurs
+ */
+ public void flush() throws IOException {
+ int count = 0;
+ int attempts = 0;
+ for (;;) {
+ if (monitor.isCanceled()) throw new OperationCanceledException();
+ try {
+ out.flush();
+ return;
+ } catch (InterruptedIOException e) {
+ int amount = e.bytesTransferred;
+ if (amount != 0) { // keep partial transfer
+ count += amount;
+ attempts = 0; // made some progress, don't time out quite yet
+ }
+ if (++attempts == numAttempts) {
+ e = new InterruptedIOException(Policy.bind("PollingOutputStream.writeTimeout"));
+ e.bytesTransferred = count;
+ throw e;
+ }
+ if (DEBUG) System.out.println("write retry=" + attempts);
+ }
+ }
+ }
+
+ /**
+ * Calls flush() then close() on the underlying stream.
+ * @throws OperationCanceledException if the progress monitor is canceled
+ * @throws InterruptedIOException if the underlying operation times out numAttempts times,
+ * bytesTransferred will reflect the number of bytes sent during the flush()
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ try {
+ flush();
+ } finally {
+ out.close();
+ }
+ }
+} \ No newline at end of file
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java
new file mode 100644
index 000000000..9ee8e93cb
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/ProgressMonitorInputStream.java
@@ -0,0 +1,140 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.OperationCanceledException;
+
+/**
+ * Updates a progress monitor as bytes are read from the input stream.
+ * Also starts a background thread to provide responsive cancellation on read().
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public abstract class ProgressMonitorInputStream extends FilterInputStream {
+ private IProgressMonitor monitor;
+ private int updateIncrement;
+ private long bytesTotal;
+ private long bytesRead = 0;
+ private long lastUpdate = -1;
+ private long nextUpdate = 0;
+
+ /**
+ * Creates a progress monitoring input stream.
+ * @param in the underlying input stream
+ * @param bytesTotal the number of bytes to read in total (passed to updateMonitor())
+ * @param updateIncrement the number of bytes read between updates
+ * @param monitor the progress monitor
+ */
+ public ProgressMonitorInputStream(InputStream in, long bytesTotal, int updateIncrement, IProgressMonitor monitor) {
+ super(in);
+ this.bytesTotal = bytesTotal;
+ this.updateIncrement = updateIncrement;
+ this.monitor = monitor;
+ update(true);
+ }
+
+ protected abstract void updateMonitor(long bytesRead, long size, IProgressMonitor monitor);
+
+ /**
+ * Wraps the underlying stream's method.
+ * Updates the progress monitor to the final number of bytes read.
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ try {
+ in.close();
+ } finally {
+ update(true);
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Updates the progress monitor if the next update increment has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read() throws IOException {
+ int b = in.read();
+ if (b != -1) {
+ bytesRead += 1;
+ update(false);
+ }
+ return b;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Updates the progress monitor if the next update increment has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ try {
+ int count = in.read(buffer, offset, length);
+ if (count != -1) {
+ bytesRead += count;
+ update(false);
+ }
+ return count;
+ } catch (InterruptedIOException e) {
+ bytesRead += e.bytesTransferred;
+ update(false);
+ throw e;
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Updates the progress monitor if the next update increment has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public long skip(long amount) throws IOException {
+ try {
+ long count = in.skip(amount);
+ bytesRead += count;
+ update(false);
+ return count;
+ } catch (InterruptedIOException e) {
+ bytesRead += e.bytesTransferred;
+ update(false);
+ throw e;
+ }
+ }
+
+ /**
+ * Mark is not supported by the wrapper even if the underlying stream does, returns false.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+
+ private void update(boolean now) {
+ if (bytesRead >= nextUpdate || now) {
+ nextUpdate = bytesRead - (bytesRead % updateIncrement);
+ if (nextUpdate != lastUpdate) updateMonitor(nextUpdate, bytesTotal, monitor);
+ lastUpdate = nextUpdate;
+ nextUpdate += updateIncrement;
+ }
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java
new file mode 100644
index 000000000..17498cc42
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/SizeConstrainedInputStream.java
@@ -0,0 +1,132 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/**
+ * Simulates a stream that represents only a portion of the underlying stream.
+ * Will report EOF when this portion has been fully read and prevent further reads.
+ * The underlying stream is not closed on close(), but the remaining unread input
+ * may optionally be skip()'d.
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * if the underlying stream does. Check the bytesTransferred field to determine how
+ * much of the operation completed; conversely, at what point to resume.
+ */
+public class SizeConstrainedInputStream extends FilterInputStream {
+ private boolean discardOnClose;
+ private long bytesRemaining;
+
+ /**
+ * Creates a size contrained input stream.
+ * @param in the underlying input stream, never actually closed by this filter
+ * @param size the maximum number of bytes of the underlying input stream that
+ * can be read through this filter
+ * @param discardOnClose if true, discards remaining unread bytes on close()
+ */
+ public SizeConstrainedInputStream(InputStream in, long size, boolean discardOnClose) {
+ super(in);
+ this.bytesRemaining = size;
+ this.discardOnClose = discardOnClose;
+ }
+
+ /**
+ * Prevents further reading from the stream but does not close the underlying stream.
+ * If discardOnClose, skip()'s over any remaining unread bytes in the constrained region.
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ try {
+ if (discardOnClose) {
+ while (bytesRemaining != 0 && skip(bytesRemaining) != 0);
+ }
+ } finally {
+ bytesRemaining = 0;
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Simulates an end-of-file condition if the end of the constrained region has been reached.
+ * @throws IOException if an i/o error occurs
+ */
+ public int available() throws IOException {
+ int amount = in.available();
+ if (amount > bytesRemaining) amount = (int) bytesRemaining;
+ return amount;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Simulates an end-of-file condition if the end of the constrained region has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read() throws IOException {
+ if (bytesRemaining == 0) return -1;
+ int b = in.read();
+ if (b != -1) bytesRemaining -= 1;
+ return b;
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Simulates an end-of-file condition if the end of the constrained region has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (length >= bytesRemaining) {
+ length = (int) bytesRemaining;
+ if (length == 0) return -1;
+ }
+ try {
+ int count = in.read(buffer, offset, length);
+ if (count != -1) bytesRemaining -= count;
+ return count;
+ } catch (InterruptedIOException e) {
+ bytesRemaining -= e.bytesTransferred;
+ throw e;
+ }
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * Simulates an end-of-file condition if the end of the constrained region has been reached.
+ * @throws InterruptedIOException if the operation was interrupted before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public long skip(long amount) throws IOException {
+ if (amount > bytesRemaining) amount = bytesRemaining;
+ try {
+ long count = in.skip(amount);
+ bytesRemaining -= count;
+ return count;
+ } catch (InterruptedIOException e) {
+ bytesRemaining -= e.bytesTransferred;
+ throw e;
+ }
+ }
+
+ /**
+ * Mark is not supported by the wrapper even if the underlying stream does, returns false.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java
new file mode 100644
index 000000000..72c8eb495
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutInputStream.java
@@ -0,0 +1,256 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+/**
+ * Wraps an input stream that blocks indefinitely to simulate timeouts on read(),
+ * skip(), and close(). The resulting input stream is buffered and supports
+ * retrying operations that failed due to an InterruptedIOException.
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * REGARDLESS of whether the underlying stream does unless the underlying stream itself
+ * generates InterruptedIOExceptions in which case it must also support resuming.
+ * Check the bytesTransferred field to determine how much of the operation completed;
+ * conversely, at what point to resume.
+ */
+public class TimeoutInputStream extends FilterInputStream {
+ private byte[] iobuffer; // circular buffer
+ private int head = 0; // points to first unread byte
+ private int length = 0; // number of remaining unread bytes, -1 if closed
+
+ private long readTimeout;
+ private long closeTimeout;
+ private Thread thread = new Thread(new FillBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$
+ private IOException ioe = null;
+ private RuntimeException re = null;
+
+ /**
+ * Creates a timeout wrapper for an input stream.
+ * @param in the underlying input stream
+ * @param bufferSize the buffer size in bytes; should be large enough to mitigate
+ * Thread synchronization and context switching overhead
+ * @param readTimeout the number of milliseconds to block for a read() or skip() before
+ * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * @param closeTimeout the number of milliseconds to block for a close() before throwing
+ * an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ */
+ public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) {
+ super(in);
+ this.iobuffer = new byte[bufferSize];
+ this.readTimeout = readTimeout;
+ this.closeTimeout = closeTimeout;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ if (thread == null) return;
+ Thread oldThread = thread;
+ thread = null;
+ oldThread.interrupt();
+ if (closeTimeout != -1) {
+ try {
+ oldThread.join(closeTimeout);
+ } catch (InterruptedException e) {
+ }
+ }
+ synchronized (this) {
+ if (ioe != null) throw ioe;
+ if (re != null) throw re;
+ }
+ }
+
+ /**
+ * Returns the number of unread bytes in the buffer.
+ */
+ public synchronized int available() throws IOException {
+ return length > 0 ? length : 0;
+ }
+
+ /**
+ * Reads a byte from the stream.
+ * @throws InterruptedIOException if the timeout expired and no data was received,
+ * bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized int read() throws IOException {
+ syncfill();
+ if (length == -1) return -1;
+ int b = iobuffer[head++];
+ if (head == iobuffer.length) head = 0;
+ length--;
+ asyncfill();
+ return b;
+ }
+
+ /**
+ * Reads multiple bytes from the stream.
+ * @throws InterruptedIOException if the timeout expired and no data was received,
+ * bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized int read(byte[] buffer, int off, int len) throws IOException {
+ if (len == 0) return 0;
+ syncfill();
+ if (length == -1) return -1;
+ int pos = off;
+ if (len > length) len = length;
+ while (len-- > 0) {
+ buffer[pos++] = iobuffer[head++];
+ if (head == iobuffer.length) head = 0;
+ length--;
+ }
+ asyncfill();
+ return pos - off;
+ }
+
+ /**
+ * Skips multiple bytes in the stream.
+ * @throws InterruptedIOException if the timeout expired before all of the
+ * bytes specified have been skipped, bytesTransferred may be non-zero
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized long skip(long count) throws IOException {
+ long amount = 0;
+ try {
+ while (count != 0 && length != -1) {
+ int skip = (count > length) ? length : (int) count;
+ head = (head + skip) % iobuffer.length;
+ length -= skip;
+ amount += skip;
+ count -= skip;
+ syncfill();
+ }
+ return amount;
+ } catch (InterruptedIOException e) {
+ e.bytesTransferred = (int) amount; // assumes amount < Integer.MAX_INT
+ throw e;
+ }
+ }
+
+ /**
+ * Mark is not supported by the wrapper even if the underlying stream does, returns false.
+ */
+ public boolean markSupported() {
+ return false;
+ }
+
+ private void syncfill() throws IOException {
+ if (length == 0) {
+ asyncfill();
+ if (readTimeout != -1) {
+ try {
+ wait(readTimeout);
+ } catch (InterruptedException e) {
+ }
+ }
+ if (length == 0) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
+ private void asyncfill() throws IOException {
+ try {
+ if (ioe != null) {
+ IOException e = ioe;
+ ioe = null;
+ throw e;
+ }
+ if (re != null) {
+ RuntimeException e = re;
+ re = null;
+ throw e;
+ }
+ } finally {
+ if (length != -1 && length != iobuffer.length) {
+ notify();
+ }
+ }
+ }
+
+ private class FillBufferRunnable implements Runnable {
+ public void run() {
+ final Object lock = TimeoutInputStream.this;
+ try {
+ boolean eof = false;
+ for (;;) {
+ int off, len;
+ synchronized (lock) {
+ try {
+ while (thread != null && (length == iobuffer.length || eof || ioe != null || re != null)) {
+ lock.wait();
+ }
+ if (thread == null) return; // quit signal
+ } catch (InterruptedException e) {
+ return; // alternative quit signal
+ }
+ off = (head + length) % iobuffer.length;
+ len = ((head > off) ? head : iobuffer.length) - off;
+ }
+ try {
+ // the i/o operation might block without releasing the lock,
+ // so we do this outside of the synchronized block
+ int count = in.read(iobuffer, off, len);
+ if (count == -1) eof = true;
+ synchronized (lock) {
+ if (eof) {
+ if (length == 0) length = -1;
+ } else {
+ length += count;
+ }
+ if (count != 0) lock.notify();
+ }
+ } catch (InterruptedIOException e) {
+ int count = e.bytesTransferred; // keep partial transfer
+ e.bytesTransferred = 0; // not relevant if rethrown
+ synchronized (lock) {
+ if (length != -1) length += count;
+ ioe = e;
+ lock.notify();
+ }
+ } catch (IOException e) {
+ synchronized (lock) {
+ ioe = e;
+ lock.notify();
+ }
+ } catch (RuntimeException e) {
+ synchronized (lock) {
+ re = e;
+ lock.notify();
+ }
+ }
+ }
+ } finally {
+ try {
+ in.close();
+ } catch (IOException e) {
+ synchronized (lock) {
+ ioe = e;
+ }
+ } catch (RuntimeException e) {
+ synchronized (lock) {
+ re = e;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java
new file mode 100644
index 000000000..201b804ba
--- /dev/null
+++ b/bundles/org.eclipse.team.cvs.core/src/org/eclipse/team/internal/ccvs/core/streams/TimeoutOutputStream.java
@@ -0,0 +1,244 @@
+/*******************************************************************************
+ * Copyright (c) 2002 IBM Corporation and others.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Common Public License v0.5
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/cpl-v05.html
+ *
+ * Contributors:
+ * IBM - Initial API and implementation
+ ******************************************************************************/
+package org.eclipse.team.internal.ccvs.core.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+/**
+ * Wraps an output stream that blocks indefinitely to simulate timeouts on write(),
+ * flush(), and close(). The resulting output stream is buffered and supports
+ * retrying operations that failed due to an InterruptedIOException.
+ *
+ * Supports resuming partially completed operations after an InterruptedIOException
+ * REGARDLESS of whether the underlying stream does unless the underlying stream itself
+ * generates InterruptedIOExceptions in which case it must also support resuming.
+ * Check the bytesTransferred field to determine how much of the operation completed;
+ * conversely, at what point to resume.
+ */
+public class TimeoutOutputStream extends FilterOutputStream {
+ private byte[] iobuffer; // circular buffer
+ private int head = 0; // points to first unwritten byte
+ private int length = 0; // number of remaining unwritten bytes
+
+ private long writeTimeout;
+ private long closeTimeout;
+ private boolean flushRequested = false;
+ private Thread thread = new Thread(new CommitBufferRunnable(), "TimeoutInputStream");//$NON-NLS-1$
+ private IOException ioe = null;
+ private RuntimeException re = null;
+
+ /**
+ * Creates a timeout wrapper for an output stream.
+ * @param out the underlying input stream
+ * @param bufferSize the buffer size in bytes; should be large enough to mitigate
+ * Thread synchronization and context switching overhead
+ * @param writeTimeout the number of milliseconds to block for a write() or flush() before
+ * throwing an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ * @param closeTimeout the number of milliseconds to block for a close() before throwing
+ * an InterruptedIOException; 0 blocks indefinitely, -1 does not block
+ */
+ public TimeoutOutputStream(OutputStream out, int bufferSize, long writeTimeout, long closeTimeout) {
+ super(out);
+ this.iobuffer = new byte[bufferSize];
+ this.writeTimeout = writeTimeout;
+ this.closeTimeout = closeTimeout;
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Wraps the underlying stream's method.
+ * @throws InterruptedIOException if the timeout expired, bytesTransferred will
+ * reflect the number of bytes flushed from the buffer
+ * @throws IOException if an i/o error occurs
+ */
+ public void close() throws IOException {
+ if (thread == null) return;
+ try {
+ flush();
+ } finally {
+ Thread oldThread = thread;
+ thread = null;
+ oldThread.interrupt();
+ if (closeTimeout != -1) {
+ try {
+ oldThread.join(closeTimeout);
+ } catch (InterruptedException e) {
+ }
+ }
+ synchronized (this) {
+ if (ioe != null) throw ioe;
+ if (re != null) throw re;
+ }
+ }
+ }
+
+ /**
+ * Writes a byte to the stream.
+ * @throws InterruptedIOException if the timeout expired and no data was sent,
+ * bytesTransferred will be zero
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized void write(int b) throws IOException {
+ if (length == iobuffer.length) synccommit();
+ iobuffer[(head + length) % iobuffer.length] = (byte) b;
+ length++;
+ asynccommit();
+ }
+
+ /**
+ * Writes multiple bytes to the stream.
+ * @throws InterruptedIOException if the timeout expired, bytesTransferred will
+ * reflect the number of bytes sent
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized void write(byte[] buffer, int off, int len) throws IOException {
+ int amount = 0;
+ try {
+ while (len-- > 0) {
+ if (length == iobuffer.length) synccommit();
+ iobuffer[(head + length) % iobuffer.length] = buffer[off++];
+ length++;
+ amount++;
+ }
+ asynccommit();
+ } catch (InterruptedIOException e) {
+ e.bytesTransferred = amount;
+ throw e;
+ }
+ }
+
+ /**
+ * Flushes the stream.
+ * @throws InterruptedIOException if the timeout expired, bytesTransferred will
+ * reflect the number of bytes flushed from the buffer
+ * @throws IOException if an i/o error occurs
+ */
+ public synchronized void flush() throws IOException {
+ int oldLength = length;
+ flushRequested = true;
+ InterruptedIOException iioe = null;
+ try {
+ synccommit();
+ if (length == 0) return;
+ iioe = new InterruptedIOException();
+ } catch (InterruptedIOException e) {
+ iioe = e;
+ }
+ iioe.bytesTransferred = oldLength - length;
+ throw iioe;
+ }
+
+ private void synccommit() throws IOException {
+ asynccommit();
+ if (length != 0) {
+ int oldLength = length;
+ if (writeTimeout != -1) {
+ try {
+ wait(writeTimeout);
+ } catch (InterruptedException e) {
+ }
+ }
+ if (length == oldLength) {
+ throw new InterruptedIOException();
+ }
+ }
+ }
+
+ private void asynccommit() throws IOException {
+ try {
+ if (ioe != null) {
+ IOException e = ioe;
+ ioe = null;
+ throw e;
+ }
+ if (re != null) {
+ RuntimeException e = re;
+ re = null;
+ throw e;
+ }
+ } finally {
+ if (length != 0 || flushRequested) notify();
+ }
+ }
+
+ private class CommitBufferRunnable implements Runnable {
+ public void run() {
+ final Object lock = TimeoutOutputStream.this;
+ boolean running = true;
+ try {
+ for (;;) {
+ int off, len;
+ synchronized (lock) {
+ try {
+ while (running && thread != null && ((length == 0 && ! flushRequested) || ioe != null || re != null)) {
+ lock.wait();
+ }
+ if (thread == null) running = false; // quit signal
+ if (! running && length == 0) return;
+ } catch (InterruptedException e) {
+ running = false; // alternative quit signal
+ }
+ off = head;
+ len = iobuffer.length - head;
+ if (len > length) len = length;
+ }
+ try {
+ // the i/o operation might block without releasing the lock,
+ // so we do this outside of the synchronized block
+ if (len != 0) out.write(iobuffer, off, len);
+ if (flushRequested) out.flush();
+ synchronized (lock) {
+ flushRequested = false;
+ head = (head + len) % iobuffer.length;
+ length -= len;
+ lock.notify();
+ }
+ } catch (InterruptedIOException e) {
+ len = e.bytesTransferred; // keep partial transfer
+ e.bytesTransferred = 0; // not relevant if rethrown
+ synchronized (lock) {
+ head = (head + len) % iobuffer.length;
+ length -= len;
+ ioe = e;
+ lock.notify();
+ }
+ } catch (IOException e) {
+ synchronized (lock) {
+ ioe = e;
+ lock.notify();
+ }
+ } catch (RuntimeException e) {
+ synchronized (lock) {
+ re = e;
+ lock.notify();
+ }
+ }
+ }
+ } finally {
+ try {
+ out.close();
+ } catch (IOException e) {
+ synchronized (lock) {
+ ioe = e;
+ }
+ } catch (RuntimeException e) {
+ synchronized (lock) {
+ re = e;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java b/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java
index 97dd1e5d3..7cf832116 100644
--- a/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java
+++ b/bundles/org.eclipse.team.cvs.ssh/src/org/eclipse/team/internal/ccvs/ssh/Client.java
@@ -19,6 +19,9 @@ import java.net.Socket;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.team.internal.ccvs.core.connection.CVSAuthenticationException;
+import org.eclipse.team.internal.ccvs.core.streams.PollingInputStream;
+import org.eclipse.team.internal.ccvs.core.streams.PollingOutputStream;
+import org.eclipse.team.internal.ccvs.core.streams.TimeoutOutputStream;
public class Client {
// client identification string
@@ -335,10 +338,13 @@ public void connect(IProgressMonitor monitor) throws IOException, CVSAuthenticat
}
if (timeout >= 0) {
- socket.setSoTimeout(timeout * 1000);
+ socket.setSoTimeout(1000);
}
- socketIn = new BufferedInputStream(socket.getInputStream());
- socketOut = new BufferedOutputStream(socket.getOutputStream());
+ socketIn = new BufferedInputStream(new PollingInputStream(socket.getInputStream(),
+ timeout > 0 ? timeout : 1, monitor));
+ socketOut = new PollingOutputStream(new TimeoutOutputStream(
+ socket.getOutputStream(), 8192 /*bufferSize*/, 1000 /*writeTimeout*/, 1000 /*closeTimeout*/),
+ timeout > 0 ? timeout : 1, monitor);
}
// read the ssh server id. The socket creation may of failed if the
@@ -371,13 +377,9 @@ public void connect(IProgressMonitor monitor) throws IOException, CVSAuthenticat
os = new StandardOutputStream();
connected = true;
- } catch (IOException e) {
- // If an exception occurs while connected, make sure we disconnect before passing the exception on
- try {
- cleanup();
- } finally {
- throw e;
- }
+ // If an exception occurs while connected, make sure we disconnect before passing the exception on
+ } finally {
+ if (! connected) cleanup();
}
}
/**

Back to the top