diff options
Diffstat (limited to 'plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java')
-rw-r--r-- | plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java new file mode 100644 index 0000000000..dd5aa142b4 --- /dev/null +++ b/plugins/org.eclipse.net4j.jms/src/org/eclipse/net4j/internal/jms/MessageConsumerImpl.java @@ -0,0 +1,159 @@ +/*************************************************************************** + * Copyright (c) 2004 - 2007 Eike Stepper, Germany. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eike Stepper - initial API and implementation + **************************************************************************/ +package org.eclipse.net4j.internal.jms; + +import org.eclipse.net4j.internal.jms.bundle.OM; +import org.eclipse.net4j.internal.util.lifecycle.Lifecycle; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class MessageConsumerImpl extends Lifecycle implements MessageConsumer +{ + private static final long DO_NOT_WAIT = -1L; + + private SessionImpl session; + + private long consumerID; + + private DestinationImpl destination; + + private String messageSelector; + + private MessageListener messageListener; + + /** + * Incoming messages + */ + private BlockingQueue<MessageImpl> messages = new LinkedBlockingQueue(); + + public MessageConsumerImpl(SessionImpl session, long consumerID, DestinationImpl destination, String messageSelector) + { + this.session = session; + this.consumerID = consumerID; + this.destination = destination; + this.messageSelector = messageSelector; + } + + public SessionImpl getSession() + { + return session; + } + + public long getConsumerID() + { + return consumerID; + } + + public DestinationImpl getDestination() + { + return destination; + } + + public String getMessageSelector() + { + return messageSelector; + } + + public MessageListener getMessageListener() + { + return messageListener; + } + + public void setMessageListener(MessageListener listener) + { + messageListener = listener; + if (messageListener != null) + { + session.addWork(this); + } + } + + public Message receive() throws JMSException + { + return receive(Long.MAX_VALUE); + } + + public Message receive(long timeout) throws JMSException + { + if (messageListener != null) + { + throw new JMSException("Consumer is configured for asynchronous delivery"); + } + + if (timeout == DO_NOT_WAIT) + { + return messages.poll(); + } + + try + { + return messages.poll(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ex) + { + throw new JMSException(ex.getMessage()); + } + } + + public Message receiveNoWait() throws JMSException + { + return receive(DO_NOT_WAIT); + } + + public void close() + { + throw new NotYetImplementedException(); + } + + public void handleServerMessage(MessageImpl message) + { + messages.add(message); + if (messageListener != null) + { + session.addWork(this); + } + } + + public void dispatchMessage() + { + MessageListener listener = messageListener; + if (listener != null) + { + MessageImpl message = messages.poll(); + if (message == null) + { + OM.LOG.warn("No message to dispatch"); + return; + } + + try + { + listener.onMessage(message); + if (!session.getTransacted() && session.getAcknowledgeMode() != Session.CLIENT_ACKNOWLEDGE) + { + session.acknowledgeMessages(this); + } + } + catch (RuntimeException ex) + { + OM.LOG.warn(ex); + } + } + } +} |