Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorThomas Watson2017-08-08 09:52:38 -0400
committerThomas Watson2017-08-08 09:52:44 -0400
commit3e193d46363000ec2ee01890557c49c52b845326 (patch)
tree794080a7688b0e45d25ca5f7acbf37b104e85034
parent6f277f0934bccc45c25a54e61545e3b14d910cb4 (diff)
downloadrt.equinox.framework-3e193d46363000ec2ee01890557c49c52b845326.tar.gz
rt.equinox.framework-3e193d46363000ec2ee01890557c49c52b845326.tar.xz
rt.equinox.framework-3e193d46363000ec2ee01890557c49c52b845326.zip
Bug 516761 - remove log.stream from rt.equinox.frameworkI20170808-2000
- Project is moved to rt.equinox.bundles repo Change-Id: I8cdfb2d1743360e1d3c08593de395d8989f0afbf Signed-off-by: Thomas Watson <tjwatson@us.ibm.com>
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.classpath12
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.gitignore1
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.project28
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.core.runtime.prefs2
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.core.prefs430
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.ui.prefs64
-rw-r--r--bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.pde.core.prefs3
-rw-r--r--bundles/org.eclipse.equinox.log.stream/META-INF/MANIFEST.MF17
-rw-r--r--bundles/org.eclipse.equinox.log.stream/about.html69
-rw-r--r--bundles/org.eclipse.equinox.log.stream/build.properties10
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/osgi.annotation.jarbin13199 -> 0 bytes
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java63
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/package-info.java38
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java60
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java1480
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java79
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java111
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java35
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java205
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventConsumer.java69
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventSource.java50
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java609
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java52
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java88
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java581
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicy.java48
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicyOption.java98
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicy.java52
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicyOption.java76
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java104
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java337
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java73
-rw-r--r--bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/package-info.java39
-rw-r--r--bundles/org.eclipse.equinox.log.stream/plugin.properties12
-rw-r--r--bundles/org.eclipse.equinox.log.stream/pom.xml39
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java136
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java166
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java93
-rw-r--r--bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java108
-rw-r--r--pom.xml1
40 files changed, 0 insertions, 5538 deletions
diff --git a/bundles/org.eclipse.equinox.log.stream/.classpath b/bundles/org.eclipse.equinox.log.stream/.classpath
deleted file mode 100644
index ca218f47e..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.classpath
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<classpath>
- <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8"/>
- <classpathentry kind="con" path="org.eclipse.pde.core.requiredPlugins"/>
- <classpathentry kind="src" path="osgi/src">
- <attributes>
- <attribute name="ignore_optional_problems" value="true"/>
- </attributes>
- </classpathentry>
- <classpathentry kind="src" path="src"/>
- <classpathentry kind="output" path="bin"/>
-</classpath>
diff --git a/bundles/org.eclipse.equinox.log.stream/.gitignore b/bundles/org.eclipse.equinox.log.stream/.gitignore
deleted file mode 100644
index ae3c17260..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/bin/
diff --git a/bundles/org.eclipse.equinox.log.stream/.project b/bundles/org.eclipse.equinox.log.stream/.project
deleted file mode 100644
index 8314316ac..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.project
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<projectDescription>
- <name>org.eclipse.equinox.log.stream</name>
- <comment></comment>
- <projects>
- </projects>
- <buildSpec>
- <buildCommand>
- <name>org.eclipse.jdt.core.javabuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.eclipse.pde.ManifestBuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- <buildCommand>
- <name>org.eclipse.pde.SchemaBuilder</name>
- <arguments>
- </arguments>
- </buildCommand>
- </buildSpec>
- <natures>
- <nature>org.eclipse.pde.PluginNature</nature>
- <nature>org.eclipse.jdt.core.javanature</nature>
- </natures>
-</projectDescription>
diff --git a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.core.runtime.prefs b/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.core.runtime.prefs
deleted file mode 100644
index 5a0ad22d2..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.core.runtime.prefs
+++ /dev/null
@@ -1,2 +0,0 @@
-eclipse.preferences.version=1
-line.separator=\n
diff --git a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.core.prefs b/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 06cd85ea1..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,430 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.builder.cleanOutputFolder=clean
-org.eclipse.jdt.core.builder.duplicateResourceTask=warning
-org.eclipse.jdt.core.builder.invalidClasspath=abort
-org.eclipse.jdt.core.builder.resourceCopyExclusionFilter=*.launch
-org.eclipse.jdt.core.circularClasspath=error
-org.eclipse.jdt.core.classpath.exclusionPatterns=enabled
-org.eclipse.jdt.core.classpath.multipleOutputLocations=enabled
-org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled
-org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore
-org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault
-org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
-org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
-org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
-org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.8
-org.eclipse.jdt.core.compiler.debug.lineNumber=generate
-org.eclipse.jdt.core.compiler.debug.localVariable=generate
-org.eclipse.jdt.core.compiler.debug.sourceFile=generate
-org.eclipse.jdt.core.compiler.doc.comment.support=enabled
-org.eclipse.jdt.core.compiler.maxProblemPerUnit=1000
-org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning
-org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
-org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
-org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning
-org.eclipse.jdt.core.compiler.problem.deadCode=warning
-org.eclipse.jdt.core.compiler.problem.deprecation=warning
-org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
-org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=enabled
-org.eclipse.jdt.core.compiler.problem.discouragedReference=error
-org.eclipse.jdt.core.compiler.problem.emptyStatement=warning
-org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
-org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
-org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
-org.eclipse.jdt.core.compiler.problem.fieldHiding=warning
-org.eclipse.jdt.core.compiler.problem.finalParameterBound=ignore
-org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=error
-org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning
-org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
-org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning
-org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=ignore
-org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=warning
-org.eclipse.jdt.core.compiler.problem.invalidJavadoc=error
-org.eclipse.jdt.core.compiler.problem.invalidJavadocTags=enabled
-org.eclipse.jdt.core.compiler.problem.invalidJavadocTagsDeprecatedRef=disabled
-org.eclipse.jdt.core.compiler.problem.invalidJavadocTagsNotVisibleRef=disabled
-org.eclipse.jdt.core.compiler.problem.invalidJavadocTagsVisibility=private
-org.eclipse.jdt.core.compiler.problem.localVariableHiding=warning
-org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning
-org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
-org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
-org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=error
-org.eclipse.jdt.core.compiler.problem.missingJavadocComments=ignore
-org.eclipse.jdt.core.compiler.problem.missingJavadocCommentsOverriding=enabled
-org.eclipse.jdt.core.compiler.problem.missingJavadocCommentsVisibility=public
-org.eclipse.jdt.core.compiler.problem.missingJavadocTagDescription=all_standard_tags
-org.eclipse.jdt.core.compiler.problem.missingJavadocTags=warning
-org.eclipse.jdt.core.compiler.problem.missingJavadocTagsMethodTypeParameters=disabled
-org.eclipse.jdt.core.compiler.problem.missingJavadocTagsOverriding=disabled
-org.eclipse.jdt.core.compiler.problem.missingJavadocTagsVisibility=protected
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled
-org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning
-org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
-org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning
-org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning
-org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=warning
-org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning
-org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
-org.eclipse.jdt.core.compiler.problem.nullReference=warning
-org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
-org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning
-org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning
-org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=warning
-org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
-org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.rawTypeReference=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=error
-org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
-org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
-org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning
-org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled
-org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
-org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled
-org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=warning
-org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning
-org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled
-org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=warning
-org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning
-org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=warning
-org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryElse=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=warning
-org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
-org.eclipse.jdt.core.compiler.problem.unsafeTypeOperation=warning
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=warning
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=enabled
-org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedImport=error
-org.eclipse.jdt.core.compiler.problem.unusedLabel=warning
-org.eclipse.jdt.core.compiler.problem.unusedLocal=warning
-org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=enabled
-org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=error
-org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.8
-org.eclipse.jdt.core.formatter.align_fields_grouping_blank_lines=2147483647
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_for_loop_header=0
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameterized_type_references=0
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_type_arguments=0
-org.eclipse.jdt.core.formatter.alignment_for_type_parameters=0
-org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_lambda_body=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.count_line_length_from_starting_position=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=false
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=false
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=do not insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
-org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true
-org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
-org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=true
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_type_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_lambda_arrow=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_lambda_arrow=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=800
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.parentheses_positions_in_annotation=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_catch_clause=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_enum_constant_declaration=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_for_statment=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_if_while_statement=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_lambda_declaration=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_method_delcaration=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_method_invocation=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_switch_statement=common_lines
-org.eclipse.jdt.core.formatter.parentheses_positions_in_try_clause=common_lines
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=false
-org.eclipse.jdt.core.formatter.tabulation.char=tab
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_on_off_tags=false
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_assignment_operator=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_conditional_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
-org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true
-org.eclipse.jdt.core.incompatibleJDKLevel=ignore
-org.eclipse.jdt.core.incompleteClasspath=error
-org.eclipse.jdt.core.javaFormatter=org.eclipse.jdt.core.defaultJavaFormatter
diff --git a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.ui.prefs b/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.ui.prefs
deleted file mode 100644
index 8a147d5b9..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.jdt.ui.prefs
+++ /dev/null
@@ -1,64 +0,0 @@
-eclipse.preferences.version=1
-editor_save_participant_org.eclipse.jdt.ui.postsavelistener.cleanup=true
-formatter_settings_version=13
-org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=;
-org.eclipse.jdt.ui.ondemandthreshold=99
-org.eclipse.jdt.ui.staticondemandthreshold=99
-sp_cleanup.add_default_serial_version_id=true
-sp_cleanup.add_generated_serial_version_id=false
-sp_cleanup.add_missing_annotations=true
-sp_cleanup.add_missing_deprecated_annotations=true
-sp_cleanup.add_missing_methods=false
-sp_cleanup.add_missing_nls_tags=false
-sp_cleanup.add_missing_override_annotations=true
-sp_cleanup.add_missing_override_annotations_interface_methods=true
-sp_cleanup.add_serial_version_id=false
-sp_cleanup.always_use_blocks=true
-sp_cleanup.always_use_parentheses_in_expressions=false
-sp_cleanup.always_use_this_for_non_static_field_access=false
-sp_cleanup.always_use_this_for_non_static_method_access=false
-sp_cleanup.convert_functional_interfaces=false
-sp_cleanup.convert_to_enhanced_for_loop=false
-sp_cleanup.correct_indentation=false
-sp_cleanup.format_source_code=true
-sp_cleanup.format_source_code_changes_only=false
-sp_cleanup.insert_inferred_type_arguments=false
-sp_cleanup.make_local_variable_final=true
-sp_cleanup.make_parameters_final=false
-sp_cleanup.make_private_fields_final=true
-sp_cleanup.make_type_abstract_if_missing_method=false
-sp_cleanup.make_variable_declarations_final=false
-sp_cleanup.never_use_blocks=false
-sp_cleanup.never_use_parentheses_in_expressions=true
-sp_cleanup.on_save_use_additional_actions=false
-sp_cleanup.organize_imports=true
-sp_cleanup.qualify_static_field_accesses_with_declaring_class=false
-sp_cleanup.qualify_static_member_accesses_through_instances_with_declaring_class=true
-sp_cleanup.qualify_static_member_accesses_through_subtypes_with_declaring_class=true
-sp_cleanup.qualify_static_member_accesses_with_declaring_class=false
-sp_cleanup.qualify_static_method_accesses_with_declaring_class=false
-sp_cleanup.remove_private_constructors=true
-sp_cleanup.remove_redundant_type_arguments=false
-sp_cleanup.remove_trailing_whitespaces=false
-sp_cleanup.remove_trailing_whitespaces_all=true
-sp_cleanup.remove_trailing_whitespaces_ignore_empty=false
-sp_cleanup.remove_unnecessary_casts=true
-sp_cleanup.remove_unnecessary_nls_tags=false
-sp_cleanup.remove_unused_imports=false
-sp_cleanup.remove_unused_local_variables=false
-sp_cleanup.remove_unused_private_fields=true
-sp_cleanup.remove_unused_private_members=false
-sp_cleanup.remove_unused_private_methods=true
-sp_cleanup.remove_unused_private_types=true
-sp_cleanup.sort_members=false
-sp_cleanup.sort_members_all=false
-sp_cleanup.use_anonymous_class_creation=false
-sp_cleanup.use_blocks=false
-sp_cleanup.use_blocks_only_for_return_and_throw=false
-sp_cleanup.use_lambda=true
-sp_cleanup.use_parentheses_in_expressions=false
-sp_cleanup.use_this_for_non_static_field_access=false
-sp_cleanup.use_this_for_non_static_field_access_only_if_necessary=true
-sp_cleanup.use_this_for_non_static_method_access=false
-sp_cleanup.use_this_for_non_static_method_access_only_if_necessary=true
diff --git a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.pde.core.prefs b/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.pde.core.prefs
deleted file mode 100644
index f29e940a0..000000000
--- a/bundles/org.eclipse.equinox.log.stream/.settings/org.eclipse.pde.core.prefs
+++ /dev/null
@@ -1,3 +0,0 @@
-eclipse.preferences.version=1
-pluginProject.extensions=false
-resolve.requirebundle=false
diff --git a/bundles/org.eclipse.equinox.log.stream/META-INF/MANIFEST.MF b/bundles/org.eclipse.equinox.log.stream/META-INF/MANIFEST.MF
deleted file mode 100644
index 581edcb8d..000000000
--- a/bundles/org.eclipse.equinox.log.stream/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,17 +0,0 @@
-Manifest-Version: 1.0
-Bundle-ManifestVersion: 2
-Bundle-Name: %bundleName
-Bundle-Vendor: %bundleVendor
-Bundle-SymbolicName: org.eclipse.equinox.log.stream
-Bundle-Version: 1.0.0.qualifier
-Bundle-Activator: org.eclipse.equinox.internal.log.stream.LogStreamManager
-Bundle-RequiredExecutionEnvironment: JavaSE-1.8
-Bundle-Localization: plugin
-Import-Package: org.osgi.framework;version="[1.9.0,2.0.0)",
- org.osgi.service.log;version="[1.4.0,2.0.0)",
- org.osgi.service.log.stream;version="[1.0,1.1)",
- org.osgi.util.promise;version="[1.0.0,2.0.0)",
- org.osgi.util.pushstream;version="[1.0,1.1)",
- org.osgi.util.tracker;version="[1.5.0,2.0.0)"
-Export-Package: org.osgi.service.log.stream;version="1.0.0";uses:="org.osgi.util.pushstream",
- org.osgi.util.pushstream;version="1.0.0";uses:="org.osgi.util.promise"
diff --git a/bundles/org.eclipse.equinox.log.stream/about.html b/bundles/org.eclipse.equinox.log.stream/about.html
deleted file mode 100644
index 8d1c4affb..000000000
--- a/bundles/org.eclipse.equinox.log.stream/about.html
+++ /dev/null
@@ -1,69 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
- "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-<meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"/>
-<title>About</title>
-</head>
-<body lang="EN-US">
-<h2>About This Content</h2>
-
-<p>August 25, 2006</p>
-<h3>License</h3>
-
-<p>The Eclipse Foundation makes available all content in this plug-in (&quot;Content&quot;). Unless otherwise
-indicated below, the Content is provided to you under the terms and conditions of the
-Eclipse Public License Version 1.0 (&quot;EPL&quot;). A copy of the EPL is available
-at <a href="http://www.eclipse.org/legal/epl-v10.html">http://www.eclipse.org/legal/epl-v10.html</a>.
-For purposes of the EPL, &quot;Program&quot; will mean the Content.</p>
-
-<p>If you did not receive this Content directly from the Eclipse Foundation, the Content is
-being redistributed by another party (&quot;Redistributor&quot;) and different terms and conditions may
-apply to your use of any object code in the Content. Check the Redistributor's license that was
-provided with the Content. If no such license exists, contact the Redistributor. Unless otherwise
-indicated below, the terms and conditions of the EPL still apply to any source code in the Content
-and such source code may be obtained at <a href="http://www.eclipse.org">http://www.eclipse.org</a>.</p>
-
-<h3>Third Party Content</h3>
-
-<p>The Content includes items that have been sourced from third parties as set out below. If you
-did not receive this Content directly from the Eclipse Foundation, the following is provided
-for informational purposes only, and you should look to the Redistributor&rsquo;s license for
-terms and conditions of use.</p>
-
-<h4>OSGi Materials</h4>
-
-<p>All files in the following sub-directories (and their sub-directories):</p>
-
-<ul>
- <li>org/osgi</li>
-</ul>
-
-<p>shall be defined as the &quot;OSGi Materials.&quot; The OSGi Materials are:</p>
-
-<blockquote>
-Copyright (c) 2000, 2006
-<br /><br />
-OSGi Alliance
-Bishop Ranch 6<br/>
-2400 Camino Ramon, Suite 375<br/>
-San Ramon, CA 94583 USA
-<br /><br />
-All Rights Reserved.
-</blockquote>
-
-<p>The OSGi Materials are provided to you under the terms and conditions of the Apache License, Version 2.0. A copy of the license is contained
-in the file <a href="about_files/LICENSE-2.0.txt" target="_blank">LICENSE-2.0.txt</a> and is also available at <a href="http://www.apache.org/licenses/LICENSE-2.0.html" target="_blank">http://www.apache.org/licenses/LICENSE-2.0.html</a>.</p>
-
-<p>Implementation of certain elements of the OSGi Materials may be subject to third party intellectual property rights, including without limitation, patent rights (such a third party may
-or may not be a member of the OSGi Alliance). The OSGi Alliance and its members are not responsible and shall not be held responsible in any manner for identifying or failing to identify any or all such third party
-intellectual property rights.</p>
-
-<small>OSGi&trade; is a trademark, registered trademark, or service mark of The OSGi Alliance in the US and other countries. Java is a trademark,
-registered trademark, or service mark of Sun Microsystems, Inc. in the US and other countries. All other trademarks, registered trademarks, or
-service marks used in the Content are the property of their respective owners and are hereby recognized.</small>
-
-<small>Java and all Java-based trademarks and logos are trademarks or registered trademarks of Sun Microsystems, Inc. in the United States and other countries.</small>
-
-</body>
-</html>
diff --git a/bundles/org.eclipse.equinox.log.stream/build.properties b/bundles/org.eclipse.equinox.log.stream/build.properties
deleted file mode 100644
index 5e02f1eb8..000000000
--- a/bundles/org.eclipse.equinox.log.stream/build.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-source.. = osgi/src/,\
- src/
-output.. = bin/
-bin.includes = META-INF/,\
- .,\
- about.html,\
- plugin.properties
-src.includes = about.html
-
-jars.extra.classpath = osgi/osgi.annotation.jar
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/osgi.annotation.jar b/bundles/org.eclipse.equinox.log.stream/osgi/osgi.annotation.jar
deleted file mode 100644
index dda27d2fe..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/osgi.annotation.jar
+++ /dev/null
Binary files differ
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
deleted file mode 100644
index 99f5645c9..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/LogStreamProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.service.log.stream;
-
-import org.osgi.annotation.versioning.ProviderType;
-import org.osgi.service.log.LogEntry;
-import org.osgi.util.pushstream.PushStream;
-
-/**
- * LogStreamProvider service for creating a PushStream of {@link LogEntry}
- * objects.
- *
- * @ThreadSafe
- * @author $Id$
- */
-@ProviderType
-public interface LogStreamProvider {
- /**
- * Creation options for the PushStream of {@link LogEntry} objects.
- */
- enum Options {
- /**
- * Include history.
- * <p>
- * Prime the created PushStream with the past {@link LogEntry} objects.
- * The number of past {@link LogEntry} objects is implementation
- * specific.
- * <p>
- * The created PushStream will supply the past {@link LogEntry} objects
- * followed by newly created {@link LogEntry} objects.
- */
- HISTORY;
- }
-
- /**
- * Create a PushStream of {@link LogEntry} objects.
- * <p>
- * The returned PushStream is an unbuffered stream with a parallelism of
- * one.
- * <p>
- * When this LogStreamProvider service is released by the obtaining bundle,
- * this LogStreamProvider service must call {@code close()} on the returned
- * PushStream object if it has not already been closed.
- *
- * @param options The options to use when creating the PushStream.
- * @return A PushStream of {@link LogEntry} objects.
- */
- PushStream<LogEntry> createStream(Options... options);
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/package-info.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/package-info.java
deleted file mode 100644
index 2e914e0b3..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/service/log/stream/package-info.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Log Stream Package Version 1.0.
- * <p>
- * Bundles wishing to use this package must list the package in the
- * Import-Package header of the bundle's manifest. This package has two types of
- * users: the consumers that use the API in this package and the providers that
- * implement the API in this package.
- * <p>
- * Example import for consumers using the API in this package:
- * <p>
- * {@code Import-Package: org.osgi.service.log.stream; version="[1.0,2.0)"}
- * <p>
- * Example import for providers implementing the API in this package:
- * <p>
- * {@code Import-Package: org.osgi.service.log.stream; version="[1.0,1.1)"}
- *
- * @author $Id$
- */
-@Version("1.0")
-package org.osgi.service.log.stream;
-
-import org.osgi.annotation.versioning.Version;
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
deleted file mode 100644
index a37e407fd..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractBufferBuilder.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-
-abstract class AbstractBufferBuilder<R, T, U extends BlockingQueue<PushEvent< ? extends T>>>
- implements BufferBuilder<R,T,U> {
-
- protected Executor worker;
- protected int concurrency;
- protected PushbackPolicy<T,U> backPressure;
- protected QueuePolicy<T,U> bufferingPolicy;
- protected U buffer;
-
- @Override
- public BufferBuilder<R,T,U> withBuffer(U queue) {
- this.buffer = queue;
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withQueuePolicy(
- QueuePolicy<T,U> queuePolicy) {
- this.bufferingPolicy = queuePolicy;
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withQueuePolicy(
- QueuePolicyOption queuePolicyOption) {
- this.bufferingPolicy = queuePolicyOption.getPolicy();
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withPushbackPolicy(
- PushbackPolicy<T,U> pushbackPolicy) {
- this.backPressure = pushbackPolicy;
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withPushbackPolicy(
- PushbackPolicyOption pushbackPolicyOption, long time) {
- this.backPressure = pushbackPolicyOption.getPolicy(time);
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withParallelism(int parallelism) {
- this.concurrency = parallelism;
- return this;
- }
-
- @Override
- public BufferBuilder<R,T,U> withExecutor(Executor executor) {
- this.worker = executor;
- return this;
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
deleted file mode 100644
index 2293c1aad..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/AbstractPushStreamImpl.java
+++ /dev/null
@@ -1,1480 +0,0 @@
-package org.osgi.util.pushstream;
-
-import static java.util.Collections.emptyList;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
-import static org.osgi.util.pushstream.PushEventConsumer.*;
-
-import java.time.Duration;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.ConcurrentModificationException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.BinaryOperator;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.function.IntSupplier;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collector;
-import java.util.stream.Collectors;
-
-import org.osgi.util.promise.Deferred;
-import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.TimeoutException;
-import org.osgi.util.pushstream.PushEvent.EventType;
-
-abstract class AbstractPushStreamImpl<T> implements PushStream<T> {
-
- static enum State {
- BUILDING, STARTED, CLOSED
- }
-
- protected final PushStreamProvider psp;
-
- protected final Executor defaultExecutor;
- protected final ScheduledExecutorService scheduler;
-
- protected final AtomicReference<State> closed = new AtomicReference<>(BUILDING);
-
- protected final AtomicReference<PushEventConsumer<T>> next = new AtomicReference<>();
-
- protected final AtomicReference<Runnable> onCloseCallback = new AtomicReference<>();
- protected final AtomicReference<Consumer<? super Throwable>> onErrorCallback = new AtomicReference<>();
-
- protected abstract boolean begin();
-
- AbstractPushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler) {
- this.psp = psp;
- this.defaultExecutor = executor;
- this.scheduler = scheduler;
- }
-
- protected long handleEvent(PushEvent< ? extends T> event) {
- if(closed.get() != CLOSED) {
- try {
- if(event.isTerminal()) {
- close(event.nodata());
- return ABORT;
- } else {
- PushEventConsumer<T> consumer = next.get();
- long val;
- if(consumer == null) {
- //TODO log a warning
- val = CONTINUE;
- } else {
- val = consumer.accept(event);
- }
- if(val < 0) {
- close();
- }
- return val;
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- }
- return ABORT;
- }
-
- @Override
- public void close() {
- close(PushEvent.close());
- }
-
- protected boolean close(PushEvent<T> event) {
- if(!event.isTerminal()) {
- throw new IllegalArgumentException("The event " + event + " is not a close event.");
- }
- if(closed.getAndSet(CLOSED) != CLOSED) {
- PushEventConsumer<T> aec = next.getAndSet(null);
- if(aec != null) {
- try {
- aec.accept(event);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- Runnable handler = onCloseCallback.getAndSet(null);
- if(handler != null) {
- try {
- handler.run();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- if (event.getType() == EventType.ERROR) {
- Consumer<? super Throwable> errorHandler = onErrorCallback.getAndSet(null);
- if(errorHandler != null) {
- try {
- errorHandler.accept(event.getFailure());
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- return true;
- }
- return false;
- }
-
- @Override
- public PushStream<T> onClose(Runnable closeHandler) {
- if(onCloseCallback.compareAndSet(null, closeHandler)) {
- if(closed.get() == State.CLOSED && onCloseCallback.compareAndSet(closeHandler, null)) {
- closeHandler.run();
- }
- } else {
- throw new IllegalStateException("A close handler has already been defined for this stream object");
- }
- return this;
- }
-
- @Override
- public PushStream<T> onError(Consumer< ? super Throwable> closeHandler) {
- if(onErrorCallback.compareAndSet(null, closeHandler)) {
- if(closed.get() == State.CLOSED) {
- //TODO log already closed
- onErrorCallback.set(null);
- }
- } else {
- throw new IllegalStateException("A close handler has already been defined for this stream object");
- }
- return this;
- }
-
- private void updateNext(PushEventConsumer<T> consumer) {
- if(!next.compareAndSet(null, consumer)) {
- throw new IllegalStateException("This stream has already been chained");
- } else if(closed.get() == CLOSED && next.compareAndSet(consumer, null)) {
- try {
- consumer.accept(PushEvent.close());
- } catch (Exception e) {
- //TODO log
- e.printStackTrace();
- }
- }
- }
-
- @Override
- public PushStream<T> filter(Predicate< ? super T> predicate) {
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- updateNext((event) -> {
- try {
- if (!event.isTerminal()) {
- if (predicate.test(event.getData())) {
- return eventStream.handleEvent(event);
- } else {
- return CONTINUE;
- }
- }
- return eventStream.handleEvent(event);
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public <R> PushStream<R> map(Function< ? super T, ? extends R> mapper) {
-
- AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- updateNext(event -> {
- try {
- if (!event.isTerminal()) {
- return eventStream.handleEvent(
- PushEvent.data(mapper.apply(event.getData())));
- } else {
- return eventStream.handleEvent(event.nodata());
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public <R> PushStream<R> flatMap(
- Function< ? super T, ? extends PushStream< ? extends R>> mapper) {
- AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
-
- PushEventConsumer<R> consumer = e -> {
- switch (e.getType()) {
- case ERROR :
- close(e.nodata());
- return ABORT;
- case CLOSE :
- // Close should allow the next flat mapped entry
- // without closing the stream;
- return ABORT;
- case DATA :
- long returnValue = eventStream.handleEvent(e);
- if (returnValue < 0) {
- close();
- return ABORT;
- }
- return returnValue;
- default :
- throw new IllegalArgumentException(
- "The event type " + e.getType() + " is unknown");
- }
- };
-
- updateNext(event -> {
- try {
- if (!event.isTerminal()) {
- PushStream< ? extends R> mappedStream = mapper
- .apply(event.getData());
-
- return mappedStream.forEachEvent(consumer)
- .getValue()
- .longValue();
- } else {
- return eventStream.handleEvent(event.nodata());
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public PushStream<T> distinct() {
- Set<T> set = Collections.<T>newSetFromMap(new ConcurrentHashMap<>());
- return filter(set::add);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public PushStream<T> sorted() {
- return sorted((Comparator)Comparator.naturalOrder());
- }
-
- @Override
- public PushStream<T> sorted(Comparator< ? super T> comparator) {
- List<T> list = Collections.synchronizedList(new ArrayList<>());
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- updateNext(event -> {
- try {
- switch(event.getType()) {
- case DATA :
- list.add(event.getData());
- return CONTINUE;
- case CLOSE :
- list.sort(comparator);
- sorted: for (T t : list) {
- if (eventStream
- .handleEvent(PushEvent.data(t)) < 0) {
- break sorted;
- }
- }
- // Fall through
- case ERROR :
- eventStream.handleEvent(event);
- return ABORT;
- }
- return eventStream.handleEvent(event.nodata());
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public PushStream<T> limit(long maxSize) {
- if(maxSize <= 0) {
- throw new IllegalArgumentException("The limit must be greater than zero");
- }
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- AtomicLong counter = new AtomicLong(maxSize);
- updateNext(event -> {
- try {
- if (!event.isTerminal()) {
- long count = counter.decrementAndGet();
- if (count > 0) {
- return eventStream.handleEvent(event);
- } else if (count == 0) {
- eventStream.handleEvent(event);
- }
- return ABORT;
- } else {
- return eventStream.handleEvent(event.nodata());
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public PushStream<T> limit(Duration maxTime) {
-
- Runnable start = () -> scheduler.schedule(() -> close(),
- maxTime.toNanos(), NANOSECONDS);
-
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, defaultExecutor, scheduler, this) {
- @Override
- protected void beginning() {
- start.run();
- }
- };
- updateNext((event) -> {
- try {
- return eventStream.handleEvent(event);
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public PushStream<T> timeout(Duration maxTime) {
-
- AtomicLong lastTime = new AtomicLong();
- long timeout = maxTime.toNanos();
-
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<T>(
- psp, defaultExecutor, scheduler, this) {
- @Override
- protected void beginning() {
- lastTime.set(System.nanoTime());
- scheduler.schedule(() -> check(lastTime, timeout), timeout,
- NANOSECONDS);
- }
- };
- updateNext((event) -> {
- try {
- return eventStream.handleEvent(event);
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- void check(AtomicLong lastTime, long timeout) {
- long now = System.nanoTime();
-
- long elapsed = now - lastTime.get();
-
- if (elapsed < timeout) {
- scheduler.schedule(() -> check(lastTime, timeout),
- timeout - elapsed, NANOSECONDS);
- } else {
- close(PushEvent.error(new TimeoutException()));
- }
- }
-
- @Override
- public PushStream<T> skip(long n) {
- if (n < 0) {
- throw new IllegalArgumentException(
- "The number to skip must be greater than or equal to zero");
- }
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- AtomicLong counter = new AtomicLong(n);
- updateNext(event -> {
- try {
- if (!event.isTerminal()) {
- if (counter.get() > 0 && counter.decrementAndGet() >= 0) {
- return CONTINUE;
- } else {
- return eventStream.handleEvent(event);
- }
- } else {
- return eventStream.handleEvent(event.nodata());
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public PushStream<T> fork(int n, int delay, Executor ex) {
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, ex, scheduler, this);
- Semaphore s = new Semaphore(n);
- updateNext(event -> {
- try {
- if (event.isTerminal()) {
- s.acquire(n);
- eventStream.close(event.nodata());
- return ABORT;
- }
-
- s.acquire(1);
-
- ex.execute(() -> {
- try {
- if (eventStream.handleEvent(event) < 0) {
- eventStream.close(PushEvent.close());
- }
- } catch (Exception e1) {
- close(PushEvent.error(e1));
- } finally {
- s.release(1);
- }
- });
-
- return s.getQueueLength() * delay;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
-
- return eventStream;
- }
-
- @Override
- public PushStream<T> buffer() {
- return psp.createStream(c -> {
- forEachEvent(c);
- return this;
- });
- }
-
- @Override
- public <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer() {
- return psp.buildStream(c -> {
- forEachEvent(c);
- return this;
- });
- }
-
- @Override
- public PushStream<T> merge(
- PushEventSource< ? extends T> source) {
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- AtomicInteger count = new AtomicInteger(2);
- PushEventConsumer<T> consumer = event -> {
- try {
- if (!event.isTerminal()) {
- return eventStream.handleEvent(event);
- }
-
- if (count.decrementAndGet() == 0) {
- eventStream.handleEvent(event.nodata());
- return ABORT;
- }
- return CONTINUE;
- } catch (Exception e) {
- PushEvent<T> error = PushEvent.error(e);
- close(error);
- eventStream.close(event.nodata());
- return ABORT;
- }
- };
- updateNext(consumer);
- AutoCloseable second;
- try {
- second = source.open((PushEvent< ? extends T> event) -> {
- return consumer.accept(event);
- });
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- throw new IllegalStateException(
- "Unable to merge events as the event source could not be opened.",
- e);
- }
-
- return eventStream.onClose(() -> {
- try {
- second.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }).map(Function.identity());
- }
-
- @Override
- public PushStream<T> merge(PushStream< ? extends T> source) {
-
- AtomicInteger count = new AtomicInteger(2);
- Consumer<AbstractPushStreamImpl<T>> start = downstream -> {
- PushEventConsumer<T> consumer = e -> {
- long toReturn;
- try {
- if (!e.isTerminal()) {
- toReturn = downstream.handleEvent(e);
- } else if (count.decrementAndGet() == 0) {
- downstream.handleEvent(e);
- toReturn = ABORT;
- } else {
- return ABORT;
- }
- } catch (Exception ex) {
- try {
- downstream.handleEvent(PushEvent.error(ex));
- } catch (Exception ex2) { /* Just ignore this */}
- toReturn = ABORT;
- }
- if (toReturn < 0) {
- try {
- close();
- } catch (Exception ex2) { /* Just ignore this */}
- try {
- source.close();
- } catch (Exception ex2) { /* Just ignore this */}
- }
- return toReturn;
- };
- forEachEvent(consumer);
- source.forEachEvent(consumer);
- };
-
- @SuppressWarnings("resource")
- AbstractPushStreamImpl<T> eventStream = new AbstractPushStreamImpl<T>(
- psp, defaultExecutor, scheduler) {
- @Override
- protected boolean begin() {
- if (closed.compareAndSet(BUILDING, STARTED)) {
- start.accept(this);
- return true;
- }
- return false;
- }
- };
-
-
- return eventStream.onClose(() -> {
- try {
- close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- try {
- source.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }).map(Function.identity());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public PushStream<T>[] split(Predicate< ? super T>... predicates) {
- Predicate<? super T>[] tests = Arrays.copyOf(predicates, predicates.length);
- AbstractPushStreamImpl<T>[] rsult = new AbstractPushStreamImpl[tests.length];
- for(int i = 0; i < tests.length; i++) {
- rsult[i] = new IntermediatePushStreamImpl<>(psp, defaultExecutor,
- scheduler, this);
- }
-
- Boolean[] array = new Boolean[tests.length];
- Arrays.fill(array, Boolean.TRUE);
- AtomicReferenceArray<Boolean> off = new AtomicReferenceArray<>(array);
-
- AtomicInteger count = new AtomicInteger(tests.length);
- updateNext(event -> {
- if (!event.isTerminal()) {
- long delay = CONTINUE;
- for (int i = 0; i < tests.length; i++) {
- try {
- if (off.get(i).booleanValue()
- && tests[i].test(event.getData())) {
- long accept = rsult[i].handleEvent(event);
- if (accept < 0) {
- off.set(i, Boolean.TRUE);
- count.decrementAndGet();
- } else if (accept > delay) {
- accept = delay;
- }
- }
- } catch (Exception e) {
- try {
- rsult[i].close(PushEvent.error(e));
- } catch (Exception e2) {
- //TODO log
- }
- off.set(i, Boolean.TRUE);
- }
- }
- if (count.get() == 0)
- return ABORT;
-
- return delay;
- }
- for (AbstractPushStreamImpl<T> as : rsult) {
- try {
- as.handleEvent(event.nodata());
- } catch (Exception e) {
- try {
- as.close(PushEvent.error(e));
- } catch (Exception e2) {
- //TODO log
- }
- }
- }
- return ABORT;
- });
- return Arrays.copyOf(rsult, tests.length);
- }
-
- @Override
- public PushStream<T> sequential() {
- AbstractPushStreamImpl<T> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- Lock lock = new ReentrantLock();
- updateNext((event) -> {
- try {
- lock.lock();
- try {
- return eventStream.handleEvent(event);
- } finally {
- lock.unlock();
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public <R> PushStream<R> coalesce(
- Function< ? super T,Optional<R>> accumulator) {
- AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<>(
- psp, defaultExecutor, scheduler, this);
- updateNext((event) -> {
- try {
- if (!event.isTerminal()) {
- Optional<PushEvent<R>> coalesced = accumulator
- .apply(event.getData()).map(PushEvent::data);
- if (coalesced.isPresent()) {
- try {
- return eventStream.handleEvent(coalesced.get());
- } catch (Exception ex) {
- close(PushEvent.error(ex));
- return ABORT;
- }
- } else {
- return CONTINUE;
- }
- }
- return eventStream.handleEvent(event.nodata());
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- @Override
- public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f) {
- if (count <= 0)
- throw new IllegalArgumentException(
- "A coalesce operation must collect a positive number of events");
- // This could be optimised to only use a single collection queue.
- // It would save some GC, but is it worth it?
- return coalesce(() -> count, f);
- }
-
- @Override
- public <R> PushStream<R> coalesce(IntSupplier count,
- Function<Collection<T>,R> f) {
- AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
- null);
-
- Runnable init = () -> queueRef
- .set(getQueueForInternalBuffering(count.getAsInt()));
-
- @SuppressWarnings("resource")
- AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, defaultExecutor, scheduler, this) {
- @Override
- protected void beginning() {
- init.run();
- }
- };
-
- AtomicBoolean endPending = new AtomicBoolean();
- Object lock = new Object();
- updateNext((event) -> {
- try {
- Queue<T> queue;
- if (!event.isTerminal()) {
- synchronized (lock) {
- for (;;) {
- queue = queueRef.get();
- if (queue == null) {
- if (endPending.get()) {
- return ABORT;
- } else {
- continue;
- }
- } else if (queue.offer(event.getData())) {
- return CONTINUE;
- } else {
- queueRef.lazySet(null);
- break;
- }
- }
- }
-
- queueRef.set(
- getQueueForInternalBuffering(count.getAsInt()));
-
- // This call is on the same thread and so must happen
- // outside
- // the synchronized block.
- return aggregateAndForward(f, eventStream, event,
- queue);
- } else {
- synchronized (lock) {
- queue = queueRef.get();
- queueRef.lazySet(null);
- endPending.set(true);
- }
- if (queue != null) {
- eventStream.handleEvent(
- PushEvent.data(f.apply(queue)));
- }
- }
- return eventStream.handleEvent(event.nodata());
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- private <R> long aggregateAndForward(Function<Collection<T>,R> f,
- AbstractPushStreamImpl<R> eventStream,
- PushEvent< ? extends T> event, Queue<T> queue) {
- if (!queue.offer(event.getData())) {
- ((ArrayQueue<T>) queue).forcePush(event.getData());
- }
- return eventStream.handleEvent(PushEvent.data(f.apply(queue)));
- }
-
-
- @Override
- public <R> PushStream<R> window(Duration time,
- Function<Collection<T>,R> f) {
- return window(time, defaultExecutor, f);
- }
-
- @Override
- public <R> PushStream<R> window(Duration time, Executor executor,
- Function<Collection<T>,R> f) {
- return window(() -> time, () -> 0, executor, (t, c) -> f.apply(c));
- }
-
- @Override
- public <R> PushStream<R> window(Supplier<Duration> time,
- IntSupplier maxEvents,
- BiFunction<Long,Collection<T>,R> f) {
- return window(time, maxEvents, defaultExecutor, f);
- }
-
- @Override
- public <R> PushStream<R> window(Supplier<Duration> time,
- IntSupplier maxEvents, Executor ex,
- BiFunction<Long,Collection<T>,R> f) {
-
- AtomicLong timestamp = new AtomicLong();
- AtomicLong previousWindowSize = new AtomicLong();
- AtomicLong counter = new AtomicLong();
- Object lock = new Object();
- AtomicReference<Queue<T>> queueRef = new AtomicReference<Queue<T>>(
- null);
-
- // This code is declared as a separate block to avoid any confusion
- // about which instance's methods and variables are in scope
- Consumer<AbstractPushStreamImpl<R>> begin = p -> {
-
- synchronized (lock) {
- timestamp.lazySet(System.nanoTime());
- long count = counter.get();
-
-
- long windowSize = time.get().toNanos();
- previousWindowSize.set(windowSize);
- scheduler.schedule(
- getWindowTask(p, f, time, maxEvents, lock, count,
- queueRef, timestamp, counter,
- previousWindowSize, ex),
- windowSize, NANOSECONDS);
- }
-
- queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
- };
-
- @SuppressWarnings("resource")
- AbstractPushStreamImpl<R> eventStream = new IntermediatePushStreamImpl<R>(
- psp, ex, scheduler, this) {
- @Override
- protected void beginning() {
- begin.accept(this);
- }
- };
-
- AtomicBoolean endPending = new AtomicBoolean(false);
- updateNext((event) -> {
- try {
- if (eventStream.closed.get() == CLOSED) {
- return ABORT;
- }
- Queue<T> queue;
- if (!event.isTerminal()) {
- long elapsed;
- long newCount;
- synchronized (lock) {
- for (;;) {
- queue = queueRef.get();
- if (queue == null) {
- if (endPending.get()) {
- return ABORT;
- } else {
- continue;
- }
- } else if (queue.offer(event.getData())) {
- return CONTINUE;
- } else {
- queueRef.lazySet(null);
- break;
- }
- }
-
- long now = System.nanoTime();
- elapsed = now - timestamp.get();
- timestamp.lazySet(now);
- newCount = counter.get() + 1;
- counter.lazySet(newCount);
-
- // This is a non-blocking call, and must happen in the
- // synchronized block to avoid re=ordering the executor
- // enqueue with a subsequent incoming close operation
- aggregateAndForward(f, eventStream, event, queue,
- ex, elapsed);
- }
- // These must happen outside the synchronized block as we
- // call out to user code
- queueRef.set(
- getQueueForInternalBuffering(maxEvents.getAsInt()));
- long nextWindow = time.get().toNanos();
- long backpressure = previousWindowSize.getAndSet(nextWindow)
- - elapsed;
- scheduler.schedule(
- getWindowTask(eventStream, f, time, maxEvents, lock,
- newCount, queueRef, timestamp, counter,
- previousWindowSize, ex),
- nextWindow, NANOSECONDS);
-
- return backpressure < 0 ? CONTINUE
- : NANOSECONDS.toMillis(backpressure);
- } else {
- long elapsed;
- synchronized (lock) {
- queue = queueRef.get();
- queueRef.lazySet(null);
- endPending.set(true);
- long now = System.nanoTime();
- elapsed = now - timestamp.get();
- counter.lazySet(counter.get() + 1);
- }
- Collection<T> collected = queue == null ? emptyList()
- : queue;
- ex.execute(() -> {
- try {
- eventStream
- .handleEvent(PushEvent.data(f.apply(
- Long.valueOf(NANOSECONDS
- .toMillis(elapsed)),
- collected)));
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- });
- }
- ex.execute(() -> eventStream.handleEvent(event.nodata()));
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- return eventStream;
- }
-
- protected Queue<T> getQueueForInternalBuffering(int size) {
- if (size == 0) {
- return new LinkedList<T>();
- } else {
- return new ArrayQueue<>(size - 1);
- }
- }
-
- @SuppressWarnings("unchecked")
- /**
- * A special queue that keeps one element in reserve and can have that last
- * element set using forcePush. After the element is set the capacity is
- * permanently increased by one and cannot grow further.
- *
- * @param <E> The element type
- */
- private static class ArrayQueue<E> extends AbstractQueue<E>
- implements Queue<E> {
-
- final Object[] store;
-
- int normalLength;
-
- int nextIndex;
-
- int size;
-
- ArrayQueue(int capacity) {
- store = new Object[capacity + 1];
- normalLength = store.length - 1;
- }
-
- @Override
- public boolean offer(E e) {
- if (e == null)
- throw new NullPointerException("Null values are not supported");
- if (size < normalLength) {
- store[nextIndex] = e;
- size++;
- nextIndex++;
- nextIndex = nextIndex % normalLength;
- return true;
- }
- return false;
- }
-
- public void forcePush(E e) {
- store[normalLength] = e;
- normalLength++;
- size++;
- }
-
- @Override
- public E poll() {
- if (size == 0) {
- return null;
- } else {
- int idx = nextIndex - size;
- if (idx < 0) {
- idx += normalLength;
- }
- E value = (E) store[idx];
- store[idx] = null;
- size--;
- return value;
- }
- }
-
- @Override
- public E peek() {
- if (size == 0) {
- return null;
- } else {
- int idx = nextIndex - size;
- if (idx < 0) {
- idx += normalLength;
- }
- return (E) store[idx];
- }
- }
-
- @Override
- public Iterator<E> iterator() {
- final int previousNext = nextIndex;
- return new Iterator<E>() {
-
- int idx;
-
- int remaining = size;
-
- {
- idx = nextIndex - size;
- if (idx < 0) {
- idx += normalLength;
- }
- }
-
- @Override
- public boolean hasNext() {
- if (nextIndex != previousNext) {
- throw new ConcurrentModificationException(
- "The queue was concurrently modified");
- }
- return remaining > 0;
- }
-
- @Override
- public E next() {
- if (!hasNext()) {
- throw new NoSuchElementException(
- "The iterator has no more values");
- }
- E value = (E) store[idx];
- idx++;
- remaining--;
- if (idx == normalLength) {
- idx = 0;
- }
- return value;
- }
-
- };
- }
-
- @Override
- public int size() {
- return size;
- }
-
- }
-
- private <R> Runnable getWindowTask(AbstractPushStreamImpl<R> eventStream,
- BiFunction<Long,Collection<T>,R> f, Supplier<Duration> time,
- IntSupplier maxEvents, Object lock, long expectedCounter,
- AtomicReference<Queue<T>> queueRef, AtomicLong timestamp,
- AtomicLong counter, AtomicLong previousWindowSize,
- Executor executor) {
- return () -> {
-
- Queue<T> queue = null;
- long elapsed;
- synchronized (lock) {
-
- if (counter.get() != expectedCounter) {
- return;
- }
- counter.lazySet(expectedCounter + 1);
-
- long now = System.nanoTime();
- elapsed = now - timestamp.get();
- timestamp.lazySet(now);
-
- queue = queueRef.get();
- queueRef.lazySet(null);
-
- // This is a non-blocking call, and must happen in the
- // synchronized block to avoid re=ordering the executor
- // enqueue with a subsequent incoming close operation
-
- Collection<T> collected = queue == null ? emptyList() : queue;
- executor.execute(() -> {
- try {
- eventStream.handleEvent(PushEvent.data(f.apply(
- Long.valueOf(NANOSECONDS.toMillis(elapsed)),
- collected)));
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- });
- }
-
- // These must happen outside the synchronized block as we
- // call out to user code
- long nextWindow = time.get().toNanos();
- previousWindowSize.set(nextWindow);
- queueRef.set(getQueueForInternalBuffering(maxEvents.getAsInt()));
- scheduler.schedule(
- getWindowTask(eventStream, f, time, maxEvents, lock,
- expectedCounter + 1, queueRef, timestamp, counter,
- previousWindowSize, executor),
- nextWindow, NANOSECONDS);
- };
- }
-
- private <R> void aggregateAndForward(BiFunction<Long,Collection<T>,R> f,
- AbstractPushStreamImpl<R> eventStream,
- PushEvent< ? extends T> event, Queue<T> queue, Executor executor,
- long elapsed) {
- executor.execute(() -> {
- try {
- if (!queue.offer(event.getData())) {
- ((ArrayQueue<T>) queue).forcePush(event.getData());
- }
- long result = eventStream.handleEvent(PushEvent.data(
- f.apply(Long.valueOf(NANOSECONDS.toMillis(elapsed)),
- queue)));
- if (result < 0) {
- close();
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- });
- }
-
- @Override
- public Promise<Void> forEach(Consumer< ? super T> action) {
- Deferred<Void> d = new Deferred<>();
- updateNext((event) -> {
- try {
- switch(event.getType()) {
- case DATA:
- action.accept(event.getData());
- return CONTINUE;
- case CLOSE:
- d.resolve(null);
- break;
- case ERROR:
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- d.fail(e);
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public Promise<Object[]> toArray() {
- return collect(Collectors.toList())
- .map(List::toArray);
- }
-
- @Override
- public <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator) {
- return collect(Collectors.toList())
- .map(l -> l.toArray(generator.apply(l.size())));
- }
-
- @Override
- public Promise<T> reduce(T identity, BinaryOperator<T> accumulator) {
- Deferred<T> d = new Deferred<>();
- AtomicReference<T> iden = new AtomicReference<T>(identity);
-
- updateNext(event -> {
- try {
- switch(event.getType()) {
- case DATA:
- iden.accumulateAndGet(event.getData(), accumulator);
- return CONTINUE;
- case CLOSE:
- d.resolve(iden.get());
- break;
- case ERROR:
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public Promise<Optional<T>> reduce(BinaryOperator<T> accumulator) {
- Deferred<Optional<T>> d = new Deferred<>();
- AtomicReference<T> iden = new AtomicReference<T>(null);
-
- updateNext(event -> {
- try {
- switch(event.getType()) {
- case DATA:
- if (!iden.compareAndSet(null, event.getData()))
- iden.accumulateAndGet(event.getData(), accumulator);
- return CONTINUE;
- case CLOSE:
- d.resolve(Optional.ofNullable(iden.get()));
- break;
- case ERROR:
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public <U> Promise<U> reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
- Deferred<U> d = new Deferred<>();
- AtomicReference<U> iden = new AtomicReference<>(identity);
-
- updateNext(event -> {
- try {
- switch(event.getType()) {
- case DATA:
- iden.updateAndGet((e) -> accumulator.apply(e, event.getData()));
- return CONTINUE;
- case CLOSE:
- d.resolve(iden.get());
- break;
- case ERROR:
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public <R, A> Promise<R> collect(Collector<? super T, A, R> collector) {
- A result = collector.supplier().get();
- BiConsumer<A, ? super T> accumulator = collector.accumulator();
- Deferred<R> d = new Deferred<>();
- PushEventConsumer<T> consumer;
-
- if (collector.characteristics()
- .contains(Collector.Characteristics.CONCURRENT)) {
- consumer = event -> {
- try {
- switch (event.getType()) {
- case DATA :
- accumulator.accept(result, event.getData());
- return CONTINUE;
- case CLOSE :
- d.resolve(collector.finisher().apply(result));
- break;
- case ERROR :
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- };
- } else {
- consumer = event -> {
- try {
- switch (event.getType()) {
- case DATA :
- synchronized (result) {
- accumulator.accept(result, event.getData());
- }
- return CONTINUE;
- case CLOSE :
- d.resolve(collector.finisher().apply(result));
- break;
- case ERROR :
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- };
- }
-
- updateNext(consumer);
- begin();
- return d.getPromise();
- }
-
- @Override
- public Promise<Optional<T>> min(Comparator<? super T> comparator) {
- return reduce((a, b) -> comparator.compare(a, b) <= 0 ? a : b);
- }
-
- @Override
- public Promise<Optional<T>> max(Comparator<? super T> comparator) {
- return reduce((a, b) -> comparator.compare(a, b) > 0 ? a : b);
- }
-
- @Override
- public Promise<Long> count() {
- Deferred<Long> d = new Deferred<>();
- LongAdder counter = new LongAdder();
- updateNext((event) -> {
- try {
- switch(event.getType()) {
- case DATA:
- counter.add(1);
- return CONTINUE;
- case CLOSE:
- d.resolve(Long.valueOf(counter.sum()));
- break;
- case ERROR:
- d.fail(event.getFailure());
- break;
- }
- close(event.nodata());
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public Promise<Boolean> anyMatch(Predicate<? super T> predicate) {
- return filter(predicate).findAny()
- .map(Optional::isPresent);
- }
-
- @Override
- public Promise<Boolean> allMatch(Predicate<? super T> predicate) {
- return filter(x -> !predicate.test(x)).findAny()
- .map(o -> Boolean.valueOf(!o.isPresent()));
- }
-
- @Override
- public Promise<Boolean> noneMatch(Predicate<? super T> predicate) {
- return filter(predicate).findAny()
- .map(o -> Boolean.valueOf(!o.isPresent()));
- }
-
- @Override
- public Promise<Optional<T>> findFirst() {
- Deferred<Optional<T>> d = new Deferred<>();
- updateNext((event) -> {
- try {
- Optional<T> o = null;
- switch(event.getType()) {
- case DATA:
- o = Optional.of(event.getData());
- break;
- case CLOSE:
- o = Optional.empty();
- break;
- case ERROR:
- d.fail(event.getFailure());
- return ABORT;
- }
- if(!d.getPromise().isDone())
- d.resolve(o);
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
- @Override
- public Promise<Optional<T>> findAny() {
- return findFirst();
- }
-
- @Override
- public Promise<Long> forEachEvent(PushEventConsumer< ? super T> action) {
- Deferred<Long> d = new Deferred<>();
- LongAdder la = new LongAdder();
- updateNext((event) -> {
- try {
- switch(event.getType()) {
- case DATA:
- long value = action.accept(event);
- la.add(value);
- return value;
- case CLOSE:
- try {
- action.accept(event);
- } finally {
- d.resolve(Long.valueOf(la.sum()));
- }
- break;
- case ERROR:
- try {
- action.accept(event);
- } finally {
- d.fail(event.getFailure());
- }
- break;
- }
- return ABORT;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- });
- begin();
- return d.getPromise();
- }
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
deleted file mode 100644
index 2aa6ec763..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferBuilder.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-
-/**
- * Create a buffered section of a Push-based stream
- *
- * @param <R> The type of object being built
- * @param <T> The type of objects in the {@link PushEvent}
- * @param <U> The type of the Queue used in the user specified buffer
- */
-public interface BufferBuilder<R, T, U extends BlockingQueue<PushEvent<? extends T>>> {
-
- /**
- * The BlockingQueue implementation to use as a buffer
- *
- * @param queue
- * @return this builder
- */
- BufferBuilder<R, T, U> withBuffer(U queue);
-
- /**
- * Set the {@link QueuePolicy} of this Builder
- *
- * @param queuePolicy
- * @return this builder
- */
- BufferBuilder<R,T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
-
- /**
- * Set the {@link QueuePolicy} of this Builder
- *
- * @param queuePolicyOption
- * @return this builder
- */
- BufferBuilder<R, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
-
- /**
- * Set the {@link PushbackPolicy} of this builder
- *
- * @param pushbackPolicy
- * @return this builder
- */
- BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy);
-
- /**
- * Set the {@link PushbackPolicy} of this builder
- *
- * @param pushbackPolicyOption
- * @param time
- * @return this builder
- */
- BufferBuilder<R, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long time);
-
- /**
- * Set the maximum permitted number of concurrent event deliveries allowed
- * from this buffer
- *
- * @param parallelism
- * @return this builder
- */
- BufferBuilder<R, T, U> withParallelism(int parallelism);
-
- /**
- * Set the {@link Executor} that should be used to deliver events from this
- * buffer
- *
- * @param executor
- * @return this builder
- */
- BufferBuilder<R, T, U> withExecutor(Executor executor);
-
- /**
- * @return the object being built
- */
- R create();
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
deleted file mode 100644
index 7cedafb5c..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/BufferedPushStreamImpl.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.osgi.util.pushstream;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
-import static org.osgi.util.pushstream.PushEventConsumer.ABORT;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-
-class BufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
- extends UnbufferedPushStreamImpl<T,U> implements PushStream<T> {
-
- private final U eventQueue;
-
- private final Semaphore semaphore;
-
- private final Executor worker;
-
- private final QueuePolicy<T, U> queuePolicy;
-
- private final PushbackPolicy<T, U> pushbackPolicy;
-
- /**
- * Indicates that a terminal event has been received, that we should stop
- * collecting new events, and that we must drain the buffer before
- * continuing
- */
- private final AtomicBoolean softClose = new AtomicBoolean();
-
- private final int parallelism;
-
- BufferedPushStreamImpl(PushStreamProvider psp,
- ScheduledExecutorService scheduler, U eventQueue,
- int parallelism, Executor worker, QueuePolicy<T,U> queuePolicy,
- PushbackPolicy<T,U> pushbackPolicy,
- Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, worker, scheduler, connector);
- this.eventQueue = eventQueue;
- this.parallelism = parallelism;
- this.semaphore = new Semaphore(parallelism);
- this.worker = worker;
- this.queuePolicy = queuePolicy;
- this.pushbackPolicy = pushbackPolicy;
- }
-
- @Override
- protected long handleEvent(PushEvent< ? extends T> event) {
-
- // If we have already been soft closed, or hard closed then abort
- if (!softClose.compareAndSet(false, event.isTerminal())
- || closed.get() == CLOSED) {
- return ABORT;
- }
-
- try {
- queuePolicy.doOffer(eventQueue, event);
- long backPressure = pushbackPolicy.pushback(eventQueue);
- if(backPressure < 0) {
- close();
- return ABORT;
- }
- if(semaphore.tryAcquire()) {
- startWorker();
- }
- return backPressure;
- } catch (Exception e) {
- close(PushEvent.error(e));
- return ABORT;
- }
- }
-
- private void startWorker() {
- worker.execute(() -> {
- try {
- PushEvent< ? extends T> event;
- while ((event = eventQueue.poll()) != null) {
- if (event.isTerminal()) {
- // Wait for the other threads to finish
- semaphore.acquire(parallelism - 1);
- }
-
- long backpressure = super.handleEvent(event);
- if(backpressure < 0) {
- close();
- return;
- } else if(backpressure > 0) {
- scheduler.schedule(this::startWorker, backpressure,
- MILLISECONDS);
- return;
- }
- }
-
- semaphore.release();
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- if(eventQueue.peek() != null && semaphore.tryAcquire()) {
- try {
- startWorker();
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- }
- });
-
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
deleted file mode 100644
index 3a4da2fd9..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/IntermediatePushStreamImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.osgi.util.pushstream;
-
-import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-
-class IntermediatePushStreamImpl<T> extends AbstractPushStreamImpl<T>
- implements PushStream<T> {
-
- private final AbstractPushStreamImpl< ? > previous;
-
- IntermediatePushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler,
- AbstractPushStreamImpl< ? > previous) {
- super(psp, executor, scheduler);
- this.previous = previous;
- }
-
- @Override
- protected boolean begin() {
- if(closed.compareAndSet(BUILDING, STARTED)) {
- beginning();
- previous.begin();
- return true;
- }
- return false;
- }
-
- protected void beginning() {
- // The base implementation has nothing to do, but
- // this method is used in windowing
- }
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
deleted file mode 100644
index 028f0a392..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEvent.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import static org.osgi.util.pushstream.PushEvent.EventType.*;
-
-/**
- * A PushEvent is an immutable object that is transferred through a
- * communication channel to push information to a downstream consumer. The event
- * has three different types:
- * <ul>
- * <li>{@link EventType#DATA} – Provides access to a typed data element in the
- * stream.
- * <li>{@link EventType#CLOSE} – The stream is closed. After receiving this
- * event, no more events will follow.
- * <li>{@link EventType#ERROR} – The stream ran into an unrecoverable problem
- * and is sending the reason downstream. The stream is closed and no more events
- * will follow after this event.
- * </ul>
- *
- * @param <T> The payload type of the event.
- * @Immutable
- */
-public abstract class PushEvent<T> {
-
- /**
- * The type of a {@link PushEvent}.
- */
- public static enum EventType {
- /**
- * A data event forming part of the stream
- */
- DATA,
- /**
- * An error event that indicates streaming has failed and that no more
- * events will arrive
- */
- ERROR,
- /**
- * An event that indicates that the stream has terminated normally
- */
- CLOSE
- }
-
- /**
- * Package private default constructor.
- */
- PushEvent() {}
-
- /**
- * Get the type of this event.
- *
- * @return The type of this event.
- */
- public abstract EventType getType();
-
- /**
- * Return the data for this event.
- *
- * @return The data payload.
- * @throws IllegalStateException if this event is not a
- * {@link EventType#DATA} event.
- */
- public T getData() throws IllegalStateException {
- throw new IllegalStateException(
- "Not a DATA event, the event type is " + getType());
- }
-
- /**
- * Return the error that terminated the stream.
- *
- * @return The error that terminated the stream.
- * @throws IllegalStateException if this event is not an
- * {@link EventType#ERROR} event.
- */
- public Exception getFailure() throws IllegalStateException {
- throw new IllegalStateException(
- "Not an ERROR event, the event type is " + getType());
- }
-
- /**
- * Answer if no more events will follow after this event.
- *
- * @return {@code false} if this is a data event, otherwise {@code true}.
- */
- public boolean isTerminal() {
- return true;
- }
-
- /**
- * Create a new data event.
- *
- * @param <T> The payload type.
- * @param payload The payload.
- * @return A new data event wrapping the specified payload.
- */
- public static <T> PushEvent<T> data(T payload) {
- return new DataEvent<T>(payload);
- }
-
- /**
- * Create a new error event.
- *
- * @param <T> The payload type.
- * @param e The error.
- * @return A new error event with the specified error.
- */
- public static <T> PushEvent<T> error(Exception e) {
- return new ErrorEvent<T>(e);
- }
-
- /**
- * Create a new close event.
- *
- * @param <T> The payload type.
- * @return A new close event.
- */
- public static <T> PushEvent<T> close() {
- return new CloseEvent<T>();
- }
-
- /**
- * Convenience to cast a close/error event to another payload type. Since
- * the payload type is not needed for these events this is harmless. This
- * therefore allows you to forward the close/error event downstream without
- * creating anew event.
- *
- * @param <X> The new payload type.
- * @return The current error or close event mapped to a new payload type.
- * @throws IllegalStateException if the event is a {@link EventType#DATA}
- * event.
- */
- public <X> PushEvent<X> nodata() throws IllegalStateException {
- @SuppressWarnings("unchecked")
- PushEvent<X> result = (PushEvent<X>) this;
- return result;
- }
-
- static final class DataEvent<T> extends PushEvent<T> {
- private final T data;
-
- DataEvent(T data) {
- this.data = data;
- }
-
- @Override
- public T getData() throws IllegalStateException {
- return data;
- }
-
- @Override
- public EventType getType() {
- return DATA;
- }
-
- @Override
- public boolean isTerminal() {
- return false;
- }
-
- @Override
- public <X> PushEvent<X> nodata() throws IllegalStateException {
- throw new IllegalStateException("This event is a DATA event");
- }
- }
-
- static final class ErrorEvent<T> extends PushEvent<T> {
- private final Exception error;
-
- ErrorEvent(Exception error) {
- this.error = error;
- }
-
- @Override
- public Exception getFailure() {
- return error;
- }
-
- @Override
- public EventType getType() {
- return ERROR;
- }
- }
-
- static final class CloseEvent<T> extends PushEvent<T> {
- @Override
- public EventType getType() {
- return CLOSE;
- }
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventConsumer.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventConsumer.java
deleted file mode 100644
index 43de152ae..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventConsumer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import org.osgi.annotation.versioning.ConsumerType;
-
-/**
- * An Async Event Consumer asynchronously receives Data events until it receives
- * either a Close or Error event.
- *
- * @param <T>
- * The type for the event payload
- */
-@ConsumerType
-@FunctionalInterface
-public interface PushEventConsumer<T> {
-
- /**
- * If ABORT is used as return value, the sender should close the channel all
- * the way to the upstream source. The ABORT will not guarantee that no
- * more events are delivered since this is impossible in a concurrent
- * environment. The consumer should accept subsequent events and close/clean
- * up when the Close or Error event is received.
- *
- * Though ABORT has the value -1, any value less than 0 will act as an
- * abort.
- */
- long ABORT = -1;
-
- /**
- * A 0 indicates that the consumer is willing to receive subsequent events
- * at full speeds.
- *
- * Any value more than 0 will indicate that the consumer is becoming
- * overloaded and wants a delay of the given milliseconds before the next
- * event is sent. This allows the consumer to pushback the event delivery
- * speed.
- */
- long CONTINUE = 0;
-
- /**
- * Accept an event from a source. Events can be delivered on multiple
- * threads simultaneously. However, Close and Error events are the last
- * events received, no more events must be sent after them.
- *
- * @param event The event
- * @return less than 0 means abort, 0 means continue, more than 0 means
- * delay ms
- * @throws Exception to indicate that an error has occured and that no
- * further events should be delivered to this
- * {@link PushEventConsumer}
- */
- long accept(PushEvent<? extends T> event) throws Exception;
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventSource.java
deleted file mode 100644
index d43399d77..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushEventSource.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import org.osgi.annotation.versioning.ConsumerType;
-
-/**
- * An event source. An event source can open a channel between a source and a
- * consumer. Once the channel is opened (even before it returns) the source can
- * send events to the consumer.
- *
- * A source should stop sending and automatically close the channel when sending
- * an event returns a negative value, see {@link PushEventConsumer#ABORT}.
- * Values that are larger than 0 should be treated as a request to delay the
- * next events with those number of milliseconds.
- *
- * @param <T>
- * The payload type
- */
-@ConsumerType
-@FunctionalInterface
-public interface PushEventSource<T> {
-
- /**
- * Open the asynchronous channel between the source and the consumer. The
- * call returns an {@link AutoCloseable}. This can be closed, and should
- * close the channel, including sending a Close event if the channel was not
- * already closed. The returned object must be able to be closed multiple
- * times without sending more than one Close events.
- *
- * @param aec the consumer (not null)
- * @return a {@link AutoCloseable} that can be used to close the stream
- * @throws Exception
- */
- AutoCloseable open(PushEventConsumer< ? super T> aec) throws Exception;
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
deleted file mode 100644
index c26bc8c4d..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStream.java
+++ /dev/null
@@ -1,609 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.function.BiFunction;
-import java.util.function.BinaryOperator;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.function.IntSupplier;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-import java.util.stream.Collector;
-
-import org.osgi.annotation.versioning.ProviderType;
-import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.TimeoutException;
-
-/**
- * A Push Stream fulfills the same role as the Java 8 stream but it reverses the
- * control direction. The Java 8 stream is pull based and this is push based. A
- * Push Stream makes it possible to build a pipeline of transformations using a
- * builder kind of model. Just like streams, it provides a number of terminating
- * methods that will actually open the channel and perform the processing until
- * the channel is closed (The source sends a Close event). The results of the
- * processing will be send to a Promise, just like any error events. A stream
- * can be used multiple times. The Push Stream represents a pipeline. Upstream
- * is in the direction of the source, downstream is in the direction of the
- * terminating method. Events are sent downstream asynchronously with no
- * guarantee for ordering or concurrency. Methods are available to provide
- * serialization of the events and splitting in background threads.
- *
- * @param <T> The Payload type
- */
-@ProviderType
-public interface PushStream<T> extends AutoCloseable {
-
- /**
- * Must be run after the channel is closed. This handler will run after the
- * downstream methods have processed the close event and before the upstream
- * methods have closed.
- *
- * @param closeHandler Will be called on close
- * @return This stream
- */
- PushStream<T> onClose(Runnable closeHandler);
-
- /**
- * Must be run after the channel is closed. This handler will run after the
- * downstream methods have processed the close event and before the upstream
- * methods have closed.
- *
- * @param closeHandler Will be called on close
- * @return This stream
- */
- PushStream<T> onError(Consumer< ? super Throwable> closeHandler);
-
- /**
- * Only pass events downstream when the predicate tests true.
- *
- * @param predicate The predicate that is tested (not null)
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> filter(Predicate< ? super T> predicate);
-
- /**
- * Map a payload value.
- *
- * @param mapper The map function
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> map(Function< ? super T, ? extends R> mapper);
-
- /**
- * Flat map the payload value (turn one event into 0..n events of
- * potentially another type).
- *
- * @param mapper The flat map function
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> flatMap(
- Function< ? super T, ? extends PushStream< ? extends R>> mapper);
-
- /**
- * Remove any duplicates. Notice that this can be expensive in a large
- * stream since it must track previous payloads.
- *
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> distinct();
-
- /**
- * Sorted the elements, assuming that T extends Comparable. This is of
- * course expensive for large or infinite streams since it requires
- * buffering the stream until close.
- *
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> sorted();
-
- /**
- * Sorted the elements with the given comparator. This is of course
- * expensive for large or infinite streams since it requires buffering the
- * stream until close.
- *
- * @param comparator
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> sorted(Comparator< ? super T> comparator);
-
- /**
- * Automatically close the channel after the maxSize number of elements is
- * received.
- *
- * @param maxSize Maximum number of elements has been received
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> limit(long maxSize);
-
- /**
- * Automatically close the channel after the given amount of time has
- * elapsed.
- *
- * @param maxTime The maximum time that the stream should remain open
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> limit(Duration maxTime);
-
- /**
- * Automatically fail the channel if no events are received for the
- * indicated length of time. If the timeout is reached then a failure event
- * containing a {@link TimeoutException} will be sent.
- *
- * @param idleTime The length of time that the stream should remain open
- * when no events are being received.
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> timeout(Duration idleTime);
-
- /**
- * Skip a number of events in the channel.
- *
- * @param n number of elements to skip
- * @throws IllegalArgumentException if the number of events to skip is
- * negative
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> skip(long n);
-
- /**
- * Execute the downstream events in up to n background threads. If more
- * requests are outstanding apply delay * nr of delayed threads back
- * pressure. A downstream channel that is closed or throws an exception will
- * cause all execution to cease and the stream to close
- *
- * @param n number of simultaneous background threads to use
- * @param delay Nr of ms/thread that is queued back pressure
- * @param e an executor to use for the background threads.
- * @return Builder style (can be a new or the same object)
- * @throws IllegalArgumentException if the number of threads is < 1 or the
- * delay is < 0
- * @throws NullPointerException if the Executor is null
- */
- PushStream<T> fork(int n, int delay, Executor e)
- throws IllegalArgumentException, NullPointerException;
-
- /**
- * Buffer the events in a queue using default values for the queue size and
- * other behaviours. Buffered work will be processed asynchronously in the
- * rest of the chain. Buffering also blocks the transmission of back
- * pressure to previous elements in the chain, although back pressure is
- * honoured by the buffer.
- * <p>
- * Buffers are useful for "bursty" event sources which produce a number of
- * events close together, then none for some time. These bursts can
- * sometimes overwhelm downstream event consumers. Buffering will not,
- * however, protect downstream components from a source which produces
- * events faster than they can be consumed. For fast sources
- * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
- * {@link #fork(int, int, Executor)} are better choices.
- *
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> buffer();
-
- /**
- * Build a buffer to enqueue events in a queue using custom values for the
- * queue size and other behaviours. Buffered work will be processed
- * asynchronously in the rest of the chain. Buffering also blocks the
- * transmission of back pressure to previous elements in the chain, although
- * back pressure is honoured by the buffer.
- * <p>
- * Buffers are useful for "bursty" event sources which produce a number of
- * events close together, then none for some time. These bursts can
- * sometimes overwhelm downstream event consumers. Buffering will not,
- * however, protect downstream components from a source which produces
- * events faster than they can be consumed. For fast sources
- * {@link #filter(Predicate)} and {@link #coalesce(int, Function)}
- * {@link #fork(int, int, Executor)} are better choices.
- * <p>
- * Buffers are also useful as "circuit breakers" in the pipeline. If a
- * {@link QueuePolicyOption#FAIL} is used then a full buffer will trigger
- * the stream to close, preventing an event storm from reaching the client.
- *
- * @param parallelism
- * @param executor
- * @param queue
- * @param queuePolicy
- * @param pushbackPolicy
- * @return Builder style (can be a new or the same object)
- */
- <U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildBuffer();
-
- /**
- * Merge in the events from another source. The resulting channel is not
- * closed until this channel and the channel from the source are closed.
- *
- * @param source The source to merge in.
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> merge(PushEventSource< ? extends T> source);
-
- /**
- * Merge in the events from another PushStream. The resulting channel is not
- * closed until this channel and the channel from the source are closed.
- *
- * @param source The source to merge in.
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> merge(PushStream< ? extends T> source);
-
- /**
- * Split the events to different streams based on a predicate. If the
- * predicate is true, the event is dispatched to that channel on the same
- * position. All predicates are tested for every event.
- * <p>
- * This method differs from other methods of AsyncStream in three
- * significant ways:
- * <ul>
- * <li>The return value contains multiple streams.</li>
- * <li>This stream will only close when all of these child streams have
- * closed.</li>
- * <li>Event delivery is made to all open children that accept the event.
- * </li>
- * </ul>
- *
- * @param predicates the predicates to test
- * @return streams that map to the predicates
- */
- @SuppressWarnings("unchecked")
- PushStream<T>[] split(Predicate< ? super T>... predicates);
-
- /**
- * Ensure that any events are delivered sequentially. That is, no
- * overlapping calls downstream. This can be used to turn a forked stream
- * (where for example a heavy conversion is done in multiple threads) back
- * into a sequential stream so a reduce is simple to do.
- *
- * @return Builder style (can be a new or the same object)
- */
- PushStream<T> sequential();
-
- /**
- * Coalesces a number of events into a new type of event. The input events
- * are forwarded to a accumulator function. This function returns an
- * Optional. If the optional is present, it's value is send downstream,
- * otherwise it is ignored.
- *
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> coalesce(Function< ? super T,Optional<R>> f);
-
- /**
- * Coalesces a number of events into a new type of event. A fixed number of
- * input events are forwarded to a accumulator function. This function
- * returns new event data to be forwarded on.
- *
- * @param count
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- public <R> PushStream<R> coalesce(int count, Function<Collection<T>,R> f);
-
- /**
- * Coalesces a number of events into a new type of event. A variable number
- * of input events are forwarded to a accumulator function. The number of
- * events to be forwarded is determined by calling the count function. The
- * accumulator function then returns new event data to be forwarded on.
- *
- * @param count
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- public <R> PushStream<R> coalesce(IntSupplier count,
- Function<Collection<T>,R> f);
-
- /**
- * Buffers a number of events over a fixed time interval and then forwards
- * the events to an accumulator function. This function returns new event
- * data to be forwarded on. Note that:
- * <ul>
- * <li>The collection forwarded to the accumulator function will be empty if
- * no events arrived during the time interval.</li>
- * <li>The accumulator function will be run and the forwarded event
- * delivered as a different task, (and therefore potentially on a different
- * thread) from the one that delivered the event to this {@link PushStream}.
- * </li>
- * <li>Due to the buffering and asynchronous delivery required, this method
- * prevents the propagation of back-pressure to earlier stages</li>
- * </ul>
- *
- * @param d
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> window(Duration d, Function<Collection<T>,R> f);
-
- /**
- * Buffers a number of events over a fixed time interval and then forwards
- * the events to an accumulator function. This function returns new event
- * data to be forwarded on. Note that:
- * <ul>
- * <li>The collection forwarded to the accumulator function will be empty if
- * no events arrived during the time interval.</li>
- * <li>The accumulator function will be run and the forwarded event
- * delivered by a task given to the supplied executor.</li>
- * <li>Due to the buffering and asynchronous delivery required, this method
- * prevents the propagation of back-pressure to earlier stages</li>
- * </ul>
- *
- * @param d
- * @param executor
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> window(Duration d, Executor executor,
- Function<Collection<T>,R> f);
-
- /**
- * Buffers a number of events over a variable time interval and then
- * forwards the events to an accumulator function. The length of time over
- * which events are buffered is determined by the time function. A maximum
- * number of events can also be requested, if this number of events is
- * reached then the accumulator will be called early. The accumulator
- * function returns new event data to be forwarded on. It is also given the
- * length of time for which the buffer accumulated data. This may be less
- * than the requested interval if the buffer reached the maximum number of
- * requested events early. Note that:
- * <ul>
- * <li>The collection forwarded to the accumulator function will be empty if
- * no events arrived during the time interval.</li>
- * <li>The accumulator function will be run and the forwarded event
- * delivered as a different task, (and therefore potentially on a different
- * thread) from the one that delivered the event to this {@link PushStream}.
- * </li>
- * <li>Due to the buffering and asynchronous delivery required, this method
- * prevents the propagation of back-pressure to earlier stages</li>
- * <li>If the window finishes by hitting the maximum number of events then
- * the remaining time in the window will be applied as back-pressure to the
- * previous stage, attempting to slow the producer to the expected windowing
- * threshold.</li>
- * </ul>
- *
- * @param timeSupplier
- * @param maxEvents
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> window(Supplier<Duration> timeSupplier,
- IntSupplier maxEvents, BiFunction<Long,Collection<T>,R> f);
-
- /**
- * Buffers a number of events over a variable time interval and then
- * forwards the events to an accumulator function. The length of time over
- * which events are buffered is determined by the time function. A maximum
- * number of events can also be requested, if this number of events is
- * reached then the accumulator will be called early. The accumulator
- * function returns new event data to be forwarded on. It is also given the
- * length of time for which the buffer accumulated data. This may be less
- * than the requested interval if the buffer reached the maximum number of
- * requested events early. Note that:
- * <ul>
- * <li>The collection forwarded to the accumulator function will be empty if
- * no events arrived during the time interval.</li>
- * <li>The accumulator function will be run and the forwarded event
- * delivered as a different task, (and therefore potentially on a different
- * thread) from the one that delivered the event to this {@link PushStream}.
- * </li>
- * <li>If the window finishes by hitting the maximum number of events then
- * the remaining time in the window will be applied as back-pressure to the
- * previous stage, attempting to slow the producer to the expected windowing
- * threshold.</li>
- * </ul>
- *
- * @param timeSupplier
- * @param maxEvents
- * @param executor
- * @param f
- * @return Builder style (can be a new or the same object)
- */
- <R> PushStream<R> window(Supplier<Duration> timeSupplier,
- IntSupplier maxEvents, Executor executor,
- BiFunction<Long,Collection<T>,R> f);
-
- /**
- * Execute the action for each event received until the channel is closed.
- * This is a terminating method, the returned promise is resolved when the
- * channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param action The action to perform
- * @return A promise that is resolved when the channel closes.
- */
- Promise<Void> forEach(Consumer< ? super T> action);
-
- /**
- * Collect the payloads in an Object array after the channel is closed. This
- * is a terminating method, the returned promise is resolved when the
- * channel is closed.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @return A promise that is resolved with all the payloads received over
- * the channel
- */
- Promise<Object[]> toArray();
-
- /**
- * Collect the payloads in an Object array after the channel is closed. This
- * is a terminating method, the returned promise is resolved when the
- * channel is closed. The type of the array is handled by the caller using a
- * generator function that gets the length of the desired array.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param generator
- * @return A promise that is resolved with all the payloads received over
- * the channel
- */
- <A extends T> Promise<A[]> toArray(IntFunction<A[]> generator);
-
- /**
- * Standard reduce, see Stream. The returned promise will be resolved when
- * the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param identity The identity/begin value
- * @param accumulator The accumulator
- * @return A
- */
- Promise<T> reduce(T identity, BinaryOperator<T> accumulator);
-
- /**
- * Standard reduce without identity, so the return is an Optional. The
- * returned promise will be resolved when the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param accumulator The accumulator
- * @return an Optional
- */
- Promise<Optional<T>> reduce(BinaryOperator<T> accumulator);
-
- /**
- * Standard reduce with identity, accumulator and combiner. The returned
- * promise will be resolved when the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param identity
- * @param accumulator
- * @param combiner combines to U's into one U (e.g. how combine two lists)
- * @return The promise
- */
- <U> Promise<U> reduce(U identity, BiFunction<U, ? super T,U> accumulator,
- BinaryOperator<U> combiner);
-
- /**
- * See Stream. Will resolve onces the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param collector
- * @return A Promise representing the collected results
- */
- <R, A> Promise<R> collect(Collector< ? super T,A,R> collector);
-
- /**
- * See Stream. Will resolve onces the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param comparator
- * @return A Promise representing the minimum value, or null if no values
- * are seen before the end of the stream
- */
- Promise<Optional<T>> min(Comparator< ? super T> comparator);
-
- /**
- * See Stream. Will resolve onces the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param comparator
- * @return A Promise representing the maximum value, or null if no values
- * are seen before the end of the stream
- */
- Promise<Optional<T>> max(Comparator< ? super T> comparator);
-
- /**
- * See Stream. Will resolve onces the channel closes.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @return A Promise representing the number of values in the stream
- */
- Promise<Long> count();
-
- /**
- * Close the channel and resolve the promise with true when the predicate
- * matches a payload. If the channel is closed before the predicate matches,
- * the promise is resolved with false.
- * <p>
- * This is a <strong>short circuiting terminal operation</strong>
- *
- * @param predicate
- * @return A Promise that will resolve when an event matches the predicate,
- * or the end of the stream is reached
- */
- Promise<Boolean> anyMatch(Predicate< ? super T> predicate);
-
- /**
- * Closes the channel and resolve the promise with false when the predicate
- * does not matches a pay load.If the channel is closed before, the promise
- * is resolved with true.
- * <p>
- * This is a <strong>short circuiting terminal operation</strong>
- *
- * @param predicate
- * @return A Promise that will resolve when an event fails to match the
- * predicate, or the end of the stream is reached
- */
- Promise<Boolean> allMatch(Predicate< ? super T> predicate);
-
- /**
- * Closes the channel and resolve the promise with false when the predicate
- * matches any pay load. If the channel is closed before, the promise is
- * resolved with true.
- * <p>
- * This is a <strong>short circuiting terminal operation</strong>
- *
- * @param predicate
- * @return A Promise that will resolve when an event matches the predicate,
- * or the end of the stream is reached
- */
- Promise<Boolean> noneMatch(Predicate< ? super T> predicate);
-
- /**
- * Close the channel and resolve the promise with the first element. If the
- * channel is closed before, the Optional will have no value.
- *
- * @return a promise
- */
- Promise<Optional<T>> findFirst();
-
- /**
- * Close the channel and resolve the promise with the first element. If the
- * channel is closed before, the Optional will have no value.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @return a promise
- */
- Promise<Optional<T>> findAny();
-
- /**
- * Pass on each event to another consumer until the stream is closed.
- * <p>
- * This is a <strong>terminal operation</strong>
- *
- * @param action
- * @return a promise
- */
- Promise<Long> forEachEvent(PushEventConsumer< ? super T> action);
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
deleted file mode 100644
index d59c8d9d3..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilder.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-
-/**
- * A Builder for a PushStream. This Builder extends the support of a standard
- * BufferBuilder by allowing the PushStream to be unbuffered.
- *
- *
- * @param <T> The type of objects in the {@link PushEvent}
- * @param <U> The type of the Queue used in the user specified buffer
- */
-public interface PushStreamBuilder<T, U extends BlockingQueue<PushEvent< ? extends T>>>
- extends BufferBuilder<PushStream<T>,T,U> {
-
- /**
- * Tells this {@link PushStreamBuilder} to create an unbuffered stream which
- * delivers events directly to its consumer using the incoming delivery
- * thread.
- *
- * @return the builder
- */
- PushStreamBuilder<T,U> unbuffered();
-
- /*
- * Overridden methods to allow the covariant return of a PushStreamBuilder
- */
-
- @Override
- PushStreamBuilder<T,U> withBuffer(U queue);
-
- @Override
- PushStreamBuilder<T,U> withQueuePolicy(QueuePolicy<T,U> queuePolicy);
-
- @Override
- PushStreamBuilder<T,U> withQueuePolicy(QueuePolicyOption queuePolicyOption);
-
- @Override
- PushStreamBuilder<T,U> withPushbackPolicy(
- PushbackPolicy<T,U> pushbackPolicy);
-
- @Override
- PushStreamBuilder<T,U> withPushbackPolicy(
- PushbackPolicyOption pushbackPolicyOption, long time);
-
- @Override
- PushStreamBuilder<T,U> withParallelism(int parallelism);
-
- @Override
- PushStreamBuilder<T,U> withExecutor(Executor executor);
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
deleted file mode 100644
index 5ec7cb336..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamBuilderImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-
-class PushStreamBuilderImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
- extends AbstractBufferBuilder<PushStream<T>,T,U>
- implements PushStreamBuilder<T,U> {
-
- private final PushStreamProvider psp;
- private final PushEventSource<T> eventSource;
- private final Executor previousExecutor;
-
- private boolean unbuffered;
-
- PushStreamBuilderImpl(PushStreamProvider psp, Executor defaultExecutor,
- PushEventSource<T> eventSource) {
- this.psp = psp;
- this.previousExecutor = defaultExecutor;
- this.eventSource = eventSource;
- this.worker = defaultExecutor;
- }
-
- @Override
- public PushStreamBuilder<T,U> withBuffer(U queue) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withBuffer(queue);
- }
-
- @Override
- public PushStreamBuilder<T,U> withQueuePolicy(
- QueuePolicy<T,U> queuePolicy) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withQueuePolicy(queuePolicy);
- }
-
- @Override
- public PushStreamBuilder<T,U> withQueuePolicy(
- QueuePolicyOption queuePolicyOption) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withQueuePolicy(
- queuePolicyOption);
- }
-
- @Override
- public PushStreamBuilder<T,U> withPushbackPolicy(
- PushbackPolicy<T,U> pushbackPolicy) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
- pushbackPolicy);
- }
-
- @Override
- public PushStreamBuilder<T,U> withPushbackPolicy(
- PushbackPolicyOption pushbackPolicyOption, long time) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withPushbackPolicy(
- pushbackPolicyOption, time);
- }
-
- @Override
- public PushStreamBuilder<T,U> withParallelism(int parallelism) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withParallelism(parallelism);
- }
-
- @Override
- public PushStreamBuilder<T,U> withExecutor(Executor executor) {
- unbuffered = false;
- return (PushStreamBuilder<T,U>) super.withExecutor(executor);
- }
-
- @Override
- public PushStreamBuilder<T,U> unbuffered() {
- unbuffered = true;
- return this;
- }
-
- @Override
- public PushStream<T> create() {
- if (unbuffered) {
- return psp.createUnbufferedStream(eventSource, previousExecutor);
- } else {
- return psp.createStream(eventSource, concurrency, worker, buffer,
- bufferingPolicy, backPressure);
- }
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
deleted file mode 100644
index be87c6bce..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushStreamProvider.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.CLOSED;
-import static org.osgi.util.pushstream.PushEvent.*;
-import static org.osgi.util.pushstream.PushbackPolicyOption.LINEAR;
-import static org.osgi.util.pushstream.QueuePolicyOption.FAIL;
-
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-/**
- * A factory for {@link PushStream} instances, and utility methods for handling
- * {@link PushEventSource}s and {@link PushEventConsumer}s
- */
-public final class PushStreamProvider {
-
- private final Lock lock = new ReentrantLock(true);
-
- private int schedulerReferences;
-
- private ScheduledExecutorService scheduler;
-
- private ScheduledExecutorService acquireScheduler() {
- try {
- lock.lockInterruptibly();
- try {
- schedulerReferences += 1;
-
- if (schedulerReferences == 1) {
- scheduler = Executors.newSingleThreadScheduledExecutor();
- }
- return scheduler;
- } finally {
- lock.unlock();
- }
- } catch (InterruptedException e) {
- throw new IllegalStateException("Unable to acquire the Scheduler",
- e);
- }
- }
-
- private void releaseScheduler() {
- try {
- lock.lockInterruptibly();
- try {
- schedulerReferences -= 1;
-
- if (schedulerReferences == 0) {
- scheduler.shutdown();
- scheduler = null;
- }
- } finally {
- lock.unlock();
- }
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- /**
- * Create a stream with the default configured buffer, executor size, queue,
- * queue policy and pushback policy. This is equivalent to calling
- *
- * <code>
- * buildStream(source).create();
- * </code>
- *
- * <p>
- * This stream will be buffered from the event producer, and will honour
- * back pressure even if the source does not.
- *
- * <p>
- * Buffered streams are useful for "bursty" event sources which produce a
- * number of events close together, then none for some time. These bursts
- * can sometimes overwhelm downstream processors. Buffering will not,
- * however, protect downstream components from a source which produces
- * events faster (on average) than they can be consumed.
- *
- * <p>
- * Event delivery will not begin until a terminal operation is reached on
- * the chain of AsyncStreams. Once a terminal operation is reached the
- * stream will be connected to the event source.
- *
- * @param eventSource
- * @return A {@link PushStream} with a default initial buffer
- */
- public <T> PushStream<T> createStream(PushEventSource<T> eventSource) {
- return createStream(eventSource, 1, null, new ArrayBlockingQueue<>(32),
- FAIL.getPolicy(), LINEAR.getPolicy(1000));
- }
-
- /**
- * Builds a push stream with custom configuration.
- *
- * <p>
- *
- * The resulting {@link PushStream} may be buffered or unbuffered depending
- * on how it is configured.
- *
- * @param eventSource The source of the events
- *
- * @return A {@link PushStreamBuilder} for the stream
- */
- public <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStreamBuilder<T,U> buildStream(
- PushEventSource<T> eventSource) {
- return new PushStreamBuilderImpl<T,U>(this, null, eventSource);
- }
-
- @SuppressWarnings({
- "rawtypes", "unchecked"
- })
- <T, U extends BlockingQueue<PushEvent< ? extends T>>> PushStream<T> createStream(
- PushEventSource<T> eventSource, int parallelism, Executor executor,
- U queue, QueuePolicy<T,U> queuePolicy,
- PushbackPolicy<T,U> pushbackPolicy) {
-
- if (eventSource == null) {
- throw new NullPointerException("There is no source of events");
- }
-
- if (parallelism < 0) {
- throw new IllegalArgumentException(
- "The supplied parallelism cannot be less than zero. It was "
- + parallelism);
- } else if (parallelism == 0) {
- parallelism = 1;
- }
-
- boolean closeExecutorOnClose;
- Executor toUse;
- if (executor == null) {
- toUse = Executors.newFixedThreadPool(parallelism);
- closeExecutorOnClose = true;
- } else {
- toUse = executor;
- closeExecutorOnClose = false;
- }
-
- if (queue == null) {
- queue = (U) new ArrayBlockingQueue(32);
- }
-
- if (queuePolicy == null) {
- queuePolicy = FAIL.getPolicy();
- }
-
- if (pushbackPolicy == null) {
- pushbackPolicy = LINEAR.getPolicy(1000);
- }
-
- @SuppressWarnings("resource")
- PushStream<T> stream = new BufferedPushStreamImpl<>(this,
- acquireScheduler(), queue, parallelism, toUse, queuePolicy,
- pushbackPolicy, aec -> {
- try {
- return eventSource.open(aec);
- } catch (Exception e) {
- throw new RuntimeException(
- "Unable to connect to event source", e);
- }
- });
-
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(Function.identity());
- return stream;
- }
-
- <T> PushStream<T> createUnbufferedStream(PushEventSource<T> eventSource,
- Executor executor) {
-
- boolean closeExecutorOnClose;
- Executor toUse;
- if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
- closeExecutorOnClose = true;
- } else {
- toUse = executor;
- closeExecutorOnClose = false;
- }
-
- @SuppressWarnings("resource")
- PushStream<T> stream = new UnbufferedPushStreamImpl<>(this, toUse,
- acquireScheduler(), aec -> {
- try {
- return eventSource.open(aec);
- } catch (Exception e) {
- throw new RuntimeException(
- "Unable to connect to event source", e);
- }
- });
-
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(Function.identity());
-
- return stream;
- }
-
- /**
- * Convert an {@link PushStream} into an {@link PushEventSource}. The first
- * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
- * processing.
- *
- * The {@link PushEventSource} will remain active until the backing stream
- * is closed, and permits multiple consumers to
- * {@link PushEventSource#open(PushEventConsumer)} it.
- *
- * This is equivalent to: <code>
- * buildEventSourceFromStream(stream).create();
- * </code>
- *
- * @param stream
- * @return a {@link PushEventSource} backed by the {@link PushStream}
- */
- public <T> PushEventSource<T> createEventSourceFromStream(
- PushStream<T> stream) {
- return buildEventSourceFromStream(stream).create();
- }
-
- /**
- * Convert an {@link PushStream} into an {@link PushEventSource}. The first
- * call to {@link PushEventSource#open(PushEventConsumer)} will begin event
- * processing.
- *
- * The {@link PushEventSource} will remain active until the backing stream
- * is closed, and permits multiple consumers to
- * {@link PushEventSource#open(PushEventConsumer)} it.
- *
- * @param stream
- *
- * @return a {@link PushEventSource} backed by the {@link PushStream}
- */
- public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventSource<T>,T,U> buildEventSourceFromStream(
- PushStream<T> stream) {
- return new AbstractBufferBuilder<PushEventSource<T>,T,U>() {
- @Override
- public PushEventSource<T> create() {
- SimplePushEventSource<T> spes = createSimplePushEventSource(
- concurrency, worker, buffer, bufferingPolicy, () -> {
- try {
- stream.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- });
- spes.connectPromise()
- .then(p -> stream.forEach(t -> spes.publish(t))
- .onResolve(() -> spes.close()));
- return spes;
- }
- };
- }
-
-
- /**
- * Create a {@link SimplePushEventSource} with the supplied type and default
- * buffering behaviours. The SimplePushEventSource will respond to back
- * pressure requests from the consumers connected to it.
- *
- * This is equivalent to: <code>
- * buildSimpleEventSource(type).create();
- * </code>
- *
- * @param type
- * @return a {@link SimplePushEventSource}
- */
- public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> type) {
- return createSimplePushEventSource(1, null,
- new ArrayBlockingQueue<>(32),
- FAIL.getPolicy(), () -> { /* Nothing else to do */ });
- }
-
- /**
- *
- * Build a {@link SimplePushEventSource} with the supplied type and custom
- * buffering behaviours. The SimplePushEventSource will respond to back
- * pressure requests from the consumers connected to it.
- *
- * @param type
- *
- * @return a {@link SimplePushEventSource}
- */
-
- public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<SimplePushEventSource<T>,T,U> buildSimpleEventSource(
- Class<T> type) {
- return new AbstractBufferBuilder<SimplePushEventSource<T>,T,U>() {
- @Override
- public SimplePushEventSource<T> create() {
- return createSimplePushEventSource(concurrency, worker, buffer,
- bufferingPolicy, () -> { /* Nothing else to do */ });
- }
- };
- }
-
- @SuppressWarnings({
- "unchecked", "rawtypes"
- })
- <T, U extends BlockingQueue<PushEvent< ? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(
- int parallelism, Executor executor, U queue,
- QueuePolicy<T,U> queuePolicy, Runnable onClose) {
-
- if (parallelism < 0) {
- throw new IllegalArgumentException(
- "The supplied parallelism cannot be less than zero. It was "
- + parallelism);
- } else if (parallelism == 0) {
- parallelism = 1;
- }
-
- boolean closeExecutorOnClose;
- Executor toUse;
- if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
- closeExecutorOnClose = true;
- } else {
- toUse = executor;
- closeExecutorOnClose = false;
- }
-
- if (queue == null) {
- queue = (U) new ArrayBlockingQueue(32);
- }
-
- if (queuePolicy == null) {
- queuePolicy = FAIL.getPolicy();
- }
-
- SimplePushEventSourceImpl<T,U> spes = new SimplePushEventSourceImpl<T,U>(
- toUse, acquireScheduler(), queuePolicy, queue, parallelism,
- () -> {
- try {
- onClose.run();
- } catch (Exception e) {
- // TODO log this?
- }
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- });
- return spes;
- }
-
- /**
- * Create a buffered {@link PushEventConsumer} with the default configured
- * buffer, executor size, queue, queue policy and pushback policy. This is
- * equivalent to calling
- *
- * <code>
- * buildBufferedConsumer(delegate).create();
- * </code>
- *
- * <p>
- * The returned consumer will be buffered from the event source, and will
- * honour back pressure requests from its delegate even if the event source
- * does not.
- *
- * <p>
- * Buffered consumers are useful for "bursty" event sources which produce a
- * number of events close together, then none for some time. These bursts
- * can sometimes overwhelm the consumer. Buffering will not, however,
- * protect downstream components from a source which produces events faster
- * than they can be consumed.
- *
- * @param delegate
- * @return a {@link PushEventConsumer} with a buffer directly before it
- */
- public <T> PushEventConsumer<T> createBufferedConsumer(
- PushEventConsumer<T> delegate) {
- return buildBufferedConsumer(delegate).create();
- }
-
- /**
- * Build a buffered {@link PushEventConsumer} with custom configuration.
- * <p>
- * The returned consumer will be buffered from the event source, and will
- * honour back pressure requests from its delegate even if the event source
- * does not.
- * <p>
- * Buffered consumers are useful for "bursty" event sources which produce a
- * number of events close together, then none for some time. These bursts
- * can sometimes overwhelm the consumer. Buffering will not, however,
- * protect downstream components from a source which produces events faster
- * than they can be consumed.
- * <p>
- * Buffers are also useful as "circuit breakers". If a
- * {@link QueuePolicyOption#FAIL} is used then a full buffer will request
- * that the stream close, preventing an event storm from reaching the
- * client.
- * <p>
- * Note that this buffered consumer will close when it receives a terminal
- * event, or if the delegate returns negative backpressure. No further
- * events will be propagated after this time.
- *
- * @param delegate
- * @return a {@link PushEventConsumer} with a buffer directly before it
- */
- public <T, U extends BlockingQueue<PushEvent< ? extends T>>> BufferBuilder<PushEventConsumer<T>,T,U> buildBufferedConsumer(
- PushEventConsumer<T> delegate) {
- return new AbstractBufferBuilder<PushEventConsumer<T>,T,U>() {
- @Override
- public PushEventConsumer<T> create() {
- PushEventPipe<T> pipe = new PushEventPipe<>();
-
- createStream(pipe, concurrency, worker, buffer, bufferingPolicy, backPressure)
- .forEachEvent(delegate);
-
- return pipe;
- }
- };
- }
-
- static final class PushEventPipe<T>
- implements PushEventConsumer<T>, PushEventSource<T> {
-
- volatile PushEventConsumer< ? super T> delegate;
-
- @Override
- public AutoCloseable open(PushEventConsumer< ? super T> pec)
- throws Exception {
- return () -> { /* Nothing else to do */ };
- }
-
- @Override
- public long accept(PushEvent< ? extends T> event) throws Exception {
- return delegate.accept(event);
- }
-
- }
-
- /**
- * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
- * data from the stream will be pushed into the PushStream synchronously as
- * it is opened. This may make terminal operations blocking unless a buffer
- * has been added to the {@link PushStream}. Care should be taken with
- * infinite {@link Stream}s to avoid blocking indefinitely.
- *
- * @param items The items to push into the PushStream
- * @return A PushStream containing the items from the Java Stream
- */
- public <T> PushStream<T> streamOf(Stream<T> items) {
- PushEventSource<T> pes = aec -> {
- AtomicBoolean closed = new AtomicBoolean(false);
-
- items.mapToLong(i -> {
- try {
- long returnValue = closed.get() ? -1 : aec.accept(data(i));
- if (returnValue < 0) {
- aec.accept(PushEvent.<T> close());
- }
- return returnValue;
- } catch (Exception e) {
- try {
- aec.accept(PushEvent.<T> error(e));
- } catch (Exception e2) {/* No further events needed */}
- return -1;
- }
- }).filter(i -> i < 0).findFirst().orElseGet(() -> {
- try {
- return aec.accept(PushEvent.<T> close());
- } catch (Exception e) {
- return -1;
- }
- });
-
- return () -> closed.set(true);
- };
-
- return this.<T> createUnbufferedStream(pes, null);
- }
-
- /**
- * Create an Unbuffered {@link PushStream} from a Java {@link Stream} The
- * data from the stream will be pushed into the PushStream asynchronously
- * using the supplied Executor.
- *
- * @param executor The worker to use to push items from the Stream into the
- * PushStream
- * @param items The items to push into the PushStream
- * @return A PushStream containing the items from the Java Stream
- */
- public <T> PushStream<T> streamOf(Executor executor, Stream<T> items) {
-
- boolean closeExecutorOnClose;
- Executor toUse;
- if (executor == null) {
- toUse = Executors.newFixedThreadPool(2);
- closeExecutorOnClose = true;
- } else {
- toUse = executor;
- closeExecutorOnClose = false;
- }
-
- @SuppressWarnings("resource")
- PushStream<T> stream = new UnbufferedPushStreamImpl<T,BlockingQueue<PushEvent< ? extends T>>>(
- this, toUse, acquireScheduler(), aec -> {
- return () -> { /* No action to take */ };
- }) {
-
- @Override
- protected boolean begin() {
- if (super.begin()) {
- Iterator<T> it = items.iterator();
-
- toUse.execute(() -> pushData(it));
-
- return true;
- }
- return false;
- }
-
- private void pushData(Iterator<T> it) {
- while (it.hasNext()) {
- try {
- long returnValue = closed.get() == CLOSED ? -1
- : handleEvent(data(it.next()));
- if (returnValue != 0) {
- if (returnValue < 0) {
- close();
- return;
- } else {
- scheduler.schedule(
- () -> toUse.execute(() -> pushData(it)),
- returnValue, MILLISECONDS);
- return;
- }
- }
- } catch (Exception e) {
- close(error(e));
- }
- }
- close();
- }
- };
-
- stream = stream.onClose(() -> {
- if (closeExecutorOnClose) {
- ((ExecutorService) toUse).shutdown();
- }
- releaseScheduler();
- }).map(Function.identity());
-
- return stream;
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicy.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicy.java
deleted file mode 100644
index 4f7f1864f..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicy.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import org.osgi.annotation.versioning.ConsumerType;
-
-/**
- * A {@link PushbackPolicy} is used to calculate how much back pressure to apply
- * based on the current buffer. The {@link PushbackPolicy} will be called after
- * an event has been queued, and the returned value will be used as back
- * pressure.
- *
- * @see PushbackPolicyOption
- *
- *
- * @param <T> The type of the data
- * @param <U> The type of the queue
- */
-@ConsumerType
-@FunctionalInterface
-public interface PushbackPolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
-
- /**
- * Given the current state of the queue, determine the level of back
- * pressure that should be applied
- *
- * @param queue
- * @return a back pressure value in nanoseconds
- * @throws Exception
- */
- public long pushback(U queue) throws Exception;
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicyOption.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicyOption.java
deleted file mode 100644
index ecd0e3ea3..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/PushbackPolicyOption.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * {@link PushbackPolicyOption} provides a standard set of simple
- * {@link PushbackPolicy} implementations.
- *
- * @see PushbackPolicy
- */
-public enum PushbackPolicyOption {
-
- /**
- * Returns a fixed amount of back pressure, independent of how full the
- * buffer is
- */
- FIXED {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
- return q -> value;
- }
- },
- /**
- * Returns zero back pressure until the buffer is full, then it returns a
- * fixed value
- */
- ON_FULL_FIXED {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
- return q -> q.remainingCapacity() == 0 ? value : 0;
- }
- },
- /**
- * Returns zero back pressure until the buffer is full, then it returns an
- * exponentially increasing amount, starting with the supplied value and
- * doubling it each time. Once the buffer is no longer full the back
- * pressure returns to zero.
- */
- ON_FULL_EXPONENTIAL {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
- AtomicInteger backoffCount = new AtomicInteger(0);
- return q -> {
- if (q.remainingCapacity() == 0) {
- return value << backoffCount.getAndIncrement();
- }
- backoffCount.set(0);
- return 0;
- };
-
- }
- },
- /**
- * Returns zero back pressure when the buffer is empty, then it returns a
- * linearly increasing amount of back pressure based on how full the buffer
- * is. The maximum value will be returned when the buffer is full.
- */
- LINEAR {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value) {
- return q -> {
- long remainingCapacity = q.remainingCapacity();
- long used = q.size();
- return (value * used) / (used + remainingCapacity);
- };
- }
- };
-
- /**
- * Create a {@link PushbackPolicy} instance configured with a base back
- * pressure time in nanoseconds
- *
- * The actual backpressure returned will vary based on the selected
- * implementation, the base value, and the state of the buffer.
- *
- * @param value
- * @return A {@link PushbackPolicy} to use
- */
- public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> PushbackPolicy<T, U> getPolicy(long value);
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicy.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicy.java
deleted file mode 100644
index cba94b16c..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-
-import org.osgi.annotation.versioning.ConsumerType;
-import org.osgi.util.pushstream.PushEvent.EventType;
-
-/**
- * A {@link QueuePolicy} is used to control how events should be queued in the
- * current buffer. The {@link QueuePolicy} will be called when an event has
- * arrived.
- *
- * @see QueuePolicyOption
- *
- *
- * @param <T> The type of the data
- * @param <U> The type of the queue
- */
-
-@ConsumerType
-@FunctionalInterface
-public interface QueuePolicy<T, U extends BlockingQueue<PushEvent<? extends T>>> {
-
- /**
- * Enqueue the event and return the remaining capacity available for events
- *
- * @param queue
- * @param event
- * @throws Exception If an error ocurred adding the event to the queue. This
- * exception will cause the connection between the
- * {@link PushEventSource} and the {@link PushEventConsumer} to be
- * closed with an {@link EventType#ERROR}
- */
- public void doOffer(U queue, PushEvent<? extends T> event) throws Exception;
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicyOption.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicyOption.java
deleted file mode 100644
index 35df890ee..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/QueuePolicyOption.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * {@link QueuePolicyOption} provides a standard set of simple
- * {@link QueuePolicy} implementations.
- *
- * @see QueuePolicy
- */
-public enum QueuePolicyOption {
- /**
- * Attempt to add the supplied event to the queue. If the queue is unable to
- * immediately accept the value then discard the value at the head of the
- * queue and try again. Repeat this process until the event is enqueued.
- */
- DISCARD_OLDEST {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
- return (queue, event) -> {
- while (!queue.offer(event)) {
- queue.poll();
- }
- };
- }
- },
- /**
- * Attempt to add the supplied event to the queue, blocking until the
- * enqueue is successful.
- */
- BLOCK {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
- return (queue, event) -> {
- try {
- queue.put(event);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- };
- }
- },
- /**
- * Attempt to add the supplied event to the queue, throwing an exception if
- * the queue is full.
- */
- FAIL {
- @Override
- public <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy() {
- return (queue, event) -> queue.add(event);
- }
- };
-
- /**
- * @return a {@link QueuePolicy} implementation
- */
- public abstract <T, U extends BlockingQueue<PushEvent<? extends T>>> QueuePolicy<T, U> getPolicy();
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
deleted file mode 100644
index 747b4530d..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSource.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.osgi.util.pushstream;
-
-import org.osgi.annotation.versioning.ProviderType;
-import org.osgi.util.promise.Promise;
-
-/**
- * A {@link SimplePushEventSource} is a helper that makes it simpler to write a
- * {@link PushEventSource}. Users do not need to manage multiple registrations
- * to the stream, nor do they have to be concerned with back pressure.
- *
- * @param <T> The type of the events produced by this source
- */
-@ProviderType
-public interface SimplePushEventSource<T>
- extends PushEventSource<T>, AutoCloseable {
- /**
- * Close this source. Calling this method indicates that there will never be
- * any more events published by it. Calling this method sends a close event
- * to all connected consumers. After calling this method any
- * {@link PushEventConsumer} that tries to {@link #open(PushEventConsumer)}
- * this source will immediately receive a close event.
- */
- @Override
- void close();
-
- /**
- * Asynchronously publish an event to this stream and all connected
- * {@link PushEventConsumer} instances. When this method returns there is no
- * guarantee that all consumers have been notified. Events published by a
- * single thread will maintain their relative ordering, however they may be
- * interleaved with events from other threads.
- *
- * @param t
- * @throws IllegalStateException if the source is closed
- */
- void publish(T t);
-
- /**
- * Close this source for now, but potentially reopen it later. Calling this
- * method asynchronously sends a close event to all connected consumers.
- * After calling this method any {@link PushEventConsumer} that wishes may
- * {@link #open(PushEventConsumer)} this source, and will receive subsequent
- * events.
- */
- void endOfStream();
-
- /**
- * Close this source for now, but potentially reopen it later. Calling this
- * method asynchronously sends an error event to all connected consumers.
- * After calling this method any {@link PushEventConsumer} that wishes may
- * {@link #open(PushEventConsumer)} this source, and will receive subsequent
- * events.
- *
- * @param e the error
- */
- void error(Exception e);
-
- /**
- * Determine whether there are any {@link PushEventConsumer}s for this
- * {@link PushEventSource}. This can be used to skip expensive event
- * creation logic when there are no listeners.
- *
- * @return true if any consumers are currently connected
- */
- boolean isConnected();
-
- /**
- * This method can be used to delay event generation until an event source
- * has connected. The returned promise will resolve as soon as one or more
- * {@link PushEventConsumer} instances have opened the
- * SimplePushEventSource.
- * <p>
- * The returned promise may already be resolved if this
- * {@link SimplePushEventSource} already has connected consumers. If the
- * {@link SimplePushEventSource} is closed before the returned Promise
- * resolves then it will be failed with an {@link IllegalStateException}.
- * <p>
- * Note that the connected consumers are able to asynchronously close their
- * connections to this {@link SimplePushEventSource}, and therefore it is
- * possible that once the promise resolves this
- * {@link SimplePushEventSource} may no longer be connected to any
- * consumers.
- *
- * @return A promise representing the connection state of this EventSource
- */
- Promise<Void> connectPromise();
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
deleted file mode 100644
index e31c9bf59..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/SimplePushEventSourceImpl.java
+++ /dev/null
@@ -1,337 +0,0 @@
-package org.osgi.util.pushstream;
-
-import static java.util.Collections.emptyList;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.stream.Collectors.toList;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-
-import org.osgi.util.promise.Deferred;
-import org.osgi.util.promise.Promise;
-import org.osgi.util.promise.Promises;
-
-class SimplePushEventSourceImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
- implements SimplePushEventSource<T> {
-
- private final Object lock = new Object();
-
- private final Executor worker;
-
- private final ScheduledExecutorService scheduler;
-
- private final QueuePolicy<T,U> queuePolicy;
-
- private final U queue;
-
- private final int parallelism;
-
- private final Semaphore semaphore;
-
- private final List<PushEventConsumer< ? super T>> connected = new ArrayList<>();
-
- private final Runnable onClose;
-
- private boolean closed;
-
- private Deferred<Void> connectPromise;
-
- private boolean waitForFinishes;
-
-
- public SimplePushEventSourceImpl(Executor worker,
- ScheduledExecutorService scheduler, QueuePolicy<T,U> queuePolicy,
- U queue, int parallelism, Runnable onClose) {
- this.worker = worker;
- this.scheduler = scheduler;
- this.queuePolicy = queuePolicy;
- this.queue = queue;
- this.parallelism = parallelism;
- this.semaphore = new Semaphore(parallelism);
- this.onClose = onClose;
- this.closed = false;
- this.connectPromise = null;
- }
-
- @Override
- public AutoCloseable open(PushEventConsumer< ? super T> pec)
- throws Exception {
- Deferred<Void> toResolve = null;
- synchronized (lock) {
- if (closed) {
- throw new IllegalStateException(
- "This PushEventConsumer is closed");
- }
-
- toResolve = connectPromise;
- connectPromise = null;
-
- connected.add(pec);
- }
-
- if (toResolve != null) {
- toResolve.resolve(null);
- }
-
- return () -> {
- closeConsumer(pec, PushEvent.close());
- };
- }
-
- private void closeConsumer(PushEventConsumer< ? super T> pec,
- PushEvent<T> event) {
- boolean sendClose;
- synchronized (lock) {
- sendClose = connected.remove(pec);
- }
- if (sendClose) {
- doSend(pec, event);
- }
- }
-
- private void doSend(PushEventConsumer< ? super T> pec, PushEvent<T> event) {
- try {
- worker.execute(() -> safePush(pec, event));
- } catch (RejectedExecutionException ree) {
- // TODO log?
- if (!event.isTerminal()) {
- close(PushEvent.error(ree));
- } else {
- safePush(pec, event);
- }
- }
- }
-
- @SuppressWarnings("boxing")
- private Promise<Long> doSendWithBackPressure(
- PushEventConsumer< ? super T> pec, PushEvent<T> event) {
- Deferred<Long> d = new Deferred<>();
- try {
- worker.execute(
- () -> d.resolve(System.nanoTime() + safePush(pec, event)));
- } catch (RejectedExecutionException ree) {
- // TODO log?
- if (!event.isTerminal()) {
- close(PushEvent.error(ree));
- return Promises.resolved(System.nanoTime());
- } else {
- return Promises
- .resolved(System.nanoTime() + safePush(pec, event));
- }
- }
- return d.getPromise();
- }
-
- private long safePush(PushEventConsumer< ? super T> pec,
- PushEvent<T> event) {
- try {
- long backpressure = pec.accept(event) * 1000000;
- if (backpressure < 0 && !event.isTerminal()) {
- closeConsumer(pec, PushEvent.close());
- return -1;
- }
- return backpressure;
- } catch (Exception e) {
- // TODO log?
- if (!event.isTerminal()) {
- closeConsumer(pec, PushEvent.error(e));
- }
- return -1;
- }
- }
-
- @Override
- public void close() {
- close(PushEvent.close());
- }
-
- private void close(PushEvent<T> event) {
- List<PushEventConsumer< ? super T>> toClose;
- Deferred<Void> toFail = null;
- synchronized (lock) {
- if(!closed) {
- closed = true;
-
- toClose = new ArrayList<>(connected);
- connected.clear();
- queue.clear();
-
- if(connectPromise != null) {
- toFail = connectPromise;
- connectPromise = null;
- }
- } else {
- toClose = emptyList();
- }
- }
-
- toClose.stream().forEach(pec -> doSend(pec, event));
-
- if (toFail != null) {
- toFail.resolveWith(closedConnectPromise());
- }
-
- onClose.run();
- }
-
- @Override
- public void publish(T t) {
- enqueueEvent(PushEvent.data(t));
- }
-
- @Override
- public void endOfStream() {
- enqueueEvent(PushEvent.close());
- }
-
- @Override
- public void error(Exception e) {
- enqueueEvent(PushEvent.error(e));
- }
-
- private void enqueueEvent(PushEvent<T> event) {
- synchronized (lock) {
- if (closed || connected.isEmpty()) {
- return;
- }
- }
-
- try {
- queuePolicy.doOffer(queue, event);
- boolean start;
- synchronized (lock) {
- start = !waitForFinishes && semaphore.tryAcquire();
- }
- if (start) {
- startWorker();
- }
- } catch (Exception e) {
- close(PushEvent.error(e));
- throw new IllegalStateException(
- "The queue policy threw an exception", e);
- }
- }
-
- @SuppressWarnings({
- "unchecked", "boxing"
- })
- private void startWorker() {
- worker.execute(() -> {
- try {
-
- for(;;) {
- PushEvent<T> event;
- List<PushEventConsumer< ? super T>> toCall;
- boolean resetWait = false;
- synchronized (lock) {
- if(waitForFinishes) {
- semaphore.release();
- while(waitForFinishes) {
- lock.notifyAll();
- lock.wait();
- }
- semaphore.acquire();
- }
-
- event = (PushEvent<T>) queue.poll();
-
- if(event == null) {
- break;
- }
-
- toCall = new ArrayList<>(connected);
- if (event.isTerminal()) {
- waitForFinishes = true;
- resetWait = true;
- connected.clear();
- while (!semaphore.tryAcquire(parallelism - 1)) {
- lock.wait();
- }
- }
- }
-
- List<Promise<Long>> calls = toCall.stream().map(pec -> {
- if (semaphore.tryAcquire()) {
- try {
- return doSendWithBackPressure(pec, event);
- } finally {
- semaphore.release();
- }
- } else {
- return Promises.resolved(
- System.nanoTime() + safePush(pec, event));
- }
- }).collect(toList());
-
- long toWait = Promises.all(calls)
- .map(l -> l.stream()
- .max(Long::compareTo)
- .orElseGet(() -> System.nanoTime()))
- .getValue() - System.nanoTime();
-
-
- if (toWait > 0) {
- scheduler.schedule(this::startWorker, toWait,
- NANOSECONDS);
- return;
- }
-
- if (resetWait == true) {
- synchronized (lock) {
- waitForFinishes = false;
- lock.notifyAll();
- }
- }
- }
-
- semaphore.release();
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- if (queue.peek() != null && semaphore.tryAcquire()) {
- try {
- startWorker();
- } catch (Exception e) {
- close(PushEvent.error(e));
- }
- }
- });
-
- }
-
- @Override
- public boolean isConnected() {
- synchronized (lock) {
- return !connected.isEmpty();
- }
- }
-
- @Override
- public Promise<Void> connectPromise() {
- synchronized (lock) {
- if (closed) {
- return closedConnectPromise();
- }
-
- if (connected.isEmpty()) {
- if (connectPromise == null) {
- connectPromise = new Deferred<>();
- }
- return connectPromise.getPromise();
- } else {
- return Promises.resolved(null);
- }
- }
- }
-
- private Promise<Void> closedConnectPromise() {
- return Promises.failed(new IllegalStateException(
- "This SimplePushEventSource is closed"));
- }
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
deleted file mode 100644
index faf9e6584..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/UnbufferedPushStreamImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.osgi.util.pushstream;
-
-import static java.util.Optional.ofNullable;
-import static org.osgi.util.pushstream.AbstractPushStreamImpl.State.*;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-class UnbufferedPushStreamImpl<T, U extends BlockingQueue<PushEvent< ? extends T>>>
- extends AbstractPushStreamImpl<T> implements PushStream<T> {
-
- protected final Function<PushEventConsumer<T>,AutoCloseable> connector;
-
- protected final AtomicReference<AutoCloseable> upstream = new AtomicReference<AutoCloseable>();
-
- UnbufferedPushStreamImpl(PushStreamProvider psp,
- Executor executor, ScheduledExecutorService scheduler,
- Function<PushEventConsumer<T>,AutoCloseable> connector) {
- super(psp, executor, scheduler);
- this.connector = connector;
- }
-
- @Override
- protected boolean close(PushEvent<T> event) {
- if(super.close(event)) {
- ofNullable(upstream.getAndSet(() -> {
- // This block doesn't need to do anything, but the presence
- // of the Closable is needed to prevent duplicate begins
- })).ifPresent(c -> {
- try {
- c.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- });
- return true;
- }
- return false;
- }
-
- @Override
- protected boolean begin() {
- if(closed.compareAndSet(BUILDING, STARTED)) {
- AutoCloseable toClose = connector.apply(this::handleEvent);
- if(!upstream.compareAndSet(null,toClose)) {
- //TODO log that we tried to connect twice...
- try {
- toClose.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- if (closed.get() == CLOSED
- && upstream.compareAndSet(toClose, null)) {
- // We closed before setting the upstream - close it now
- try {
- toClose.close();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- return true;
- }
- return false;
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/package-info.java b/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/package-info.java
deleted file mode 100644
index 6a28fa0b5..000000000
--- a/bundles/org.eclipse.equinox.log.stream/osgi/src/org/osgi/util/pushstream/package-info.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) OSGi Alliance (2015). All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-/**
- * Push Stream Package Version 1.0.
- *
- * <p>
- * Bundles wishing to use this package must list the package in the
- * Import-Package header of the bundle's manifest.
- *
- * <p>
- * Example import for consumers using the API in this package:
- * <p>
- * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,2.0)"}
- * <p>
- * Example import for providers implementing the API in this package:
- * <p>
- * {@code Import-Package: org.osgi.util.pushstream; version="[1.0,1.1)"}
- *
- * @author $Id$
- */
-
-@Version("1.0")
-package org.osgi.util.pushstream;
-
-import org.osgi.annotation.versioning.Version;
diff --git a/bundles/org.eclipse.equinox.log.stream/plugin.properties b/bundles/org.eclipse.equinox.log.stream/plugin.properties
deleted file mode 100644
index c4c991400..000000000
--- a/bundles/org.eclipse.equinox.log.stream/plugin.properties
+++ /dev/null
@@ -1,12 +0,0 @@
-###############################################################################
-# Copyright (c) 2005, 2009 IBM Corporation and others.
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Eclipse Public License v1.0
-# which accompanies this distribution, and is available at
-# http://www.eclipse.org/legal/epl-v10.html
-#
-# Contributors:
-# IBM Corporation - initial API and implementation
-###############################################################################
-bundleVendor = Eclipse.org - Equinox
-bundleName = Log Stream Provider
diff --git a/bundles/org.eclipse.equinox.log.stream/pom.xml b/bundles/org.eclipse.equinox.log.stream/pom.xml
deleted file mode 100644
index 7cef6b14c..000000000
--- a/bundles/org.eclipse.equinox.log.stream/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Copyright (c) 2012 Eclipse Foundation.
- All rights reserved. This program and the accompanying materials
- are made available under the terms of the Eclipse Distribution License v1.0
- which accompanies this distribution, and is available at
- http://www.eclipse.org/org/documents/edl-v10.php
-
- Contributors:
- Igor Fedorenko - initial implementation
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>rt.equinox.framework</artifactId>
- <groupId>org.eclipse.equinox.framework</groupId>
- <version>4.8.0-SNAPSHOT</version>
- <relativePath>../../</relativePath>
- </parent>
- <groupId>org.eclipse.equinox</groupId>
- <artifactId>org.eclipse.equinox.log.stream</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <packaging>eclipse-plugin</packaging>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.eclipse.tycho</groupId>
- <artifactId>tycho-compiler-plugin</artifactId>
- <version>${tycho.version}</version>
- <configuration>
- <compilerArgs>
- <arg>-nowarn:[${project.basedir}/osgi/src]</arg>
- </compilerArgs>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java
deleted file mode 100644
index 9c54ed7a0..000000000
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogEntrySource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017 IBM Corporation and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * IBM Corporation - initial API and implementation
- *******************************************************************************/
-package org.eclipse.equinox.internal.log.stream;
-
-import java.io.Closeable;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import org.osgi.service.log.LogEntry;
-import org.osgi.service.log.LogReaderService;
-import org.osgi.util.pushstream.PushEvent;
-import org.osgi.util.pushstream.PushEventConsumer;
-import org.osgi.util.pushstream.PushEventSource;
-import org.osgi.util.pushstream.PushStream;
-import org.osgi.util.tracker.ServiceTracker;
-
-public class LogEntrySource implements PushEventSource<LogEntry> {
- private final Set<PushEventConsumer<? super LogEntry>> consumers = new CopyOnWriteArraySet<>();
- private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory;
- private volatile PushStream<LogEntry> logStream;
- private final ReentrantLock historyLock = new ReentrantLock();
-
- public LogEntrySource(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory) {
- this.withHistory = withHistory;
-
- }
-
- public PushStream<LogEntry> getLogStream() {
- return logStream;
- }
-
- public void setLogStream(PushStream<LogEntry> logStream) {
- this.logStream = logStream;
- }
-
- /* Open method isused to connect to the source and begin receiving a stream of events.
- * It returns an AutoCloseable which can be used to close the event stream.
- * If the close method is called on this object then the stream is terminated by sending a close event.
- * (non-Javadoc)
- * @see org.osgi.util.pushstream.PushEventSource#open(org.osgi.util.pushstream.PushEventConsumer)
- */
-
- @Override
- public Closeable open(PushEventConsumer<? super LogEntry> aec) throws Exception {
-
- LinkedBlockingDeque<LogEntry> historyList = new LinkedBlockingDeque<>();
-
- if (!consumers.add(aec)) {
- throw new IllegalStateException("Cannot add the same consumer multiple times"); //$NON-NLS-1$
- }
-
- /*when history is not equal to null then we acquire a lock to provide the full history
- * to the consumer first before any other new entries
- */
- if (withHistory != null) {
- historyLock.lock();
- try {
- AtomicReference<LogReaderService> readerRef = withHistory.getService();
- LogReaderService reader = readerRef.get();
- if (reader != null) {
- // Enumeration has the most recent entry first
- Enumeration<LogEntry> e = reader.getLog();
- if (e != null) {
- while (e.hasMoreElements()) {
- historyList.add(e.nextElement());
- }
- }
- //Logging the history in the order of their appearance
- if (historyList != null) {
- while (!historyList.isEmpty()) {
- LogEntry logEntry = historyList.removeLast();
- logged(logEntry);
- }
- }
- }
- } finally {
- historyLock.unlock();
- }
- }
-
- Closeable result = () -> {
- if (consumers.remove(aec)) {
- try {
- aec.accept(PushEvent.close());
- } catch (Exception e) {
- // ignore here for log stream
- }
- }
- };
-
- return result;
- }
-
- public void logged(LogEntry entry) {
- if (withHistory != null) {
- historyLock.lock();
- }
-
- /*consumer accepts the incoming log entries and returns a back pressure.
- * A return of zero indicates that event delivery may continue immediately.
- * A positive return value indicates that the source should delay sending any further events for the requested number of milliseconds.
- * A return value of -1 indicates that no further events should be sent and that the stream can be closed.
- * @see org.osgi.util.pushstream.PushEventConsumer<T>
- */
- try {
- for (PushEventConsumer<? super LogEntry> consumer : consumers) {
- try {
- long status = consumer.accept(PushEvent.data(entry));
-
- if (status < 0) {
- consumer.accept(PushEvent.close());
- }
-
- } catch (Exception e) {
- // we ignore exceptions here for log stream
- }
- }
- } finally {
- if (withHistory != null) {
- historyLock.unlock();
- }
-
- }
- }
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
deleted file mode 100644
index 8cf9a5cfa..000000000
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamManager.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017 IBM Corporation and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * IBM Corporation - initial API and implementation
- *******************************************************************************/
-package org.eclipse.equinox.internal.log.stream;
-
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.log.LogEntry;
-import org.osgi.service.log.LogListener;
-import org.osgi.service.log.LogReaderService;
-import org.osgi.service.log.stream.LogStreamProvider;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
-
-/* LogStreamManager is used to start and stop the bundle and keeps the track of the logs using the
- * ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>> which listens to
- * the incoming logs using the LogListener. It is also responsible to provide service tracker
- * and each log entry to the LogStreamProviderFactory.
- *
- */
-public class LogStreamManager implements BundleActivator, ServiceTrackerCustomizer<LogReaderService, AtomicReference<LogReaderService>>, LogListener {
- private ServiceRegistration<LogStreamProvider> logStreamServiceRegistration;
- private LogStreamProviderFactory logStreamProviderFactory;
- private ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
- BundleContext context;
- ReentrantLock eventProducerLock = new ReentrantLock();
-
- /*
- * (non-Javadoc)
- * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
- */
- @Override
- public void start(BundleContext bc) throws Exception {
-
- this.context = bc;
- logReaderService = new ServiceTracker<>(context, LogReaderService.class, this);
- logReaderService.open();
-
- logStreamProviderFactory = new LogStreamProviderFactory(logReaderService);
- logStreamServiceRegistration = context.registerService(LogStreamProvider.class, logStreamProviderFactory, null);
-
- }
-
- /*
- * (non-Javadoc)
- * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
- */
- @Override
- public void stop(BundleContext bundleContext) throws Exception {
- logReaderService.close();
- logStreamServiceRegistration.unregister();
- logStreamServiceRegistration = null;
- }
-
- /*
- * (non-Javadoc)
- * @see org.osgi.util.tracker.ServiceTrackerCustomizer#addingService(org.osgi.framework.ServiceReference)
- */
-
- @Override
- public AtomicReference<LogReaderService> addingService(ServiceReference<LogReaderService> reference) {
- AtomicReference<LogReaderService> tracked = new AtomicReference<>();
- modifiedService(reference, tracked);
- return tracked;
- }
-
- /*
- * (non-Javadoc)
- * @see org.osgi.util.tracker.ServiceTrackerCustomizer#modifiedService(org.osgi.framework.ServiceReference, java.lang.Object)
- */
- @Override
- public void modifiedService(ServiceReference<LogReaderService> modifiedServiceRef, AtomicReference<LogReaderService> modifiedTracked) {
- eventProducerLock.lock();
- try {
- // Check if the currently used reader service is lower ranked that the modified serviceRef
- ServiceReference<LogReaderService> currentServiceRef = logReaderService.getServiceReference();
- if (currentServiceRef == null || modifiedServiceRef.compareTo(currentServiceRef) > 0) {
- // The modified service reference is higher ranked than the currently used one;
- // Use the modified service reference instead.
- LogReaderService readerService = context.getService(modifiedServiceRef);
- if (readerService != null) {
- if (modifiedTracked.get() == null) {
- // update our tracked object for the reference with the real service
- modifiedTracked.set(readerService);
- }
- // remove our listener from the currently used service
- if (currentServiceRef != null) {
- AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentServiceRef);
- if (currentTracked != null) {
- LogReaderService currentLogReader = currentTracked.get();
- if (currentLogReader != null) {
- // we were really using this service;
- // remove our listener and unget the service
- currentLogReader.removeLogListener(this);
- context.ungetService(currentServiceRef);
- // finally null out our tracked reference
- currentTracked.set(null);
- }
- }
- }
-
- readerService.addLogListener(this);
- }
- }
- } finally {
- eventProducerLock.unlock();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see org.osgi.util.tracker.ServiceTrackerCustomizer#removedService(org.osgi.framework.ServiceReference, java.lang.Object)
- */
- @Override
- public void removedService(ServiceReference<LogReaderService> removedRef, AtomicReference<LogReaderService> removedTracked) {
- eventProducerLock.lock();
- try {
- LogReaderService removedLogReader = removedTracked.get();
- if (removedLogReader != null) {
- // remove the listener
- removedLogReader.removeLogListener(this);
- context.ungetService(removedRef);
- removedTracked.set(null);
- }
- ServiceReference<LogReaderService> currentRef = logReaderService.getServiceReference();
- if (currentRef != null) {
- AtomicReference<LogReaderService> currentTracked = logReaderService.getService(currentRef);
- if (currentTracked != null) {
- LogReaderService currentLogReader = currentTracked.get();
- if (currentLogReader == null) {
- currentLogReader = context.getService(currentRef);
- currentTracked.set(currentLogReader);
- }
- if (currentLogReader != null) {
- currentLogReader.addLogListener(this);
- }
- }
- }
- } finally {
- eventProducerLock.unlock();
- }
- }
-
- /* It is used to post each log entry to the LogStreamProviderFactory
- * (non-Javadoc)
- * @see org.osgi.service.log.LogListener#logged(org.osgi.service.log.LogEntry)
- */
-
- @Override
- public void logged(LogEntry entry) {
-
- logStreamProviderFactory.postLogEntry(entry);
- }
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
deleted file mode 100644
index f47d0fa19..000000000
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017 IBM Corporation and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * IBM Corporation - initial API and implementation
- *******************************************************************************/
-package org.eclipse.equinox.internal.log.stream;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.ServiceFactory;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.log.LogEntry;
-import org.osgi.service.log.LogReaderService;
-import org.osgi.service.log.stream.LogStreamProvider;
-import org.osgi.util.tracker.ServiceTracker;
-
-public class LogStreamProviderFactory implements ServiceFactory<LogStreamProvider> {
-
- Map<Bundle, LogStreamProviderImpl> providers = new HashMap<>();
- ReentrantReadWriteLock eventProducerLock = new ReentrantReadWriteLock();
- ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
-
- public LogStreamProviderFactory(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
- this.logReaderService = logReaderService;
- }
-
- /*Reader of providers map
- * 1) for each provider
- * - post entry to provider
- */
- public void postLogEntry(LogEntry entry) {
- eventProducerLock.readLock().lock();
- try {
- for (LogStreamProviderImpl provider : providers.values()) {
- provider.logged(entry);
- }
- } finally {
- eventProducerLock.readLock().unlock();
- }
-
- }
-
- /* Writer to providers map
- * 1) create new LogStreamProviderImpl
- * 2) put new instance in map
- * 3) return new instance
- * (non-Javadoc)
- * @see org.osgi.framework.ServiceFactory#getService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration)
- */
-
- @Override
- public LogStreamProviderImpl getService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration) {
- LogStreamProviderImpl logStreamProviderImpl = new LogStreamProviderImpl(logReaderService);
- eventProducerLock.writeLock().lock();
- try {
- providers.put(bundle, logStreamProviderImpl);
- return logStreamProviderImpl;
- } finally {
- eventProducerLock.writeLock().unlock();
- }
- }
-
- /* 1) Remove the logStreamProviderImpl instance associated with the bundle
- * 2) close all existing LogStreams from the provider, outside the write lock
- * (non-Javadoc)
- * @see org.osgi.framework.ServiceFactory#ungetService(org.osgi.framework.Bundle, org.osgi.framework.ServiceRegistration, java.lang.Object)
- */
-
- @Override
- public void ungetService(Bundle bundle, ServiceRegistration<LogStreamProvider> registration, LogStreamProvider service) {
-
- LogStreamProviderImpl logStreamProviderImpl;
-
- eventProducerLock.writeLock().lock();
- try {
- logStreamProviderImpl = providers.remove(bundle);
- } finally {
- eventProducerLock.writeLock().unlock();
- }
-
- logStreamProviderImpl.close();
-
- }
-
-}
diff --git a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java b/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
deleted file mode 100644
index e32bf225e..000000000
--- a/bundles/org.eclipse.equinox.log.stream/src/org/eclipse/equinox/internal/log/stream/LogStreamProviderImpl.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*******************************************************************************
- * Copyright (c) 2017 IBM Corporation and others.
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * which accompanies this distribution, and is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * Contributors:
- * IBM Corporation - initial API and implementation
- *******************************************************************************/
-package org.eclipse.equinox.internal.log.stream;
-
-import java.util.Collections;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.osgi.service.log.LogEntry;
-import org.osgi.service.log.LogReaderService;
-import org.osgi.service.log.stream.LogStreamProvider;
-import org.osgi.util.pushstream.PushEvent;
-import org.osgi.util.pushstream.PushStream;
-import org.osgi.util.pushstream.PushStreamBuilder;
-import org.osgi.util.pushstream.PushStreamProvider;
-import org.osgi.util.tracker.ServiceTracker;
-
-public class LogStreamProviderImpl implements LogStreamProvider {
- private final PushStreamProvider pushStreamProvider = new PushStreamProvider();
- private final ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService;
- private final WeakHashMap<LogEntrySource, Boolean> weakMap = new WeakHashMap<>();
- private final Set<LogEntrySource> logEntrySources = Collections.newSetFromMap(weakMap);
-
- private final ReentrantReadWriteLock historyLock = new ReentrantReadWriteLock();
-
- public LogStreamProviderImpl(ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> logReaderService) {
- this.logReaderService = logReaderService;
- }
-
- /* Create a PushStream of {@link LogEntry} objects.
- * The returned PushStream is an unbuffered stream with a parallelism of one.
- * (non-Javadoc)
- * @see org.osgi.service.log.stream.LogStreamProvider#createStream(org.osgi.service.log.stream.LogStreamProvider.Options[])
- */
-
- @Override
- public PushStream<LogEntry> createStream(Options... options) {
- ServiceTracker<LogReaderService, AtomicReference<LogReaderService>> withHistory = null;
- if (options != null) {
- for (Options option : options) {
- if (Options.HISTORY.equals(option)) {
- withHistory = logReaderService;
- }
- }
- }
-
- // A write lock is acquired in order to add logEntrySource into the Set of logEntrySources.
- historyLock.writeLock().lock();
- try {
- LogEntrySource logEntrySource = new LogEntrySource(withHistory);
- PushStreamBuilder<LogEntry, BlockingQueue<PushEvent<? extends LogEntry>>> streamBuilder = pushStreamProvider.buildStream(logEntrySource);
- //creating an unbuffered stream
- PushStream<LogEntry> logStream = streamBuilder.unbuffered().create();
- logEntrySource.setLogStream(logStream);
- // Adding to sources makes the source start listening for new entries
- logEntrySources.add(logEntrySource);
- return logStream;
- } finally {
- historyLock.writeLock().unlock();
- }
- }
-
- /*
- * Send the incoming log entries to the logEntrySource.logged(entry) for the consumer to accept it.
- */
- public void logged(LogEntry entry) {
- historyLock.readLock().lock();
- try {
- for (LogEntrySource logEntrySource : logEntrySources) {
- logEntrySource.logged(entry);
- }
- } finally {
- historyLock.readLock().unlock();
- }
- }
-
- /*
- * Closing the stream for each source.
- */
- public void close() {
- PushStream<LogEntry> logStream;
- historyLock.readLock().lock();
- try {
- for (LogEntrySource logEntrySource : logEntrySources) {
- logStream = logEntrySource.getLogStream();
- try {
- logStream.close();
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } finally {
- historyLock.readLock().unlock();
- }
- }
-
-}
diff --git a/pom.xml b/pom.xml
index 980b0c430..79542f9a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,6 @@
<module>bundles/org.eclipse.osgi.services</module>
<module>bundles/org.eclipse.osgi.util</module>
<module>bundles/org.eclipse.osgi/supplement</module>
- <module>bundles/org.eclipse.equinox.log.stream</module>
<module>bundles/org.eclipse.equinox.launcher</module>
<module>bundles/org.eclipse.equinox.launcher.cocoa.macosx.x86_64</module>

Back to the top