Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java')
-rw-r--r--plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java159
1 files changed, 159 insertions, 0 deletions
diff --git a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java
new file mode 100644
index 0000000000..dd5aa142b4
--- /dev/null
+++ b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java
@@ -0,0 +1,159 @@
+/***************************************************************************
+ * Copyright (c) 2004 - 2007 Eike Stepper, Germany.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Eike Stepper - initial API and implementation
+ **************************************************************************/
+package org.eclipse.net4j.internal.jms;
+
+import org.eclipse.net4j.internal.jms.bundle.OM;
+import org.eclipse.net4j.internal.util.lifecycle.Lifecycle;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MessageConsumerImpl extends Lifecycle implements MessageConsumer
+{
+ private static final long DO_NOT_WAIT = -1L;
+
+ private SessionImpl session;
+
+ private long consumerID;
+
+ private DestinationImpl destination;
+
+ private String messageSelector;
+
+ private MessageListener messageListener;
+
+ /**
+ * Incoming messages
+ */
+ private BlockingQueue<MessageImpl> messages = new LinkedBlockingQueue();
+
+ public MessageConsumerImpl(SessionImpl session, long consumerID, DestinationImpl destination, String messageSelector)
+ {
+ this.session = session;
+ this.consumerID = consumerID;
+ this.destination = destination;
+ this.messageSelector = messageSelector;
+ }
+
+ public SessionImpl getSession()
+ {
+ return session;
+ }
+
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public DestinationImpl getDestination()
+ {
+ return destination;
+ }
+
+ public String getMessageSelector()
+ {
+ return messageSelector;
+ }
+
+ public MessageListener getMessageListener()
+ {
+ return messageListener;
+ }
+
+ public void setMessageListener(MessageListener listener)
+ {
+ messageListener = listener;
+ if (messageListener != null)
+ {
+ session.addWork(this);
+ }
+ }
+
+ public Message receive() throws JMSException
+ {
+ return receive(Long.MAX_VALUE);
+ }
+
+ public Message receive(long timeout) throws JMSException
+ {
+ if (messageListener != null)
+ {
+ throw new JMSException("Consumer is configured for asynchronous delivery");
+ }
+
+ if (timeout == DO_NOT_WAIT)
+ {
+ return messages.poll();
+ }
+
+ try
+ {
+ return messages.poll(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ex)
+ {
+ throw new JMSException(ex.getMessage());
+ }
+ }
+
+ public Message receiveNoWait() throws JMSException
+ {
+ return receive(DO_NOT_WAIT);
+ }
+
+ public void close()
+ {
+ throw new NotYetImplementedException();
+ }
+
+ public void handleServerMessage(MessageImpl message)
+ {
+ messages.add(message);
+ if (messageListener != null)
+ {
+ session.addWork(this);
+ }
+ }
+
+ public void dispatchMessage()
+ {
+ MessageListener listener = messageListener;
+ if (listener != null)
+ {
+ MessageImpl message = messages.poll();
+ if (message == null)
+ {
+ OM.LOG.warn("No message to dispatch");
+ return;
+ }
+
+ try
+ {
+ listener.onMessage(message);
+ if (!session.getTransacted() && session.getAcknowledgeMode() != Session.CLIENT_ACKNOWLEDGE)
+ {
+ session.acknowledgeMessages(this);
+ }
+ }
+ catch (RuntimeException ex)
+ {
+ OM.LOG.warn(ex);
+ }
+ }
+ }
+}

Back to the top