Skip to main content
aboutsummaryrefslogtreecommitdiffstats
blob: 968f668f60abbbd1d8b6791c51e72e8056d92dc2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
/*
 * Copyright (c) OSGi Alliance (2015, 2016). All Rights Reserved.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.osgi.util.pushstream;

import static org.osgi.util.pushstream.PushEvent.EventType.*;

import org.osgi.annotation.versioning.ProviderType;

/**
 * An immutable event that travels through a communication channel in order to
 * push information to a downstream consumer. Each event is of exactly one of
 * the following types:
 * <ul>
 * <li>{@link EventType#DATA} - Carries a typed data element of the stream.
 * <li>{@link EventType#CLOSE} - Signals that the stream is closed. No further
 * events follow this event.
 * <li>{@link EventType#ERROR} - Signals that the stream hit an unrecoverable
 * problem and carries the reason downstream. The stream is closed and no
 * further events follow this event.
 * </ul>
 * <p>
 * Events are created with the static factory methods {@link #data(Object)},
 * {@link #error(Exception)} and {@link #close()}.
 *
 * @param <T> The payload type of the event.
 * @Immutable
 */
@ProviderType
public abstract class PushEvent<T> {

	/**
	 * The possible types of a {@link PushEvent}.
	 */
	public enum EventType {
		/**
		 * A data event that forms part of the stream.
		 */
		DATA,
		/**
		 * An error event: streaming has failed and no more events will
		 * arrive.
		 */
		ERROR,
		/**
		 * A close event: the stream has terminated normally.
		 */
		CLOSE
	}

	/**
	 * Package private default constructor; the only subclasses are the event
	 * implementations nested in this class.
	 */
	PushEvent() {
		// nothing to initialise
	}

	/**
	 * Create a new data event.
	 * 
	 * @param <T> The payload type.
	 * @param payload The payload to wrap.
	 * @return A new data event carrying the specified payload.
	 */
	public static <T> PushEvent<T> data(T payload) {
		return new DataEvent<T>(payload);
	}

	/**
	 * Create a new error event.
	 * 
	 * @param <T> The payload type.
	 * @param e The error to report.
	 * @return A new error event carrying the specified error.
	 */
	public static <T> PushEvent<T> error(Exception e) {
		return new ErrorEvent<T>(e);
	}

	/**
	 * Create a new close event.
	 * 
	 * @param <T> The payload type.
	 * @return A new close event.
	 */
	public static <T> PushEvent<T> close() {
		return new CloseEvent<T>();
	}

	/**
	 * Get the type of this event.
	 * 
	 * @return The type of this event.
	 */
	public abstract EventType getType();

	/**
	 * Answer whether this event is the last one of the stream.
	 * <p>
	 * This base implementation answers {@code true}; the data event
	 * implementation overrides it to answer {@code false}.
	 * 
	 * @return {@code false} if this is a data event, otherwise {@code true}.
	 */
	public boolean isTerminal() {
		return true;
	}

	/**
	 * Return the data carried by this event.
	 * <p>
	 * This base implementation always fails; the data event implementation
	 * overrides it to return the payload.
	 * 
	 * @return The data payload.
	 * @throws IllegalStateException if this event is not a
	 *             {@link EventType#DATA} event.
	 */
	public T getData() throws IllegalStateException {
		final String message = "Not a DATA event, the event type is "
				+ getType();
		throw new IllegalStateException(message);
	}

	/**
	 * Return the error that terminated the stream.
	 * <p>
	 * This base implementation always fails; the error event implementation
	 * overrides it to return the error.
	 * 
	 * @return The error that terminated the stream.
	 * @throws IllegalStateException if this event is not an
	 *             {@link EventType#ERROR} event.
	 */
	public Exception getFailure() throws IllegalStateException {
		final String message = "Not an ERROR event, the event type is "
				+ getType();
		throw new IllegalStateException(message);
	}

	/**
	 * Convenience to view a close/error event as an event of another payload
	 * type. Such events carry no payload, so the cast is harmless, and it
	 * allows a close/error event to be forwarded downstream without creating
	 * a new event.
	 * 
	 * @param <X> The new payload type.
	 * @return This error or close event, typed with the new payload type.
	 * @throws IllegalStateException if the event is a {@link EventType#DATA}
	 *             event.
	 */
	public <X> PushEvent<X> nodata() throws IllegalStateException {
		// Same instance, only the static type changes.
		@SuppressWarnings("unchecked")
		PushEvent<X> retyped = (PushEvent<X>) this;
		return retyped;
	}

	/**
	 * Event signalling normal termination of the stream. It relies on the
	 * base class defaults for everything except its type.
	 */
	static final class CloseEvent<T> extends PushEvent<T> {
		@Override
		public EventType getType() {
			return CLOSE;
		}
	}

	/**
	 * Event signalling that the stream failed; holds the reported error.
	 */
	static final class ErrorEvent<T> extends PushEvent<T> {
		private final Exception failure;

		ErrorEvent(Exception failure) {
			this.failure = failure;
		}

		@Override
		public EventType getType() {
			return ERROR;
		}

		@Override
		public Exception getFailure() {
			return failure;
		}
	}

	/**
	 * Event carrying a data element. It is the only non-terminal event and
	 * the only one that cannot be retyped through {@link #nodata()}.
	 */
	static final class DataEvent<T> extends PushEvent<T> {
		private final T payload;

		DataEvent(T payload) {
			this.payload = payload;
		}

		@Override
		public EventType getType() {
			return DATA;
		}

		@Override
		public boolean isTerminal() {
			return false;
		}

		@Override
		public T getData() throws IllegalStateException {
			return payload;
		}

		@Override
		public <X> PushEvent<X> nodata() throws IllegalStateException {
			throw new IllegalStateException("This event is a DATA event");
		}
	}
}

Back to the top