diff options
author | slewis | 2013-11-20 23:22:09 +0000 |
---|---|---|
committer | slewis | 2013-11-20 23:22:09 +0000 |
commit | 93bdbb1c89767cac98401787a41694630e82b2f7 (patch) | |
tree | 0cec14520d3a5efed4cd64495c5572f44cc865c5 /incubation | |
parent | b7bfc73ef77d17364320bb76824cc877ebdf4493 (diff) | |
download | org.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.tar.gz org.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.tar.xz org.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.zip |
More additions to the Paho-based mqtt provider
Diffstat (limited to 'incubation')
7 files changed, 124 insertions, 30 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java index d10be9693..6b1690bf8 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java @@ -1,10 +1,13 @@ package org.eclipse.ecf.provider.mqtt.paho.container; import java.io.IOException; +import java.io.Serializable; import java.util.Map; +import org.eclipse.core.runtime.IStatus; import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.core.util.Trace; import org.eclipse.ecf.provider.comm.IConnectionListener; import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace; @@ -21,15 +24,15 @@ public abstract class AbstractPahoConnection { protected MqttClient client; protected String mqttTopic; protected boolean connected; - + protected String getMqttTopic() { return this.mqttTopic; } - + public AbstractPahoConnection(PahoID clientID) { this.clientID = clientID; } - + protected MqttCallback callback = new MqttCallback() { public void connectionLost(Throwable cause) { handleConnectionLost(cause); @@ -53,14 +56,15 @@ public abstract class AbstractPahoConnection { // TODO Auto-generated method stub } - protected void handleMessageArrived(String topic2, MqttMessage message) { - // TODO Auto-generated method stub - } + protected abstract void handleMessageArrived(String topic2, + MqttMessage message); - protected synchronized void connectAndSubscribe(PahoID targetID, MqttConnectOptions opts) throws ECFException { + protected synchronized void connectAndSubscribe(PahoID targetID, + MqttConnectOptions opts) throws ECFException { // Create client try { - this.client = new MqttClient(targetID.getServerURI(), targetID.getClientId()); + this.client = new MqttClient(targetID.getServerURI(), + targetID.getClientId()); // Set callback this.client.setCallback(callback); // Connect to broker with connectOpts @@ -72,16 +76,19 @@ public abstract class AbstractPahoConnection { this.client.subscribe(targetID.getClientId()); this.connected = true; } catch (MqttException e) { - throw new ECFException("Could not connect to targetID" + targetID.getName()); + throw new ECFException("Could not connect to targetID" + + targetID.getName()); } } - + @SuppressWarnings("rawtypes") public Map getProperties() { return null; } + public void addListener(IConnectionListener listener) { } + public void removeListener(IConnectionListener listener) { } @@ -111,10 +118,10 @@ public abstract class AbstractPahoConnection { try { this.client.publish(getMqttTopic(), message.createMessage()); } catch (MqttException e) { - throw new IOException(e.getMessage(),e); + throw new IOException(e.getMessage(), e); } } - + public synchronized void disconnect() { if (isConnected()) { try { @@ -130,9 +137,75 @@ public abstract class AbstractPahoConnection { public synchronized void sendAsynch(ID receiver, byte[] data) throws IOException { - if (!isConnected()) throw new IOException("PahoConnection not connected"); - if (receiver != null && !PahoNamespace.NAME.equals(receiver.getNamespace().getName())) throw new IOException("receiver not in PahoID namespace"); + if (!isConnected()) + throw new IOException("PahoConnection not connected"); + if (receiver != null + && !PahoNamespace.NAME + .equals(receiver.getNamespace().getName())) + throw new IOException("receiver not in PahoID namespace"); publishMessage(new PahoMessage((PahoID) receiver, data)); } + protected Object synch = new Object(); + protected boolean waitDone; + protected Serializable reply; + protected int connectWaitDuration = 15000; + + public Object sendSynch(ID receiver, byte[] data) throws IOException { + if (!isConnected()) + throw new IOException("PahoConnection not connected"); + if (receiver == null) + throw new IOException("receiver id must not be null"); + if (receiver != null + && !PahoNamespace.NAME + .equals(receiver.getNamespace().getName())) + throw new IOException("receiver not in PahoID namespace"); + return sendAndWait(new SyncPahoRequest((PahoID) receiver, data), + connectWaitDuration); + } + + private String PLUGIN_ID = "org.eclipse.ecf.provider.mqtt.paho"; + + protected void throwIOException(String method, String msg, Throwable t) + throws IOException { + Trace.throwing(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING, + this.getClass(), method, t); + throw new IOException(msg + ": " + t.getMessage()); //$NON-NLS-1$ + } + + protected void traceAndLogExceptionCatch(int code, String method, + Throwable e) { + Trace.catching(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING, + this.getClass(), method, e); + } + + private Serializable sendAndWait(SyncPahoRequest syncPahoRequest, + int waitDuration) throws IOException { + synchronized (synch) { + try { + waitDone = false; + long waittimeout = System.currentTimeMillis() + waitDuration; + reply = null; + this.client.publish(getMqttTopic(), + syncPahoRequest.createMessage()); + while (!waitDone + && (waittimeout - System.currentTimeMillis() > 0)) { + synch.wait(waitDuration / 10); + } + waitDone = true; + if (reply == null) + throw new IOException("timeout waiting for response"); //$NON-NLS-1$ + } catch (MqttException e) { + Trace.catching(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING, + this.getClass(), "sendAndWait", e); //$NON-NLS-1$ + throwIOException("sendAndWait", "MqttException in sendAndWait", //$NON-NLS-1$ //$NON-NLS-2$ + e); + } catch (InterruptedException e) { + traceAndLogExceptionCatch(IStatus.ERROR, + "handleTopicMessage", e); //$NON-NLS-1$ + } + return reply; + } + } + } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java index 30cbce7d9..8b796b660 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java @@ -1,7 +1,5 @@ package org.eclipse.ecf.provider.mqtt.paho.container; -import java.io.IOException; - import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.ECFException; import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; @@ -27,9 +25,9 @@ public class PahoClientConnection extends AbstractPahoConnection implements MqttConnectOptions connectOpts = createConnectionOptions(targetID, data, timeout); - + connectAndSubscribe(pahoTargetID, connectOpts); - + try { // publish to topic this.client.publish(pahoTargetID.getTopic(), new MqttMessage()); @@ -64,9 +62,10 @@ public class PahoClientConnection extends AbstractPahoConnection implements return false; } - public Object sendSynch(ID receiver, byte[] data) throws IOException { + @Override + protected void handleMessageArrived(String topic2, MqttMessage message) { // TODO Auto-generated method stub - return null; + } } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java index 82c79e1bb..f17ef61fd 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java @@ -8,7 +8,8 @@ import org.eclipse.ecf.core.provider.BaseRemoteServiceContainerInstantiator; public class PahoClientContainerInstantiator extends BaseRemoteServiceContainerInstantiator { - public IContainer createInstance(ContainerTypeDescription description, Object[] parameters) throws ContainerCreateException { + public IContainer createInstance(ContainerTypeDescription description, + Object[] parameters) throws ContainerCreateException { // XXX todo return null; } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java new file mode 100644 index 000000000..b2a34c0a2 --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java @@ -0,0 +1,7 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +public interface PahoDebugOptions { + + public static String EXCEPTIONS_CATCHING = "exceptions catching"; + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java index 8d13accbd..29f68002e 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java @@ -13,20 +13,20 @@ public class PahoMessage implements Serializable { private PahoID targetID; private byte[] bytes; - + public PahoMessage(PahoID targetID, byte[] bytes) { this.targetID = targetID; this.bytes = bytes; } - + public PahoID getTargetID() { return this.targetID; } - + public byte[] getBytes() { return this.bytes; } - + public MqttMessage createMessage() throws IOException { byte[] objectBytes = SOContainer.serialize(this); return new MqttMessage(objectBytes); diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java index 2012ffe0a..eed0562c4 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java @@ -1,13 +1,13 @@ package org.eclipse.ecf.provider.mqtt.paho.container; -import java.io.IOException; - import org.eclipse.ecf.core.identity.ID; import org.eclipse.ecf.core.util.ECFException; import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; +import org.eclipse.paho.client.mqttv3.MqttMessage; -public class PahoServerConnection extends AbstractPahoConnection implements ISynchAsynchConnection { +public class PahoServerConnection extends AbstractPahoConnection implements + ISynchAsynchConnection { public PahoServerConnection(PahoID clientID) { super(clientID); @@ -33,9 +33,10 @@ public class PahoServerConnection extends AbstractPahoConnection implements ISyn return false; } - public Object sendSynch(ID receiver, byte[] data) throws IOException { + @Override + protected void handleMessageArrived(String topic2, MqttMessage message) { // TODO Auto-generated method stub - return null; + } } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java new file mode 100644 index 000000000..c5048c19f --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java @@ -0,0 +1,13 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; + +public class SyncPahoResponse extends PahoMessage { + + private static final long serialVersionUID = 4085439339371979310L; + + public SyncPahoResponse(PahoID targetID, byte[] bytes) { + super(targetID, bytes); + } + +} |