Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2013-11-20 18:22:09 -0500
committerslewis2013-11-20 18:22:09 -0500
commit93bdbb1c89767cac98401787a41694630e82b2f7 (patch)
tree0cec14520d3a5efed4cd64495c5572f44cc865c5 /incubation
parentb7bfc73ef77d17364320bb76824cc877ebdf4493 (diff)
downloadorg.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.tar.gz
org.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.tar.xz
org.eclipse.ecf-93bdbb1c89767cac98401787a41694630e82b2f7.zip
More additions to the Paho-based mqtt provider
Diffstat (limited to 'incubation')
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java101
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java11
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java3
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java7
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java8
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java11
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java13
7 files changed, 124 insertions, 30 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
index d10be9693..6b1690bf8 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
@@ -1,10 +1,13 @@
package org.eclipse.ecf.provider.mqtt.paho.container;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Map;
+import org.eclipse.core.runtime.IStatus;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.core.util.Trace;
import org.eclipse.ecf.provider.comm.IConnectionListener;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace;
@@ -21,15 +24,15 @@ public abstract class AbstractPahoConnection {
protected MqttClient client;
protected String mqttTopic;
protected boolean connected;
-
+
protected String getMqttTopic() {
return this.mqttTopic;
}
-
+
public AbstractPahoConnection(PahoID clientID) {
this.clientID = clientID;
}
-
+
protected MqttCallback callback = new MqttCallback() {
public void connectionLost(Throwable cause) {
handleConnectionLost(cause);
@@ -53,14 +56,15 @@ public abstract class AbstractPahoConnection {
// TODO Auto-generated method stub
}
- protected void handleMessageArrived(String topic2, MqttMessage message) {
- // TODO Auto-generated method stub
- }
+ protected abstract void handleMessageArrived(String topic2,
+ MqttMessage message);
- protected synchronized void connectAndSubscribe(PahoID targetID, MqttConnectOptions opts) throws ECFException {
+ protected synchronized void connectAndSubscribe(PahoID targetID,
+ MqttConnectOptions opts) throws ECFException {
// Create client
try {
- this.client = new MqttClient(targetID.getServerURI(), targetID.getClientId());
+ this.client = new MqttClient(targetID.getServerURI(),
+ targetID.getClientId());
// Set callback
this.client.setCallback(callback);
// Connect to broker with connectOpts
@@ -72,16 +76,19 @@ public abstract class AbstractPahoConnection {
this.client.subscribe(targetID.getClientId());
this.connected = true;
} catch (MqttException e) {
- throw new ECFException("Could not connect to targetID" + targetID.getName());
+ throw new ECFException("Could not connect to targetID"
+ + targetID.getName());
}
}
-
+
@SuppressWarnings("rawtypes")
public Map getProperties() {
return null;
}
+
public void addListener(IConnectionListener listener) {
}
+
public void removeListener(IConnectionListener listener) {
}
@@ -111,10 +118,10 @@ public abstract class AbstractPahoConnection {
try {
this.client.publish(getMqttTopic(), message.createMessage());
} catch (MqttException e) {
- throw new IOException(e.getMessage(),e);
+ throw new IOException(e.getMessage(), e);
}
}
-
+
public synchronized void disconnect() {
if (isConnected()) {
try {
@@ -130,9 +137,75 @@ public abstract class AbstractPahoConnection {
public synchronized void sendAsynch(ID receiver, byte[] data)
throws IOException {
- if (!isConnected()) throw new IOException("PahoConnection not connected");
- if (receiver != null && !PahoNamespace.NAME.equals(receiver.getNamespace().getName())) throw new IOException("receiver not in PahoID namespace");
+ if (!isConnected())
+ throw new IOException("PahoConnection not connected");
+ if (receiver != null
+ && !PahoNamespace.NAME
+ .equals(receiver.getNamespace().getName()))
+ throw new IOException("receiver not in PahoID namespace");
publishMessage(new PahoMessage((PahoID) receiver, data));
}
+ protected Object synch = new Object();
+ protected boolean waitDone;
+ protected Serializable reply;
+ protected int connectWaitDuration = 15000;
+
+ public Object sendSynch(ID receiver, byte[] data) throws IOException {
+ if (!isConnected())
+ throw new IOException("PahoConnection not connected");
+ if (receiver == null)
+ throw new IOException("receiver id must not be null");
+ if (receiver != null
+ && !PahoNamespace.NAME
+ .equals(receiver.getNamespace().getName()))
+ throw new IOException("receiver not in PahoID namespace");
+ return sendAndWait(new SyncPahoRequest((PahoID) receiver, data),
+ connectWaitDuration);
+ }
+
+ private String PLUGIN_ID = "org.eclipse.ecf.provider.mqtt.paho";
+
+ protected void throwIOException(String method, String msg, Throwable t)
+ throws IOException {
+ Trace.throwing(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING,
+ this.getClass(), method, t);
+ throw new IOException(msg + ": " + t.getMessage()); //$NON-NLS-1$
+ }
+
+ protected void traceAndLogExceptionCatch(int code, String method,
+ Throwable e) {
+ Trace.catching(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING,
+ this.getClass(), method, e);
+ }
+
+ private Serializable sendAndWait(SyncPahoRequest syncPahoRequest,
+ int waitDuration) throws IOException {
+ synchronized (synch) {
+ try {
+ waitDone = false;
+ long waittimeout = System.currentTimeMillis() + waitDuration;
+ reply = null;
+ this.client.publish(getMqttTopic(),
+ syncPahoRequest.createMessage());
+ while (!waitDone
+ && (waittimeout - System.currentTimeMillis() > 0)) {
+ synch.wait(waitDuration / 10);
+ }
+ waitDone = true;
+ if (reply == null)
+ throw new IOException("timeout waiting for response"); //$NON-NLS-1$
+ } catch (MqttException e) {
+ Trace.catching(PLUGIN_ID, PahoDebugOptions.EXCEPTIONS_CATCHING,
+ this.getClass(), "sendAndWait", e); //$NON-NLS-1$
+ throwIOException("sendAndWait", "MqttException in sendAndWait", //$NON-NLS-1$ //$NON-NLS-2$
+ e);
+ } catch (InterruptedException e) {
+ traceAndLogExceptionCatch(IStatus.ERROR,
+ "handleTopicMessage", e); //$NON-NLS-1$
+ }
+ return reply;
+ }
+ }
+
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
index 30cbce7d9..8b796b660 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
@@ -1,7 +1,5 @@
package org.eclipse.ecf.provider.mqtt.paho.container;
-import java.io.IOException;
-
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
@@ -27,9 +25,9 @@ public class PahoClientConnection extends AbstractPahoConnection implements
MqttConnectOptions connectOpts = createConnectionOptions(targetID,
data, timeout);
-
+
connectAndSubscribe(pahoTargetID, connectOpts);
-
+
try {
// publish to topic
this.client.publish(pahoTargetID.getTopic(), new MqttMessage());
@@ -64,9 +62,10 @@ public class PahoClientConnection extends AbstractPahoConnection implements
return false;
}
- public Object sendSynch(ID receiver, byte[] data) throws IOException {
+ @Override
+ protected void handleMessageArrived(String topic2, MqttMessage message) {
// TODO Auto-generated method stub
- return null;
+
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java
index 82c79e1bb..f17ef61fd 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainerInstantiator.java
@@ -8,7 +8,8 @@ import org.eclipse.ecf.core.provider.BaseRemoteServiceContainerInstantiator;
public class PahoClientContainerInstantiator extends
BaseRemoteServiceContainerInstantiator {
- public IContainer createInstance(ContainerTypeDescription description, Object[] parameters) throws ContainerCreateException {
+ public IContainer createInstance(ContainerTypeDescription description,
+ Object[] parameters) throws ContainerCreateException {
// XXX todo
return null;
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java
new file mode 100644
index 000000000..b2a34c0a2
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java
@@ -0,0 +1,7 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+public interface PahoDebugOptions {
+
+ public static String EXCEPTIONS_CATCHING = "exceptions catching";
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
index 8d13accbd..29f68002e 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
@@ -13,20 +13,20 @@ public class PahoMessage implements Serializable {
private PahoID targetID;
private byte[] bytes;
-
+
public PahoMessage(PahoID targetID, byte[] bytes) {
this.targetID = targetID;
this.bytes = bytes;
}
-
+
public PahoID getTargetID() {
return this.targetID;
}
-
+
public byte[] getBytes() {
return this.bytes;
}
-
+
public MqttMessage createMessage() throws IOException {
byte[] objectBytes = SOContainer.serialize(this);
return new MqttMessage(objectBytes);
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
index 2012ffe0a..eed0562c4 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
@@ -1,13 +1,13 @@
package org.eclipse.ecf.provider.mqtt.paho.container;
-import java.io.IOException;
-
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
-public class PahoServerConnection extends AbstractPahoConnection implements ISynchAsynchConnection {
+public class PahoServerConnection extends AbstractPahoConnection implements
+ ISynchAsynchConnection {
public PahoServerConnection(PahoID clientID) {
super(clientID);
@@ -33,9 +33,10 @@ public class PahoServerConnection extends AbstractPahoConnection implements ISyn
return false;
}
- public Object sendSynch(ID receiver, byte[] data) throws IOException {
+ @Override
+ protected void handleMessageArrived(String topic2, MqttMessage message) {
// TODO Auto-generated method stub
- return null;
+
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java
new file mode 100644
index 000000000..c5048c19f
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java
@@ -0,0 +1,13 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class SyncPahoResponse extends PahoMessage {
+
+ private static final long serialVersionUID = 4085439339371979310L;
+
+ public SyncPahoResponse(PahoID targetID, byte[] bytes) {
+ super(targetID, bytes);
+ }
+
+}

Back to the top