diff options
author | slewis | 2013-11-20 22:56:30 +0000 |
---|---|---|
committer | slewis | 2013-11-20 22:56:30 +0000 |
commit | b7bfc73ef77d17364320bb76824cc877ebdf4493 (patch) | |
tree | 07841838081e28376b6806eaaa7951c65f3955f7 /incubation | |
parent | f08d394caa00c3b15b62e2dadd6024be7a3ae589 (diff) | |
download | org.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.tar.gz org.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.tar.xz org.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.zip |
Additions for both server and client for Paho provider.
Diffstat (limited to 'incubation')
9 files changed, 321 insertions, 155 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java new file mode 100644 index 000000000..d10be9693 --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java @@ -0,0 +1,138 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import java.io.IOException; +import java.util.Map; + +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.provider.comm.IConnectionListener; +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public abstract class AbstractPahoConnection { + + protected PahoID clientID; + protected MqttClient client; + protected String mqttTopic; + protected boolean connected; + + protected String getMqttTopic() { + return this.mqttTopic; + } + + public AbstractPahoConnection(PahoID clientID) { + this.clientID = clientID; + } + + protected MqttCallback callback = new MqttCallback() { + public void connectionLost(Throwable cause) { + handleConnectionLost(cause); + } + + public void messageArrived(String topic, MqttMessage message) + throws Exception { + handleMessageArrived(topic, message); + } + + public void deliveryComplete(IMqttDeliveryToken token) { + handleDeliveryComplete(token); + } + }; + + protected void handleConnectionLost(Throwable cause) { + // xxx todo + } + + protected void handleDeliveryComplete(IMqttDeliveryToken token) { + // TODO Auto-generated method stub + } + + protected void handleMessageArrived(String topic2, MqttMessage message) { + // TODO Auto-generated method stub + } + + protected synchronized void connectAndSubscribe(PahoID targetID, MqttConnectOptions opts) throws ECFException { + // Create client + try { + this.client = new MqttClient(targetID.getServerURI(), targetID.getClientId()); + // Set callback + this.client.setCallback(callback); + // Connect to broker with connectOpts + if (opts == null) + this.client.connect(); + else + this.client.connect(opts); + // Subscribe to topic + this.client.subscribe(targetID.getClientId()); + this.connected = true; + } catch (MqttException e) { + throw new ECFException("Could not connect to targetID" + targetID.getName()); + } + } + + @SuppressWarnings("rawtypes") + public Map getProperties() { + return null; + } + public void addListener(IConnectionListener listener) { + } + public void removeListener(IConnectionListener listener) { + } + + public ID getLocalID() { + return clientID; + } + + @SuppressWarnings("rawtypes") + public Object getAdapter(Class adapter) { + return null; + } + + protected MqttConnectOptions createConnectionOptions(ID targetID, + Object data, int timeout) { + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(timeout); + return options; + } + + public boolean isConnected() { + return connected; + } + + protected void publishMessage(PahoMessage message) throws IOException { + try { + this.client.publish(getMqttTopic(), message.createMessage()); + } catch (MqttException e) { + throw new IOException(e.getMessage(),e); + } + } + + public synchronized void disconnect() { + if (isConnected()) { + try { + this.client.disconnect(); + } catch (MqttException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + this.client = null; + this.connected = false; + } + } + + public synchronized void sendAsynch(ID receiver, byte[] data) + throws IOException { + if (!isConnected()) throw new IOException("PahoConnection not connected"); + if (receiver != null && !PahoNamespace.NAME.equals(receiver.getNamespace().getName())) throw new IOException("receiver not in PahoID namespace"); + publishMessage(new PahoMessage((PahoID) receiver, data)); + } + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java new file mode 100644 index 000000000..30cbce7d9 --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java @@ -0,0 +1,72 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import java.io.IOException; + +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoClientConnection extends AbstractPahoConnection implements + ISynchAsynchConnection { + + public PahoClientConnection(PahoID clientID) { + super(clientID); + } + + public synchronized Object connect(ID targetID, Object data, int timeout) + throws ECFException { + if (targetID == null) + throw new ECFException("targetID cannot be null"); + if (!(targetID instanceof PahoID)) + throw new ECFException("targetID must be in PahoID namespace"); + PahoID pahoTargetID = (PahoID) targetID; + + MqttConnectOptions connectOpts = createConnectionOptions(targetID, + data, timeout); + + connectAndSubscribe(pahoTargetID, connectOpts); + + try { + // publish to topic + this.client.publish(pahoTargetID.getTopic(), new MqttMessage()); + } catch (MqttException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + // wait for response + // ConnectResponseMessage crm = + // AbstractMessage.createFromByteArray(bytes); + // TODO Auto-generated method stub + return null; + } + + public void disconnect() { + // TODO Auto-generated method stub + + } + + public void start() { + // TODO Auto-generated method stub + + } + + public void stop() { + // TODO Auto-generated method stub + + } + + public boolean isStarted() { + // TODO Auto-generated method stub + return false; + } + + public Object sendSynch(ID receiver, byte[] data) throws IOException { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java index 5b7176372..df4f83c59 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java @@ -23,7 +23,8 @@ public class PahoClientContainer extends ClientSOContainer { @Override protected ISynchAsynchConnection createConnection(ID targetID, Object data) throws ConnectionCreateException { - return new PahoConnection(); + // XXX todo + return null; } } diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java deleted file mode 100644 index 68bff0f77..000000000 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java +++ /dev/null @@ -1,149 +0,0 @@ -package org.eclipse.ecf.provider.mqtt.paho.container; - -import java.io.IOException; -import java.util.Map; - -import org.eclipse.ecf.core.identity.ID; -import org.eclipse.ecf.core.util.ECFException; -import org.eclipse.ecf.provider.comm.IConnectionListener; -import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; -import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; - -public class PahoConnection implements ISynchAsynchConnection { - - private PahoID clientID; - private MqttClient client; - private String mqttTopic; - - private MqttCallback callback = new MqttCallback() { - - public void connectionLost(Throwable cause) { - // TODO Auto-generated method stub - - } - - public void messageArrived(String topic, MqttMessage message) - throws Exception { - // TODO Auto-generated method stub - - } - - public void deliveryComplete(IMqttDeliveryToken token) { - // TODO Auto-generated method stub - - } - - }; - - public PahoConnection() { - } - - public synchronized void sendAsynch(ID receiver, byte[] data) throws IOException { - // TODO Auto-generated method stub - - } - - public synchronized Object connect(ID targetID, Object data, int timeout) - throws ECFException { - if (targetID == null) throw new ECFException("targetID cannot be null"); - if (!(targetID instanceof PahoID)) throw new ECFException("targetID must be in PahoID namespace"); - PahoID pahoTargetID = (PahoID) targetID; - - MqttConnectOptions connectOpts = createConnectionOptions(targetID, data, timeout); - String serverURI = null; - String clientId = null; - String topic = null; - this.mqttTopic = null; - // connect to broker - try { - this.client = new MqttClient(serverURI, clientId); - this.client.setCallback(callback); - this.client.connect(connectOpts); - this.client.subscribe(topic); - byte[] connectPayload = null; - this.client.publish(mqttTopic, new MqttMessage(connectPayload)); - } catch (MqttSecurityException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (MqttException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - // TODO Auto-generated method stub - return null; - } - - private MqttConnectOptions createConnectionOptions(ID targetID, - Object data, int timeout) { - MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(true); - options.setConnectionTimeout(timeout); - options.setKeepAliveInterval(timeout); - return options; - } - - public void disconnect() { - // TODO Auto-generated method stub - - } - - public boolean isConnected() { - // TODO Auto-generated method stub - return false; - } - - public ID getLocalID() { - return clientID; - } - - public void start() { - // TODO Auto-generated method stub - - } - - public void stop() { - // TODO Auto-generated method stub - - } - - public boolean isStarted() { - // TODO Auto-generated method stub - return false; - } - - @SuppressWarnings("rawtypes") - public Map getProperties() { - // TODO Auto-generated method stub - return null; - } - - public void addListener(IConnectionListener listener) { - // TODO Auto-generated method stub - - } - - public void removeListener(IConnectionListener listener) { - // TODO Auto-generated method stub - - } - - @SuppressWarnings("rawtypes") - public Object getAdapter(Class adapter) { - // TODO Auto-generated method stub - return null; - } - - public Object sendSynch(ID receiver, byte[] data) throws IOException { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java new file mode 100644 index 000000000..8d13accbd --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java @@ -0,0 +1,34 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import java.io.IOException; +import java.io.Serializable; + +import org.eclipse.ecf.provider.generic.SOContainer; +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class PahoMessage implements Serializable { + + private static final long serialVersionUID = -8768793858838034483L; + + private PahoID targetID; + private byte[] bytes; + + public PahoMessage(PahoID targetID, byte[] bytes) { + this.targetID = targetID; + this.bytes = bytes; + } + + public PahoID getTargetID() { + return this.targetID; + } + + public byte[] getBytes() { + return this.bytes; + } + + public MqttMessage createMessage() throws IOException { + byte[] objectBytes = SOContainer.serialize(this); + return new MqttMessage(objectBytes); + } +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java new file mode 100644 index 000000000..2012ffe0a --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java @@ -0,0 +1,41 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import java.io.IOException; + +import org.eclipse.ecf.core.identity.ID; +import org.eclipse.ecf.core.util.ECFException; +import org.eclipse.ecf.provider.comm.ISynchAsynchConnection; +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; + +public class PahoServerConnection extends AbstractPahoConnection implements ISynchAsynchConnection { + + public PahoServerConnection(PahoID clientID) { + super(clientID); + } + + public synchronized Object connect(ID targetID, Object data, int timeout) + throws ECFException { + throw new ECFException("Server cannot be connected"); + } + + public void start() { + // TODO Auto-generated method stub + + } + + public void stop() { + // TODO Auto-generated method stub + + } + + public boolean isStarted() { + // TODO Auto-generated method stub + return false; + } + + public Object sendSynch(ID receiver, byte[] data) throws IOException { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java new file mode 100644 index 000000000..0f817ca1a --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java @@ -0,0 +1,12 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import org.eclipse.ecf.core.sharedobject.ISharedObjectContainerConfig; +import org.eclipse.ecf.provider.generic.ServerSOContainer; + +public class PahoServerContainer extends ServerSOContainer { + + public PahoServerContainer(ISharedObjectContainerConfig config) { + super(config); + } + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java new file mode 100644 index 000000000..4e2aa010a --- /dev/null +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java @@ -0,0 +1,13 @@ +package org.eclipse.ecf.provider.mqtt.paho.container; + +import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID; + +public class SyncPahoRequest extends PahoMessage { + + private static final long serialVersionUID = 9073885283936597534L; + + public SyncPahoRequest(PahoID targetID, byte[] bytes) { + super(targetID, bytes); + } + +} diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java index 83f52f8d6..2f2e1e628 100644 --- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java +++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java @@ -11,13 +11,17 @@ public class PahoID extends StringID { super(n, s); } - public String getTargetID() { - // XXX TODO + public String getServerURI() { return null; } - - public String getClientID() { - // XXX TODO + + public String getClientId() { + // TODO Auto-generated method stub + return null; + } + + public String getTopic() { + // TODO Auto-generated method stub return null; } } |