From f08d394caa00c3b15b62e2dadd6024be7a3ae589 Mon Sep 17 00:00:00 2001 From: slewis Date: Tue, 19 Nov 2013 14:46:46 -0800 Subject: Additions for paho provider --- .../mqtt/paho/container/PahoClientContainer.java | 3 +- .../mqtt/paho/container/PahoConnection.java | 75 ++++++++++++++++++++-- 2 files changed, 70 insertions(+), 8 deletions(-) (limited to 'incubation') diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java index a032cd4fa..5b7176372 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java @@ -7,7 +7,6 @@ import org.eclipse.ecf.core.sharedobject.ISharedObjectContainerConfig; import org.eclipse.ecf.provider.comm.ConnectionCreateException; import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; import org.eclipse.ecf.provider.generic.ClientSOContainer; -import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace; public class PahoClientContainer extends ClientSOContainer { @@ -24,7 +23,7 @@ public class PahoClientContainer extends ClientSOContainer { @Override protected ISynchAsynchConnection createConnection(ID targetID, Object data) throws ConnectionCreateException { - return new PahoConnection((PahoID) targetID,data); + return new PahoConnection(); } } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java index 9fc781b72..68bff0f77 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java @@ -8,24 +8,88 @@ import org.eclipse.ecf.core.util.ECFException; import org.eclipse.ecf.provider.comm.IConnectionListener; import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; public class PahoConnection implements ISynchAsynchConnection { - public PahoConnection(PahoID targetID, Object data) { - // TODO Auto-generated constructor stub + private PahoID clientID; + private MqttClient client; + private String mqttTopic; + + private MqttCallback callback = new MqttCallback() { + + public void connectionLost(Throwable cause) { + // TODO Auto-generated method stub + + } + + public void messageArrived(String topic, MqttMessage message) + throws Exception { + // TODO Auto-generated method stub + + } + + public void deliveryComplete(IMqttDeliveryToken token) { + // TODO Auto-generated method stub + + } + + }; + + public PahoConnection() { } - public void sendAsynch(ID receiver, byte[] data) throws IOException { + public synchronized void sendAsynch(ID receiver, byte[] data) throws IOException { // TODO Auto-generated method stub } - public Object connect(ID targetID, Object data, int timeout) + public synchronized Object connect(ID targetID, Object data, int timeout) throws ECFException { + if (targetID == null) throw new ECFException("targetID cannot be null"); + if (!(targetID instanceof PahoID)) throw new ECFException("targetID must be in PahoID namespace"); + PahoID pahoTargetID = (PahoID) targetID; + + MqttConnectOptions connectOpts = createConnectionOptions(targetID, data, timeout); + String serverURI = null; + String clientId = null; + String topic = null; + this.mqttTopic = null; + // connect to broker + try { + this.client = new MqttClient(serverURI, clientId); + this.client.setCallback(callback); + this.client.connect(connectOpts); + this.client.subscribe(topic); + byte[] connectPayload = null; + this.client.publish(mqttTopic, new MqttMessage(connectPayload)); + } catch (MqttSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (MqttException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + // TODO Auto-generated method stub return null; } + private MqttConnectOptions createConnectionOptions(ID targetID, + Object data, int timeout) { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(timeout); + return options; + } + public void disconnect() { // TODO Auto-generated method stub @@ -37,8 +101,7 @@ public class PahoConnection implements ISynchAsynchConnection { } public ID getLocalID() { - // TODO Auto-generated method stub - return null; + return clientID; } public void start() { -- cgit v1.2.3