Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2013-11-19 17:46:46 -0500
committerslewis2013-11-19 17:46:46 -0500
commitf08d394caa00c3b15b62e2dadd6024be7a3ae589 (patch)
tree26cea84be1f1d596bfde6fff1a4401b6d2683d31 /incubation
parent982ddf9c0d51967676fed3874db233e73a612b30 (diff)
downloadorg.eclipse.ecf-f08d394caa00c3b15b62e2dadd6024be7a3ae589.tar.gz
org.eclipse.ecf-f08d394caa00c3b15b62e2dadd6024be7a3ae589.tar.xz
org.eclipse.ecf-f08d394caa00c3b15b62e2dadd6024be7a3ae589.zip
Additions for paho provider
Diffstat (limited to 'incubation')
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java3
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java75
2 files changed, 70 insertions, 8 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
index a032cd4fa..5b7176372 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
@@ -7,7 +7,6 @@ import org.eclipse.ecf.core.sharedobject.ISharedObjectContainerConfig;
import org.eclipse.ecf.provider.comm.ConnectionCreateException;
import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
import org.eclipse.ecf.provider.generic.ClientSOContainer;
-import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace;
public class PahoClientContainer extends ClientSOContainer {
@@ -24,7 +23,7 @@ public class PahoClientContainer extends ClientSOContainer {
@Override
protected ISynchAsynchConnection createConnection(ID targetID, Object data)
throws ConnectionCreateException {
- return new PahoConnection((PahoID) targetID,data);
+ return new PahoConnection();
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java
index 9fc781b72..68bff0f77 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java
@@ -8,24 +8,88 @@ import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.provider.comm.IConnectionListener;
import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.MqttSecurityException;
public class PahoConnection implements ISynchAsynchConnection {
- public PahoConnection(PahoID targetID, Object data) {
- // TODO Auto-generated constructor stub
+ private PahoID clientID;
+ private MqttClient client;
+ private String mqttTopic;
+
+ private MqttCallback callback = new MqttCallback() {
+
+ public void connectionLost(Throwable cause) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // TODO Auto-generated method stub
+
+ }
+
+ };
+
+ public PahoConnection() {
}
- public void sendAsynch(ID receiver, byte[] data) throws IOException {
+ public synchronized void sendAsynch(ID receiver, byte[] data) throws IOException {
// TODO Auto-generated method stub
}
- public Object connect(ID targetID, Object data, int timeout)
+ public synchronized Object connect(ID targetID, Object data, int timeout)
throws ECFException {
+ if (targetID == null) throw new ECFException("targetID cannot be null");
+ if (!(targetID instanceof PahoID)) throw new ECFException("targetID must be in PahoID namespace");
+ PahoID pahoTargetID = (PahoID) targetID;
+
+ MqttConnectOptions connectOpts = createConnectionOptions(targetID, data, timeout);
+ String serverURI = null;
+ String clientId = null;
+ String topic = null;
+ this.mqttTopic = null;
+ // connect to broker
+ try {
+ this.client = new MqttClient(serverURI, clientId);
+ this.client.setCallback(callback);
+ this.client.connect(connectOpts);
+ this.client.subscribe(topic);
+ byte[] connectPayload = null;
+ this.client.publish(mqttTopic, new MqttMessage(connectPayload));
+ } catch (MqttSecurityException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (MqttException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
// TODO Auto-generated method stub
return null;
}
+ private MqttConnectOptions createConnectionOptions(ID targetID,
+ Object data, int timeout) {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setConnectionTimeout(timeout);
+ options.setKeepAliveInterval(timeout);
+ return options;
+ }
+
public void disconnect() {
// TODO Auto-generated method stub
@@ -37,8 +101,7 @@ public class PahoConnection implements ISynchAsynchConnection {
}
public ID getLocalID() {
- // TODO Auto-generated method stub
- return null;
+ return clientID;
}
public void start() {

Back to the top