Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2013-11-20 22:56:30 +0000
committerslewis2013-11-20 22:56:30 +0000
commitb7bfc73ef77d17364320bb76824cc877ebdf4493 (patch)
tree07841838081e28376b6806eaaa7951c65f3955f7 /incubation
parentf08d394caa00c3b15b62e2dadd6024be7a3ae589 (diff)
downloadorg.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.tar.gz
org.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.tar.xz
org.eclipse.ecf-b7bfc73ef77d17364320bb76824cc877ebdf4493.zip
Additions for both server and client for Paho provider.
Diffstat (limited to 'incubation')
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java138
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java72
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java3
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java149
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java34
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java41
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java12
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java13
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java14
9 files changed, 321 insertions, 155 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
new file mode 100644
index 000000000..d10be9693
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
@@ -0,0 +1,138 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.provider.comm.IConnectionListener;
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public abstract class AbstractPahoConnection {
+
+ protected PahoID clientID;
+ protected MqttClient client;
+ protected String mqttTopic;
+ protected boolean connected;
+
+ protected String getMqttTopic() {
+ return this.mqttTopic;
+ }
+
+ public AbstractPahoConnection(PahoID clientID) {
+ this.clientID = clientID;
+ }
+
+ protected MqttCallback callback = new MqttCallback() {
+ public void connectionLost(Throwable cause) {
+ handleConnectionLost(cause);
+ }
+
+ public void messageArrived(String topic, MqttMessage message)
+ throws Exception {
+ handleMessageArrived(topic, message);
+ }
+
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ handleDeliveryComplete(token);
+ }
+ };
+
+ protected void handleConnectionLost(Throwable cause) {
+ // xxx todo
+ }
+
+ protected void handleDeliveryComplete(IMqttDeliveryToken token) {
+ // TODO Auto-generated method stub
+ }
+
+ protected void handleMessageArrived(String topic2, MqttMessage message) {
+ // TODO Auto-generated method stub
+ }
+
+ protected synchronized void connectAndSubscribe(PahoID targetID, MqttConnectOptions opts) throws ECFException {
+ // Create client
+ try {
+ this.client = new MqttClient(targetID.getServerURI(), targetID.getClientId());
+ // Set callback
+ this.client.setCallback(callback);
+ // Connect to broker with connectOpts
+ if (opts == null)
+ this.client.connect();
+ else
+ this.client.connect(opts);
+ // Subscribe to topic
+ this.client.subscribe(targetID.getClientId());
+ this.connected = true;
+ } catch (MqttException e) {
+ throw new ECFException("Could not connect to targetID" + targetID.getName());
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Map getProperties() {
+ return null;
+ }
+ public void addListener(IConnectionListener listener) {
+ }
+ public void removeListener(IConnectionListener listener) {
+ }
+
+ public ID getLocalID() {
+ return clientID;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Object getAdapter(Class adapter) {
+ return null;
+ }
+
+ protected MqttConnectOptions createConnectionOptions(ID targetID,
+ Object data, int timeout) {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setConnectionTimeout(timeout);
+ options.setKeepAliveInterval(timeout);
+ return options;
+ }
+
+ public boolean isConnected() {
+ return connected;
+ }
+
+ protected void publishMessage(PahoMessage message) throws IOException {
+ try {
+ this.client.publish(getMqttTopic(), message.createMessage());
+ } catch (MqttException e) {
+ throw new IOException(e.getMessage(),e);
+ }
+ }
+
+ public synchronized void disconnect() {
+ if (isConnected()) {
+ try {
+ this.client.disconnect();
+ } catch (MqttException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ this.client = null;
+ this.connected = false;
+ }
+ }
+
+ public synchronized void sendAsynch(ID receiver, byte[] data)
+ throws IOException {
+ if (!isConnected()) throw new IOException("PahoConnection not connected");
+ if (receiver != null && !PahoNamespace.NAME.equals(receiver.getNamespace().getName())) throw new IOException("receiver not in PahoID namespace");
+ publishMessage(new PahoMessage((PahoID) receiver, data));
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
new file mode 100644
index 000000000..30cbce7d9
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
@@ -0,0 +1,72 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.IOException;
+
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public class PahoClientConnection extends AbstractPahoConnection implements
+ ISynchAsynchConnection {
+
+ public PahoClientConnection(PahoID clientID) {
+ super(clientID);
+ }
+
+ public synchronized Object connect(ID targetID, Object data, int timeout)
+ throws ECFException {
+ if (targetID == null)
+ throw new ECFException("targetID cannot be null");
+ if (!(targetID instanceof PahoID))
+ throw new ECFException("targetID must be in PahoID namespace");
+ PahoID pahoTargetID = (PahoID) targetID;
+
+ MqttConnectOptions connectOpts = createConnectionOptions(targetID,
+ data, timeout);
+
+ connectAndSubscribe(pahoTargetID, connectOpts);
+
+ try {
+ // publish to topic
+ this.client.publish(pahoTargetID.getTopic(), new MqttMessage());
+ } catch (MqttException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ // wait for response
+ // ConnectResponseMessage crm =
+ // AbstractMessage.createFromByteArray(bytes);
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void disconnect() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean isStarted() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Object sendSynch(ID receiver, byte[] data) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
index 5b7176372..df4f83c59 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientContainer.java
@@ -23,7 +23,8 @@ public class PahoClientContainer extends ClientSOContainer {
@Override
protected ISynchAsynchConnection createConnection(ID targetID, Object data)
throws ConnectionCreateException {
- return new PahoConnection();
+ // XXX todo
+ return null;
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java
deleted file mode 100644
index 68bff0f77..000000000
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoConnection.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package org.eclipse.ecf.provider.mqtt.paho.container;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.eclipse.ecf.core.identity.ID;
-import org.eclipse.ecf.core.util.ECFException;
-import org.eclipse.ecf.provider.comm.IConnectionListener;
-import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
-import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
-
-public class PahoConnection implements ISynchAsynchConnection {
-
- private PahoID clientID;
- private MqttClient client;
- private String mqttTopic;
-
- private MqttCallback callback = new MqttCallback() {
-
- public void connectionLost(Throwable cause) {
- // TODO Auto-generated method stub
-
- }
-
- public void messageArrived(String topic, MqttMessage message)
- throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- public void deliveryComplete(IMqttDeliveryToken token) {
- // TODO Auto-generated method stub
-
- }
-
- };
-
- public PahoConnection() {
- }
-
- public synchronized void sendAsynch(ID receiver, byte[] data) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- public synchronized Object connect(ID targetID, Object data, int timeout)
- throws ECFException {
- if (targetID == null) throw new ECFException("targetID cannot be null");
- if (!(targetID instanceof PahoID)) throw new ECFException("targetID must be in PahoID namespace");
- PahoID pahoTargetID = (PahoID) targetID;
-
- MqttConnectOptions connectOpts = createConnectionOptions(targetID, data, timeout);
- String serverURI = null;
- String clientId = null;
- String topic = null;
- this.mqttTopic = null;
- // connect to broker
- try {
- this.client = new MqttClient(serverURI, clientId);
- this.client.setCallback(callback);
- this.client.connect(connectOpts);
- this.client.subscribe(topic);
- byte[] connectPayload = null;
- this.client.publish(mqttTopic, new MqttMessage(connectPayload));
- } catch (MqttSecurityException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (MqttException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- // TODO Auto-generated method stub
- return null;
- }
-
- private MqttConnectOptions createConnectionOptions(ID targetID,
- Object data, int timeout) {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);
- options.setConnectionTimeout(timeout);
- options.setKeepAliveInterval(timeout);
- return options;
- }
-
- public void disconnect() {
- // TODO Auto-generated method stub
-
- }
-
- public boolean isConnected() {
- // TODO Auto-generated method stub
- return false;
- }
-
- public ID getLocalID() {
- return clientID;
- }
-
- public void start() {
- // TODO Auto-generated method stub
-
- }
-
- public void stop() {
- // TODO Auto-generated method stub
-
- }
-
- public boolean isStarted() {
- // TODO Auto-generated method stub
- return false;
- }
-
- @SuppressWarnings("rawtypes")
- public Map getProperties() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public void addListener(IConnectionListener listener) {
- // TODO Auto-generated method stub
-
- }
-
- public void removeListener(IConnectionListener listener) {
- // TODO Auto-generated method stub
-
- }
-
- @SuppressWarnings("rawtypes")
- public Object getAdapter(Class adapter) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Object sendSynch(ID receiver, byte[] data) throws IOException {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
new file mode 100644
index 000000000..8d13accbd
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
@@ -0,0 +1,34 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.eclipse.ecf.provider.generic.SOContainer;
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+public class PahoMessage implements Serializable {
+
+ private static final long serialVersionUID = -8768793858838034483L;
+
+ private PahoID targetID;
+ private byte[] bytes;
+
+ public PahoMessage(PahoID targetID, byte[] bytes) {
+ this.targetID = targetID;
+ this.bytes = bytes;
+ }
+
+ public PahoID getTargetID() {
+ return this.targetID;
+ }
+
+ public byte[] getBytes() {
+ return this.bytes;
+ }
+
+ public MqttMessage createMessage() throws IOException {
+ byte[] objectBytes = SOContainer.serialize(this);
+ return new MqttMessage(objectBytes);
+ }
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
new file mode 100644
index 000000000..2012ffe0a
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
@@ -0,0 +1,41 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.IOException;
+
+import org.eclipse.ecf.core.identity.ID;
+import org.eclipse.ecf.core.util.ECFException;
+import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class PahoServerConnection extends AbstractPahoConnection implements ISynchAsynchConnection {
+
+ public PahoServerConnection(PahoID clientID) {
+ super(clientID);
+ }
+
+ public synchronized Object connect(ID targetID, Object data, int timeout)
+ throws ECFException {
+ throw new ECFException("Server cannot be connected");
+ }
+
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean isStarted() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Object sendSynch(ID receiver, byte[] data) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java
new file mode 100644
index 000000000..0f817ca1a
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerContainer.java
@@ -0,0 +1,12 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import org.eclipse.ecf.core.sharedobject.ISharedObjectContainerConfig;
+import org.eclipse.ecf.provider.generic.ServerSOContainer;
+
+public class PahoServerContainer extends ServerSOContainer {
+
+ public PahoServerContainer(ISharedObjectContainerConfig config) {
+ super(config);
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java
new file mode 100644
index 000000000..4e2aa010a
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java
@@ -0,0 +1,13 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class SyncPahoRequest extends PahoMessage {
+
+ private static final long serialVersionUID = 9073885283936597534L;
+
+ public SyncPahoRequest(PahoID targetID, byte[] bytes) {
+ super(targetID, bytes);
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java
index 83f52f8d6..2f2e1e628 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/identity/PahoID.java
@@ -11,13 +11,17 @@ public class PahoID extends StringID {
super(n, s);
}
- public String getTargetID() {
- // XXX TODO
+ public String getServerURI() {
return null;
}
-
- public String getClientID() {
- // XXX TODO
+
+ public String getClientId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public String getTopic() {
+ // TODO Auto-generated method stub
return null;
}
}

Back to the top