From 4ccf7d8ad0f346062085775b6d47b30daa613105 Mon Sep 17 00:00:00 2001 From: Tomasz Zarna Date: Wed, 31 May 2017 17:26:47 -0700 Subject: replace synchronized methods of TaskDataStore with ReadWriteLock Change-Id: Iff82eae848243acf98f3e1a2e4aff8b6de254d3c--- .../tasks/core/data/TaskDataStoreTest.java | 333 +++++++++++++++++++++ .../internal/tasks/core/data/TaskDataStore.java | 38 ++- 2 files changed, 361 insertions(+), 10 deletions(-) create mode 100644 org.eclipse.mylyn.tasks.core.tests/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStoreTest.java diff --git a/org.eclipse.mylyn.tasks.core.tests/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStoreTest.java b/org.eclipse.mylyn.tasks.core.tests/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStoreTest.java new file mode 100644 index 000000000..73a9591ec --- /dev/null +++ b/org.eclipse.mylyn.tasks.core.tests/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStoreTest.java @@ -0,0 +1,333 @@ +/******************************************************************************* + * Copyright (c) 2017 Tasktop Technologies and others. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Tasktop Technologies - initial API and implementation + *******************************************************************************/ + +package org.eclipse.mylyn.internal.tasks.core.data; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.eclipse.core.runtime.CoreException; +import org.eclipse.mylyn.internal.tasks.core.TaskRepositoryManager; +import org.eclipse.mylyn.tasks.core.TaskRepository; +import org.eclipse.mylyn.tasks.core.data.ITaskDataWorkingCopy; +import org.eclipse.mylyn.tasks.core.data.TaskAttributeMapper; +import org.eclipse.mylyn.tasks.core.data.TaskData; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.xml.sax.SAXException; + +import com.google.common.base.Charsets; +import com.google.common.io.ByteStreams; + +public class TaskDataStoreTest { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private static final int THREAD_COUNT = 10; + + private static final long DELAY = 100; + + private static final String DATA_XML_CONTENT = "test"; + + private static final TaskData TASK_DATA = newTaskData(); + + private static final TaskDataState TEST_STATE = new TaskDataState("connectorKind", "repositoryUrl", "taskId"); + + @Test + public void getTaskDataStateByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(getTaskDataState(file)); + + assertNoFailures(failures); + } + + @Test + public void discardEditsByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(discardEdits(file)); + + assertNoFailures(failures); + } + + @Test + public void putEditsByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(putEdits(file)); + + assertNoFailures(failures); + } + + @Test + public void putTaskDataSetLastReadUserByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(putTaskData(file)); + + assertNoFailures(failures); + } + + @Test + public void setTaskDataByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(setTaskData(file)); + + assertNoFailures(failures); + } + + @Test + public void putTaskDataByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(putTaskDataState(file)); + + assertNoFailures(failures); + } + + @Test + public void deleteTaskDataByMultipleThreads() throws Exception { + File file = newTaskDataZipFile(); + + List failures = runSameByMultipleThreads(deleteTaskData(file)); + + assertNoFailures(failures); + } + + @Test + public void manipulateTaskDataConcurrently() throws Exception { + File file = newTaskDataZipFile(); + + // run multiple times to increase chance of a failure + for (int i = 0; i <= 10; i++) { + List failures = runConcurrently( // + getTaskDataState(file), // + discardEdits(file), // + putEdits(file), // + putTaskData(file), // + setTaskData(file), // + putTaskDataState(file), // + deleteTaskData(file)); + assertNoFailures(failures); + } + } + + @Test + public void manipulateTaskDataByFixedNumberOfThreads() throws Exception { + File file = newTaskDataZipFile(); + TaskDataStore store = newTaskDataStore(); + List failures = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(4); + + List threads = new ArrayList<>(); + threads.addAll(Collections.nCopies(3, thread(getTaskDataState(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(discardEdits(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(putEdits(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(putTaskData(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(setTaskData(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(putTaskDataState(file), store, Optional.empty(), failures).get())); + threads.addAll(Collections.nCopies(3, thread(deleteTaskData(file), store, Optional.empty(), failures).get())); + Collections.shuffle(threads); + + threads.stream().forEach(thread -> executor.submit(thread)); + + executor.shutdown(); + + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS)); + assertNoFailures(failures); + } + + private static TaskDataStore newTaskDataStore() { + TaskRepositoryManager manager = new TaskRepositoryManager(); + TaskDataExternalizer externalizer = new TaskDataExternalizer(manager) { + @Override + public TaskDataState readState(InputStream in) throws IOException, SAXException { + // read the input stream fully but do not return the result, not relevant for the purpose of the test + assertEquals(DATA_XML_CONTENT, new String(ByteStreams.toByteArray(in), Charsets.UTF_8)); + in.close(); + // pretend that reading state takes more than the blink of an eye + sleep(); + return TEST_STATE; + }; + + @Override + public void writeState(OutputStream out, ITaskDataWorkingCopy state) throws IOException { + out.write(DATA_XML_CONTENT.getBytes(Charsets.UTF_8)); + // pretend that writing state takes more than the blink of an eye + sleep(); + } + }; + return new TaskDataStore(externalizer); + } + + private static void sleep() { + try { + Thread.sleep(DELAY); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private File newTaskDataZipFile() throws IOException { + File file = folder.newFile("test.zip"); + try (ZipOutputStream outputStream = new ZipOutputStream(new FileOutputStream(file))) { + ZipEntry entry = new ZipEntry("data.xml"); + outputStream.putNextEntry(entry); + byte[] data = DATA_XML_CONTENT.getBytes(); + outputStream.write(data, 0, data.length); + outputStream.closeEntry(); + } + return file; + } + + private static TaskData newTaskData() { + TaskRepository repository = new TaskRepository("connectorKind", "repositoryUrl"); + TaskAttributeMapper mapper = new TaskAttributeMapper(repository); + return new TaskData(mapper, "connectorKind", "repositoryUrl", "taskId"); + } + + @FunctionalInterface + public interface ConsumerWithCoreException { + public void accept(T t) throws CoreException; + } + + private ConsumerWithCoreException getTaskDataState(File file) { + return store -> store.getTaskDataState(file); + } + + private ConsumerWithCoreException discardEdits(File file) { + return store -> store.discardEdits(file); + } + + private ConsumerWithCoreException putEdits(File file) { + return store -> store.putEdits(file, TASK_DATA); + } + + private ConsumerWithCoreException putTaskData(File file) { + return store -> store.putTaskData(file, TASK_DATA, true, true); + } + + private ConsumerWithCoreException setTaskData(File file) { + return store -> store.setTaskData(file, TASK_DATA); + } + + private ConsumerWithCoreException putTaskDataState(File file) { + return store -> store.putTaskData(file, TEST_STATE); + } + + private ConsumerWithCoreException deleteTaskData(File file) { + return store -> store.deleteTaskData(file); + } + + private static List runSameByMultipleThreads(ConsumerWithCoreException consumer) + throws Exception { + TaskDataStore store = newTaskDataStore(); + CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT + 1); + List failures = new ArrayList<>(THREAD_COUNT); + + List threads = Stream.generate(thread(consumer, store, Optional.of(barrier), failures)) + .limit(THREAD_COUNT) + .collect(toList()); + + startThreadsSimultaneously(barrier, threads); + return failures; + } + + @SafeVarargs + private static List runConcurrently(ConsumerWithCoreException... consumers) + throws Exception { + TaskDataStore store = newTaskDataStore(); + CyclicBarrier barrier = new CyclicBarrier(consumers.length + 1); + List failures = new ArrayList<>(consumers.length); + + List threads = Arrays.asList(consumers) + .stream() + .map(c -> thread(c, store, Optional.of(barrier), failures).get()) + .collect(toList()); + + startThreadsSimultaneously(barrier, threads); + return failures; + } + + private static void startThreadsSimultaneously(CyclicBarrier barrier, List threads) throws Exception { + threads.stream().forEach(Thread::start); + barrier.await(); + for (Thread thread : threads) { + thread.join(); + } + } + + private static Supplier thread(ConsumerWithCoreException consumer, TaskDataStore store, + Optional barrier, List failures) { + return new Supplier() { + + @Override + public Thread get() { + return new Thread(new Runnable() { + + @Override + public void run() { + try { + barrier.ifPresent(b -> { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + }); + consumer.accept(store); + } catch (Throwable e) { + failures.add(e); + } + } + }); + } + }; + } + + private static void assertNoFailures(List failures) { + assertTrue(format("expected no failures but found %d:\n%s", failures.size(), collectMessages(failures)), + failures.isEmpty()); + } + + private static String collectMessages(List failures) { + return failures.stream().map(e -> e.getClass().getSimpleName()).collect(Collectors.joining("\n")); + } + +} diff --git a/org.eclipse.mylyn.tasks.core/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStore.java b/org.eclipse.mylyn.tasks.core/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStore.java index defad4d18..85c61070d 100644 --- a/org.eclipse.mylyn.tasks.core/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStore.java +++ b/org.eclipse.mylyn.tasks.core/src/org/eclipse/mylyn/internal/tasks/core/data/TaskDataStore.java @@ -17,6 +17,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; @@ -40,11 +42,17 @@ public class TaskDataStore { private final TaskDataExternalizer externalizer; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + public TaskDataStore(IRepositoryManager taskRepositoryManager) { - this.externalizer = new TaskDataExternalizer(taskRepositoryManager); + this(new TaskDataExternalizer(taskRepositoryManager)); + } + + TaskDataStore(TaskDataExternalizer externalizer) { + this.externalizer = externalizer; } - public synchronized TaskDataState discardEdits(File file) throws CoreException { + public TaskDataState discardEdits(File file) throws CoreException { TaskDataState state = readState(file); if (state != null) { state.setEditsData(null); @@ -53,11 +61,11 @@ public class TaskDataStore { return state; } - public synchronized TaskDataState getTaskDataState(File file) throws CoreException { + public TaskDataState getTaskDataState(File file) throws CoreException { return readState(file); } - public synchronized void putEdits(File file, TaskData data) throws CoreException { + public void putEdits(File file, TaskData data) throws CoreException { Assert.isNotNull(file); Assert.isNotNull(data); TaskDataState state = readState(file); @@ -68,8 +76,7 @@ public class TaskDataStore { writeState(file, state); } - public synchronized TaskDataState putTaskData(File file, TaskData data, boolean setLastRead, boolean user) - throws CoreException { + public TaskDataState putTaskData(File file, TaskData data, boolean setLastRead, boolean user) throws CoreException { Assert.isNotNull(file); Assert.isNotNull(data); TaskDataState state = null; @@ -93,7 +100,7 @@ public class TaskDataStore { return state; } - public synchronized TaskDataState setTaskData(File file, TaskData data) throws CoreException { + public TaskDataState setTaskData(File file, TaskData data) throws CoreException { Assert.isNotNull(file); Assert.isNotNull(data); @@ -123,6 +130,7 @@ public class TaskDataStore { } private TaskDataState readState(File file) throws CoreException { + lock.readLock().lock(); try { if (file.exists()) { try { @@ -146,10 +154,13 @@ public class TaskDataStore { } catch (IOException e) { throw new CoreException(new Status(IStatus.ERROR, ITasksCoreConstants.ID_PLUGIN, "Error reading task data", //$NON-NLS-1$ e)); + } finally { + lock.readLock().unlock(); } } private void writeState(File file, TaskDataState state) throws CoreException { + lock.writeLock().lock(); try { try (ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(file)))) { out.setMethod(ZipOutputStream.DEFLATED); @@ -162,15 +173,22 @@ public class TaskDataStore { } catch (IOException e) { throw new CoreException(new Status(IStatus.ERROR, ITasksCoreConstants.ID_PLUGIN, "Error writing task data", //$NON-NLS-1$ e)); + } finally { + lock.writeLock().unlock(); } } - public synchronized void putTaskData(File file, TaskDataState state) throws CoreException { + public void putTaskData(File file, TaskDataState state) throws CoreException { writeState(file, state); } - public synchronized boolean deleteTaskData(File file) { - return file.delete(); + public boolean deleteTaskData(File file) { + lock.writeLock().lock(); + try { + return file.delete(); + } finally { + lock.writeLock().unlock(); + } } } -- cgit v1.2.3