Skip to main content
summaryrefslogtreecommitdiffstats
blob: d4049ee6b0b9742df2568e0688dad301e98f7837 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*******************************************************************************
 * Copyright (c) 2017 Google, Inc and others.
 *
 * This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License 2.0
 * which accompanies this distribution, and is available at
 * https://www.eclipse.org/legal/epl-2.0/
 *
 * SPDX-License-Identifier: EPL-2.0
 *
 * Contributors:
 *   Stefan Xenos (Google) - Initial implementation
 *******************************************************************************/
package org.eclipse.jdt.internal.core.nd.db;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Combines sequential small writes into larger writes and that ensures that writes don't happen faster than a certain
 * maximum rate.
 */
public class ChunkWriter {
	private double maxBytesPerMillisecond;
	private long lastWritePosition;
	private long bufferStartPosition;
	private byte[] buffer;
	private WriteCallback writeCallback;
	private long bytesWrittenSinceLastSleep;
	private long totalWriteTimeMs;
	private long totalBytesWritten;
	private SleepCallback sleepFunction = Thread::sleep;

	/**
	 * Interface used to perform the uninterruptable writes when the buffer fills up.
	 */
	@FunctionalInterface
	public interface WriteCallback {
		/**
		 * Performs an uninterruptable write of the given bytes to the given file position.
		 *
		 * @param buffer
		 *            the bytes to write
		 * @param position
		 *            the file position to write them to
		 * @return true iff an attempt was made to interrupt the write via {@link Thread#interrupt()}. Note that the
		 *         write must succeed regardless of the return value.
		 * @throws IOException
		 *             if unable to perform the write
		 */
		boolean write(ByteBuffer buffer, long position) throws IOException;
	}

	@FunctionalInterface
	public interface SleepCallback {
		/**
		 * Sleeps the caller for the given time (in milliseconds)
		 */
		void sleep(long millisecond) throws InterruptedException;
	}

	/**
	 * Constructs a new {@link ChunkWriter}
	 *
	 * @param bufferSize
	 *            size of the write buffer (the maximum number of bytes that will be written in a single write).
	 * @param maxBytesPerMillisecond
	 *            the maximum number of bytes that will be written per second. If an attempt is made to write more
	 *            rapidly than this, the thread will be put to sleep.
	 * @param callback
	 *            will be invoked to perform the writes
	 */
	public ChunkWriter(int bufferSize, double maxBytesPerMillisecond, WriteCallback callback) {
		this.buffer = new byte[bufferSize];
		this.lastWritePosition = 0;
		this.bufferStartPosition = 0;
		this.maxBytesPerMillisecond = maxBytesPerMillisecond;
		this.writeCallback = callback;
	}

	/**
	 * Writes the given bytes to the given file position.
	 *
	 * @return true iff any attempt was made to interrupt the thread using {@link Thread#interrupt()}. The write
	 *         succeeds regardless of the return value.
	 * @throws IOException if unable to perform the write
	 */
	public boolean write(long position, byte[] data) throws IOException {
		if (position == this.lastWritePosition) {
			int bufferPosition = (int) (this.lastWritePosition - this.bufferStartPosition);
			if (bufferPosition + data.length <= this.buffer.length) {
				System.arraycopy(data, 0, this.buffer, bufferPosition, data.length);
				this.lastWritePosition = position + data.length;
				return false;
			}
		}

		boolean wasInterrupted = flush();
		System.arraycopy(data, 0, this.buffer, 0, data.length);
		this.bufferStartPosition = position;
		this.lastWritePosition = position + data.length;
		return wasInterrupted;
	}

	/**
	 * Flushes any outstanding writes to disk immediately.
	 *
	 * @return true iff any attempt was made to interrupt the thread using {@link Thread#interrupt()}. The write
	 *         succeeds regardless of the return value.
	 * @throws IOException if unable to perform the write
	 */
	public boolean flush() throws IOException {
		int bytesToWrite = (int) (this.lastWritePosition - this.bufferStartPosition);
		if (bytesToWrite == 0) {
			return false;
		}
		long startTimeMs = System.currentTimeMillis();
		boolean result = this.writeCallback.write(ByteBuffer.wrap(this.buffer, 0, bytesToWrite),
				this.bufferStartPosition);
		long elapsedTimeMs = System.currentTimeMillis() - startTimeMs;

		this.totalBytesWritten += bytesToWrite;
		this.totalWriteTimeMs += elapsedTimeMs;
		this.bytesWrittenSinceLastSleep = Math.max(0,
				this.bytesWrittenSinceLastSleep + bytesToWrite - (long) (elapsedTimeMs * this.maxBytesPerMillisecond));
		long desiredSleepTime = (long) (this.bytesWrittenSinceLastSleep / this.maxBytesPerMillisecond);

		// If we're writing too fast, sleep to create backpressure and prevent us from overloading
		// the disk's I/O bandwidth.
		if (desiredSleepTime > 0) {
			try {
				this.sleepFunction.sleep(desiredSleepTime);
				this.bytesWrittenSinceLastSleep -= this.maxBytesPerMillisecond * desiredSleepTime;
			} catch (InterruptedException e) {
				result = true;
			}
		}
		this.bufferStartPosition = this.lastWritePosition;
		return result;
	}

	/**
	 * Overrides the sleep callback function.
	 */
	public void setSleepFunction(SleepCallback callback) {
		this.sleepFunction = callback;
	}

	/**
	 * Returns the total number of bytes written
	 */
	public long getBytesWritten() {
		return this.totalBytesWritten;
	}

	/**
	 * Returns the total time spent in calls to {@link WriteCallback#write(ByteBuffer, long)}.
	 */
	public long getTotalWriteTimeMs() {
		return this.totalWriteTimeMs;
	}
}

Back to the top