diff options
author | Thomas Watson | 2017-02-15 18:43:00 +0000 |
---|---|---|
committer | Thomas Watson | 2017-06-16 12:38:08 +0000 |
commit | 89617708753627e2aecdea7393d64ccf24a0b0eb (patch) | |
tree | 5e94961260e14f6ad5fcd1522eb5c254971d18e7 | |
parent | 5bfcf92c80bd0a88f702c5c61e644db3144da53b (diff) | |
download | rt.equinox.framework-89617708753627e2aecdea7393d64ccf24a0b0eb.tar.gz rt.equinox.framework-89617708753627e2aecdea7393d64ccf24a0b0eb.tar.xz rt.equinox.framework-89617708753627e2aecdea7393d64ccf24a0b0eb.zip |
Work in progress for log stream impl
Change-Id: I0637d29c39e8b896bf5dc863cb52c51bc18f5d53
Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
11 files changed, 377 insertions, 0 deletions
diff --git a/bundles/org.eclipse.equinox.logstream/.classpath b/bundles/org.eclipse.equinox.logstream/.classpath new file mode 100644 index 000000000..eca7bdba8 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/.classpath @@ -0,0 +1,7 @@ +<?xml version="1.0" encoding="UTF-8"?> +<classpath> + <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/> + <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/> + <classpathentry kind="src" path="src"/> + <classpathentry kind="output" path="bin"/> +</classpath> diff --git a/bundles/org.eclipse.equinox.logstream/.gitignore b/bundles/org.eclipse.equinox.logstream/.gitignore new file mode 100644 index 000000000..ae3c17260 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/bundles/org.eclipse.equinox.logstream/.project b/bundles/org.eclipse.equinox.logstream/.project new file mode 100644 index 000000000..8314316ac --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/.project @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8"?> +<projectDescription> + <name>org.eclipse.equinox.log.stream</name> + <comment></comment> + <projects> + </projects> + <buildSpec> + <buildCommand> + <name>org.eclipse.jdt.core.javabuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.ManifestBuilder</name> + <arguments> + </arguments> + </buildCommand> + <buildCommand> + <name>org.eclipse.pde.SchemaBuilder</name> + <arguments> + </arguments> + </buildCommand> + </buildSpec> + <natures> + <nature>org.eclipse.pde.PluginNature</nature> + <nature>org.eclipse.jdt.core.javanature</nature> + </natures> +</projectDescription> diff --git a/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.jdt.core.prefs b/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.jdt.core.prefs new file mode 100644 index 000000000..0c68a61dc --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.jdt.core.prefs @@ -0,0 +1,7 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled +org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8 +org.eclipse.jdt.core.compiler.compliance=1.8 +org.eclipse.jdt.core.compiler.problem.assertIdentifier=error +org.eclipse.jdt.core.compiler.problem.enumIdentifier=error +org.eclipse.jdt.core.compiler.source=1.8 diff --git a/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.pde.core.prefs b/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.pde.core.prefs new file mode 100644 index 000000000..f29e940a0 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/.settings/org.eclipse.pde.core.prefs @@ -0,0 +1,3 @@ +eclipse.preferences.version=1 +pluginProject.extensions=false +resolve.requirebundle=false diff --git a/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF new file mode 100644 index 000000000..3568239b1 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/META-INF/MANIFEST.MF @@ -0,0 +1,13 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Logstream +Bundle-SymbolicName: org.eclipse.equinox.log.stream +Bundle-Version: 1.0.0.qualifier +Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamFactoryImpl +Bundle-RequiredExecutionEnvironment: JavaSE-1.8 +Import-Package: org.osgi.framework;version="1.3.0", + org.osgi.service.log;version="1.4.0", + org.osgi.util.promise;version="1.0.0", + org.osgi.util.pushstream;version="1.0.0", + org.osgi.util.tracker;version="1.5.0" +Bundle-ActivationPolicy: lazy diff --git a/bundles/org.eclipse.equinox.logstream/build.properties b/bundles/org.eclipse.equinox.logstream/build.properties new file mode 100644 index 000000000..87e00da18 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/build.properties @@ -0,0 +1,5 @@ +source.. = src/ +output.. = bin/ +bin.includes = META-INF/,\ + . +jars.extra.classpath = platform:/plugin/org.eclipse.osgi/osgi/osgi.annotation.jar diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java new file mode 100644 index 000000000..0842e6c00 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java @@ -0,0 +1,55 @@ +package org.eclipse.equinox.internal.log.stream; + +import java.io.Closeable; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import org.osgi.service.log.LogEntry; +import org.osgi.service.log.LogReaderService; +import org.osgi.util.pushstream.PushEvent; +import org.osgi.util.pushstream.PushEventConsumer; +import org.osgi.util.pushstream.PushEventSource; +import org.osgi.util.tracker.ServiceTracker; + +public class LogEntrySource implements PushEventSource<LogEntry> { + private final Executor executor; + private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory; + private final Set<PushEventConsumer<? super LogEntry>> consumers = new CopyOnWriteArraySet<>(); + public LogEntrySource(Executor executor, ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) { + this.executor = executor; + this.withHistory = withHistory; + } + + + @Override + public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception { + if (!consumers.add(aec)){ + throw new IllegalStateException("Cannot add the same consumer multiple times"); + } + return () -> { + if (consumers.remove(aec)) { + try { + aec.accept(PushEvent.close()); + } catch (Exception e) { + // ignore here for log stream + } + } + }; + } + + public void logged(LogEntry entry) { + for (PushEventConsumer<? super LogEntry> consumer : consumers) { + try { + long status = consumer.accept(PushEvent.data(entry)); + if (status < 0) { + consumer.accept(PushEvent.close()); + } + } catch (Exception e) { + // we ignore exceptions here for log stream + } + } + } + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java new file mode 100644 index 000000000..3627c7c9b --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/eclipse/equinox/internal/log/stream/LogStreamFactoryImpl.java @@ -0,0 +1,157 @@ +package org.eclipse.equinox.internal.log.stream; + +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.service.log.LogEntry; +import org.osgi.service.log.LogListener; +import org.osgi.service.log.LogReaderService; +import org.osgi.service.log.stream.LogStreamProvider; +import org.osgi.util.pushstream.PushEvent; +import org.osgi.util.pushstream.PushStream; +import org.osgi.util.pushstream.PushStreamBuilder; +import org.osgi.util.pushstream.PushStreamProvider; +import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; + +public class LogStreamFactoryImpl implements BundleActivator, LogStreamProvider, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener { + private final PushStreamProvider pushStreamProvider = new PushStreamProvider(); + private final Set<LogEntrySource> logEntrySources = new CopyOnWriteArraySet<>(); + private final ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "LogStream thread"); + } + }); + private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService; + BundleContext context; + ReentrantLock eventProducerLock = new ReentrantLock(); + + /* + * (non-Javadoc) + * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext) + */ + public void start(BundleContext context) throws Exception { + this.context = context; + logReaderService = new ServiceTracker<>(context, LogReaderService.class, this); + logReaderService.open(); + } + + /* + * (non-Javadoc) + * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext) + */ + public void stop(BundleContext bundleContext) throws Exception { + logReaderService.close(); + executor.shutdown(); + } + + @Override + public AtomicReference<LogReaderService> addingService(ServiceReference<LogReaderService> reference) { + AtomicReference<LogReaderService> tracked = new AtomicReference<>(); + modifiedService(reference, tracked); + return tracked; + } + + @Override + public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) { + eventProducerLock.lock(); + try { + // Check if the currently used reader service is lower ranked that the modified serviceRef + ServiceReference<LogReaderService> currentServiceRef = logReaderService.getServiceReference(); + if (currentServiceRef == null || modifiedServiceRef.compareTo(currentServiceRef) > 0) { + // The modified service reference is higher ranked than the currently used one; + // Use the modified service reference instead. + LogReaderService readerService = context.getService(modifiedServiceRef); + if (readerService != null) { + if (modifiedTracked.get() == null) { + // update our tracked object for the reference with the real service + modifiedTracked.set(readerService); + } + // remove our listener from the currently used service + if (currentServiceRef != null) { + AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentServiceRef); + if (currentTracked != null) { + LogReaderService currentLogReader = currentTracked.get(); + if (currentLogReader != null) { + // we were really using this service; + // remove the our listener and unget the service + currentLogReader.removeLogListener(this); + context.ungetService(currentServiceRef); + // finally null out our tracked reference + currentTracked.set(null); + } + } + } + readerService.addLogListener(this); + } + } + } finally { + eventProducerLock.unlock(); + } + } + + @Override + public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) { + eventProducerLock.lock(); + try { + } finally { + LogReaderService removedLogReader = removedTracked.get(); + if (removedLogReader != null) { + removedLogReader.removeLogListener(this); + context.ungetService(removedRef); + removedTracked.set(null); + } + ServiceReference<LogReaderService> currentRef = logReaderService.getServiceReference(); + if (currentRef != null) { + AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentRef); + if (currentTracked != null) { + LogReaderService currentLogReader = currentTracked.get(); + if (currentLogReader == null) { + currentLogReader = context.getService(currentRef); + currentTracked.set(currentLogReader); + } + if (currentLogReader != null) { + currentLogReader.addLogListener(this); + } + } + } + eventProducerLock.unlock(); + } + } + + @Override + public PushStream<LogEntry> createStream(Options... options) { + ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null; + if (options != null) { + for (Options option : options) { + if (Options.HISTORY.equals(option)) { + withHistory = logReaderService; + break; + } + } + } + LogEntrySource logEntrySource = new LogEntrySource(executor, withHistory); + PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource); + PushStream<LogEntry> logStream = streamBuilder.unbuffered().withExecutor(executor).create(); + logEntrySources.add(logEntrySource); + return logStream; + } + + @Override + public void logged(LogEntry entry) { + for (LogEntrySource logEntrySource : logEntrySources) { + logEntrySource.logged(entry); + } + } + +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/LogStreamProvider.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/LogStreamProvider.java new file mode 100644 index 000000000..99f5645c9 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/LogStreamProvider.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.osgi.service.log.stream; + +import org.osgi.annotation.versioning.ProviderType; +import org.osgi.service.log.LogEntry; +import org.osgi.util.pushstream.PushStream; + +/** + * LogStreamProvider service for creating a PushStream of {@link LogEntry} + * objects. + * + * @ThreadSafe + * @author $Id$ + */ +@ProviderType +public interface LogStreamProvider { + /** + * Creation options for the PushStream of {@link LogEntry} objects. + */ + enum Options { + /** + * Include history. + * <p> + * Prime the created PushStream with the past {@link LogEntry} objects. + * The number of past {@link LogEntry} objects is implementation + * specific. + * <p> + * The created PushStream will supply the past {@link LogEntry} objects + * followed by newly created {@link LogEntry} objects. + */ + HISTORY; + } + + /** + * Create a PushStream of {@link LogEntry} objects. + * <p> + * The returned PushStream is an unbuffered stream with a parallelism of + * one. + * <p> + * When this LogStreamProvider service is released by the obtaining bundle, + * this LogStreamProvider service must call {@code close()} on the returned + * PushStream object if it has not already been closed. + * + * @param options The options to use when creating the PushStream. + * @return A PushStream of {@link LogEntry} objects. + */ + PushStream<LogEntry> createStream(Options... options); +} diff --git a/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/package-info.java b/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/package-info.java new file mode 100644 index 000000000..2e914e0b3 --- /dev/null +++ b/bundles/org.eclipse.equinox.logstream/src/org/osgi/service/log/stream/package-info.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) OSGi Alliance (2016). All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Log Stream Package Version 1.0. + * <p> + * Bundles wishing to use this package must list the package in the + * Import-Package header of the bundle's manifest. This package has two types of + * users: the consumers that use the API in this package and the providers that + * implement the API in this package. + * <p> + * Example import for consumers using the API in this package: + * <p> + * {@code Import-Package: org.osgi.service.log.stream; version="[1.0,2.0)"} + * <p> + * Example import for providers implementing the API in this package: + * <p> + * {@code Import-Package: org.osgi.service.log.stream; version="[1.0,1.1)"} + * + * @author $Id$ + */ +@Version("1.0") +package org.osgi.service.log.stream; + +import org.osgi.annotation.versioning.Version; |