Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorslewis2013-11-21 01:10:00 -0500
committerslewis2013-11-21 01:10:00 -0500
commitc3eeaafcb2b07553c30b4409eb0cf936528ee279 (patch)
tree71bcaa39497c0a0a4bfee869a2bfed87e3cdcf48 /incubation
parent93bdbb1c89767cac98401787a41694630e82b2f7 (diff)
downloadorg.eclipse.ecf-c3eeaafcb2b07553c30b4409eb0cf936528ee279.tar.gz
org.eclipse.ecf-c3eeaafcb2b07553c30b4409eb0cf936528ee279.tar.xz
org.eclipse.ecf-c3eeaafcb2b07553c30b4409eb0cf936528ee279.zip
more additions for paho provider
Diffstat (limited to 'incubation')
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/META-INF/MANIFEST.MF1
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java180
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AsyncPahoMessage.java15
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/ConnectRequestMessage.java16
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java58
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java1
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java46
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java26
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java13
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java13
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncRequestMessage.java15
-rw-r--r--incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncResponseMessage.java15
12 files changed, 292 insertions, 107 deletions
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/META-INF/MANIFEST.MF b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/META-INF/MANIFEST.MF
index 12e9e96fb..a7504612b 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/META-INF/MANIFEST.MF
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/META-INF/MANIFEST.MF
@@ -16,4 +16,5 @@ Import-Package: org.eclipse.ecf.core;version="3.0.0",
org.eclipse.ecf.core.sharedobject;version="2.3.0",
org.eclipse.ecf.core.util;version="3.2.0",
org.eclipse.ecf.provider.comm;version="4.3.0",
+ org.eclipse.ecf.provider.comm.tcp;version="4.3.0",
org.eclipse.ecf.provider.generic;version="4.3.0"
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
index 6b1690bf8..e0ff378b1 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AbstractPahoConnection.java
@@ -2,13 +2,21 @@ package org.eclipse.ecf.provider.mqtt.paho.container;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.core.util.Trace;
+import org.eclipse.ecf.provider.comm.AsynchEvent;
+import org.eclipse.ecf.provider.comm.ConnectionEvent;
+import org.eclipse.ecf.provider.comm.DisconnectEvent;
import org.eclipse.ecf.provider.comm.IConnectionListener;
+import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.provider.comm.ISynchAsynchEventHandler;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoNamespace;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -18,19 +26,67 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-public abstract class AbstractPahoConnection {
+public abstract class AbstractPahoConnection implements
+ISynchAsynchConnection {
protected PahoID clientID;
protected MqttClient client;
protected String mqttTopic;
protected boolean connected;
+ protected boolean started;
+ protected List<IConnectionListener> connectionListeners = new ArrayList<IConnectionListener>();
+ protected ISynchAsynchEventHandler handler;
+
+ protected void fireListenersConnect(ConnectionEvent event) {
+ List<IConnectionListener> toNotify = null;
+ synchronized (connectionListeners) {
+ toNotify = new ArrayList<IConnectionListener>(connectionListeners);
+ }
+ for (Iterator<IConnectionListener> i = toNotify.iterator(); i.hasNext();) {
+ IConnectionListener l = (IConnectionListener) i.next();
+ l.handleConnectEvent(event);
+ }
+ }
+
+ protected void fireListenersDisconnect(DisconnectEvent event) {
+ List<IConnectionListener> toNotify = null;
+ synchronized (connectionListeners) {
+ toNotify = new ArrayList<IConnectionListener>(connectionListeners);
+ }
+ for (Iterator<IConnectionListener> i = toNotify.iterator(); i.hasNext();) {
+ IConnectionListener l = (IConnectionListener) i.next();
+ l.handleDisconnectEvent(event);
+ }
+ }
+
+ public void start() {
+ this.started = true;
+ }
+
+ public void stop() {
+ this.started = false;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ protected boolean isActive() {
+ return isConnected() && isStarted();
+ }
protected String getMqttTopic() {
return this.mqttTopic;
}
- public AbstractPahoConnection(PahoID clientID) {
+ public AbstractPahoConnection(PahoID clientID,
+ ISynchAsynchEventHandler handler) {
+ if (clientID == null)
+ throw new IllegalArgumentException("clientID cannot be null");
this.clientID = clientID;
+ if (handler == null)
+ throw new IllegalArgumentException("handler cannot be null");
+ this.handler = handler;
}
protected MqttCallback callback = new MqttCallback() {
@@ -56,9 +112,6 @@ public abstract class AbstractPahoConnection {
// TODO Auto-generated method stub
}
- protected abstract void handleMessageArrived(String topic2,
- MqttMessage message);
-
protected synchronized void connectAndSubscribe(PahoID targetID,
MqttConnectOptions opts) throws ECFException {
// Create client
@@ -86,10 +139,26 @@ public abstract class AbstractPahoConnection {
return null;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.eclipse.ecf.core.comm.IConnection#addCommEventListener(org.eclipse
+ * .ecf.core.comm.IConnectionListener)
+ */
public void addListener(IConnectionListener listener) {
+ connectionListeners.add(listener);
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.eclipse.ecf.core.comm.IConnection#removeCommEventListener(org.eclipse
+ * .ecf.core.comm.IConnectionListener)
+ */
public void removeListener(IConnectionListener listener) {
+ connectionListeners.remove(listener);
}
public ID getLocalID() {
@@ -116,7 +185,7 @@ public abstract class AbstractPahoConnection {
protected void publishMessage(PahoMessage message) throws IOException {
try {
- this.client.publish(getMqttTopic(), message.createMessage());
+ this.client.publish(getMqttTopic(), message.toMqttMessage());
} catch (MqttException e) {
throw new IOException(e.getMessage(), e);
}
@@ -137,13 +206,14 @@ public abstract class AbstractPahoConnection {
public synchronized void sendAsynch(ID receiver, byte[] data)
throws IOException {
- if (!isConnected())
+ if (!isActive())
throw new IOException("PahoConnection not connected");
if (receiver != null
&& !PahoNamespace.NAME
.equals(receiver.getNamespace().getName()))
throw new IOException("receiver not in PahoID namespace");
- publishMessage(new PahoMessage((PahoID) receiver, data));
+ publishMessage(new AsyncPahoMessage((PahoID) getLocalID(),
+ (PahoID) receiver, data));
}
protected Object synch = new Object();
@@ -151,8 +221,12 @@ public abstract class AbstractPahoConnection {
protected Serializable reply;
protected int connectWaitDuration = 15000;
+ protected PahoID getLocalPahoID() {
+ return (PahoID) getLocalID();
+ }
+
public Object sendSynch(ID receiver, byte[] data) throws IOException {
- if (!isConnected())
+ if (!isActive())
throw new IOException("PahoConnection not connected");
if (receiver == null)
throw new IOException("receiver id must not be null");
@@ -160,8 +234,8 @@ public abstract class AbstractPahoConnection {
&& !PahoNamespace.NAME
.equals(receiver.getNamespace().getName()))
throw new IOException("receiver not in PahoID namespace");
- return sendAndWait(new SyncPahoRequest((PahoID) receiver, data),
- connectWaitDuration);
+ return sendAndWait(new SyncRequestMessage(getLocalPahoID(),
+ (PahoID) receiver, data), connectWaitDuration);
}
private String PLUGIN_ID = "org.eclipse.ecf.provider.mqtt.paho";
@@ -179,7 +253,7 @@ public abstract class AbstractPahoConnection {
this.getClass(), method, e);
}
- private Serializable sendAndWait(SyncPahoRequest syncPahoRequest,
+ private Serializable sendAndWait(SyncRequestMessage syncRequestMessage,
int waitDuration) throws IOException {
synchronized (synch) {
try {
@@ -187,7 +261,7 @@ public abstract class AbstractPahoConnection {
long waittimeout = System.currentTimeMillis() + waitDuration;
reply = null;
this.client.publish(getMqttTopic(),
- syncPahoRequest.createMessage());
+ syncRequestMessage.toMqttMessage());
while (!waitDone
&& (waittimeout - System.currentTimeMillis() > 0)) {
synch.wait(waitDuration / 10);
@@ -208,4 +282,84 @@ public abstract class AbstractPahoConnection {
}
}
+ protected void handleMessageArrived(String topic, MqttMessage message) {
+ if (!isActive())
+ return;
+ if (topic.equals(this.getMqttTopic()))
+ return;
+ Object response = null;
+ try {
+ response = PahoMessage.deserialize(message.getPayload());
+ if (response instanceof PahoMessage) {
+ PahoMessage pahoMessage = (PahoMessage) response;
+ PahoID fromID = pahoMessage.getFromID();
+ if (fromID == null) {
+ Trace.exiting(PLUGIN_ID, PahoDebugOptions.METHODS_ENTERING,
+ this.getClass(),
+ "fromID=null...ignoring PahoMessage " + pahoMessage); //$NON-NLS-1$
+ return;
+ }
+ if (fromID.equals(getLocalID())) {
+ Trace.exiting(
+ PLUGIN_ID,
+ PahoDebugOptions.METHODS_ENTERING,
+ this.getClass(),
+ "fromID=localID...ignoring PahoMessage " + pahoMessage); //$NON-NLS-1$
+ return;
+ }
+ PahoID targetID = pahoMessage.getTargetID();
+ if (targetID == null) {
+ if (pahoMessage instanceof AsyncPahoMessage)
+ handleTopicMessage((AsyncPahoMessage) pahoMessage);
+ else
+ Trace.trace(PLUGIN_ID,
+ "onMessage.received invalid message to group"); //$NON-NLS-1$
+ } else {
+ if (targetID.equals(getLocalID())) {
+ if (pahoMessage instanceof AsyncPahoMessage)
+ handleTopicMessage((AsyncPahoMessage) pahoMessage);
+ else if (pahoMessage instanceof SyncRequestMessage)
+ handleSynchRequest((SyncRequestMessage) pahoMessage);
+ else if (pahoMessage instanceof SyncResponseMessage)
+ handleSynchResponse((SyncResponseMessage) pahoMessage);
+ else
+ Trace.trace(
+ PLUGIN_ID,
+ "onMessage.msg invalid message to " + targetID); //$NON-NLS-1$
+ } else
+ Trace.trace(
+ PLUGIN_ID,
+ "onMessage.msg " + pahoMessage + " not intended for " + targetID); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ } else
+ // received bogus message...ignore
+ Trace.trace(
+ PLUGIN_ID,
+ "onMessage: received non-ECFMessage...ignoring " + response); //$NON-NLS-1$
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected void handleTopicMessage(AsyncPahoMessage message) {
+ if (isActive()) {
+ try {
+ this.handler.handleAsynchEvent(new AsynchEvent(this,message.getData()));
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ protected void handleSynchResponse(SyncResponseMessage pahoMessage) {
+ synchronized (this.synch) {
+ this.reply = pahoMessage.getData();
+ this.waitDone = true;
+ this.synch.notify();
+ }
+ }
+
+ protected abstract void handleSynchRequest(SyncRequestMessage pahoMessage);
+
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AsyncPahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AsyncPahoMessage.java
new file mode 100644
index 000000000..85faa4c54
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/AsyncPahoMessage.java
@@ -0,0 +1,15 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.Serializable;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class AsyncPahoMessage extends PahoMessage {
+
+ private static final long serialVersionUID = -2538908397359614161L;
+
+ public AsyncPahoMessage(PahoID fromID, PahoID targetID, Serializable data) {
+ super(fromID, targetID, data);
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/ConnectRequestMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/ConnectRequestMessage.java
new file mode 100644
index 000000000..c13150804
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/ConnectRequestMessage.java
@@ -0,0 +1,16 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.IOException;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class ConnectRequestMessage extends PahoMessage {
+
+ private static final long serialVersionUID = 3750692684824242655L;
+
+ public ConnectRequestMessage(PahoID fromID, PahoID targetID, Object data)
+ throws IOException {
+ super(fromID, targetID, serialize(data));
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
index 8b796b660..63a0eb005 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoClientConnection.java
@@ -1,18 +1,19 @@
package org.eclipse.ecf.provider.mqtt.paho.container;
+import java.io.IOException;
+
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
-import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.provider.comm.ISynchAsynchEventHandler;
+import org.eclipse.ecf.provider.comm.tcp.ConnectResultMessage;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-public class PahoClientConnection extends AbstractPahoConnection implements
- ISynchAsynchConnection {
+public class PahoClientConnection extends AbstractPahoConnection {
- public PahoClientConnection(PahoID clientID) {
- super(clientID);
+ public PahoClientConnection(PahoID clientID,
+ ISynchAsynchEventHandler handler) {
+ super(clientID, handler);
}
public synchronized Object connect(ID targetID, Object data, int timeout)
@@ -27,45 +28,24 @@ public class PahoClientConnection extends AbstractPahoConnection implements
data, timeout);
connectAndSubscribe(pahoTargetID, connectOpts);
-
+ ConnectResultMessage response;
try {
- // publish to topic
- this.client.publish(pahoTargetID.getTopic(), new MqttMessage());
- } catch (MqttException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ response = (ConnectResultMessage) sendSynch(pahoTargetID,
+ new ConnectRequestMessage((PahoID) getLocalID(),
+ pahoTargetID, data).serialize());
+ } catch (IOException e) {
+ throw new ECFException("Could not connect to target=" + targetID, e);
}
- // wait for response
- // ConnectResponseMessage crm =
- // AbstractMessage.createFromByteArray(bytes);
- // TODO Auto-generated method stub
- return null;
- }
-
- public void disconnect() {
- // TODO Auto-generated method stub
-
- }
-
- public void start() {
- // TODO Auto-generated method stub
-
- }
-
- public void stop() {
- // TODO Auto-generated method stub
+ if (response == null)
+ throw new ECFException("Received null response from group manager");
- }
-
- public boolean isStarted() {
- // TODO Auto-generated method stub
- return false;
+ return response.getData();
}
@Override
- protected void handleMessageArrived(String topic2, MqttMessage message) {
+ protected void handleSynchRequest(SyncRequestMessage pahoMessage) {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java
index b2a34c0a2..72e22a645 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoDebugOptions.java
@@ -3,5 +3,6 @@ package org.eclipse.ecf.provider.mqtt.paho.container;
public interface PahoDebugOptions {
public static String EXCEPTIONS_CATCHING = "exceptions catching";
+ public static String METHODS_ENTERING = "methods entering";
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
index 29f68002e..d126d9fcb 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoMessage.java
@@ -1,9 +1,12 @@
package org.eclipse.ecf.provider.mqtt.paho.container;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
-import org.eclipse.ecf.provider.generic.SOContainer;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -11,24 +14,49 @@ public class PahoMessage implements Serializable {
private static final long serialVersionUID = -8768793858838034483L;
+ private PahoID fromID;
private PahoID targetID;
- private byte[] bytes;
+ private Serializable data;
- public PahoMessage(PahoID targetID, byte[] bytes) {
+ public static byte[] serialize(Object object) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(object);
+ return bos.toByteArray();
+ }
+
+ public static Object deserialize(byte[] bytes) throws IOException,
+ ClassNotFoundException {
+ ObjectInputStream oos = new ObjectInputStream(new ByteArrayInputStream(
+ bytes));
+ return oos.readObject();
+ }
+
+ public PahoMessage(PahoID fromID, PahoID targetID, Serializable data) {
+ if (fromID == null)
+ throw new IllegalArgumentException("fromID cannot be null");
+ this.fromID = fromID;
this.targetID = targetID;
- this.bytes = bytes;
+ this.data = data;
+ }
+
+ public PahoID getFromID() {
+ return this.fromID;
}
public PahoID getTargetID() {
return this.targetID;
}
- public byte[] getBytes() {
- return this.bytes;
+ public Serializable getData() {
+ return this.data;
+ }
+
+ public byte[] serialize() throws IOException {
+ return serialize(this);
}
- public MqttMessage createMessage() throws IOException {
- byte[] objectBytes = SOContainer.serialize(this);
- return new MqttMessage(objectBytes);
+ public MqttMessage toMqttMessage() throws IOException {
+ return new MqttMessage(serialize());
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
index eed0562c4..9b061f1a6 100644
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/PahoServerConnection.java
@@ -3,14 +3,15 @@ package org.eclipse.ecf.provider.mqtt.paho.container;
import org.eclipse.ecf.core.identity.ID;
import org.eclipse.ecf.core.util.ECFException;
import org.eclipse.ecf.provider.comm.ISynchAsynchConnection;
+import org.eclipse.ecf.provider.comm.ISynchAsynchEventHandler;
import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
public class PahoServerConnection extends AbstractPahoConnection implements
ISynchAsynchConnection {
- public PahoServerConnection(PahoID clientID) {
- super(clientID);
+ public PahoServerConnection(PahoID clientID,
+ ISynchAsynchEventHandler handler) {
+ super(clientID, handler);
}
public synchronized Object connect(ID targetID, Object data, int timeout)
@@ -18,25 +19,10 @@ public class PahoServerConnection extends AbstractPahoConnection implements
throw new ECFException("Server cannot be connected");
}
- public void start() {
- // TODO Auto-generated method stub
-
- }
-
- public void stop() {
- // TODO Auto-generated method stub
-
- }
-
- public boolean isStarted() {
- // TODO Auto-generated method stub
- return false;
- }
-
@Override
- protected void handleMessageArrived(String topic2, MqttMessage message) {
+ protected void handleSynchRequest(SyncRequestMessage pahoMessage) {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java
deleted file mode 100644
index 4e2aa010a..000000000
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoRequest.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.eclipse.ecf.provider.mqtt.paho.container;
-
-import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
-
-public class SyncPahoRequest extends PahoMessage {
-
- private static final long serialVersionUID = 9073885283936597534L;
-
- public SyncPahoRequest(PahoID targetID, byte[] bytes) {
- super(targetID, bytes);
- }
-
-}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java
deleted file mode 100644
index c5048c19f..000000000
--- a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncPahoResponse.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.eclipse.ecf.provider.mqtt.paho.container;
-
-import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
-
-public class SyncPahoResponse extends PahoMessage {
-
- private static final long serialVersionUID = 4085439339371979310L;
-
- public SyncPahoResponse(PahoID targetID, byte[] bytes) {
- super(targetID, bytes);
- }
-
-}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncRequestMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncRequestMessage.java
new file mode 100644
index 000000000..93936ab93
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncRequestMessage.java
@@ -0,0 +1,15 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.Serializable;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class SyncRequestMessage extends PahoMessage {
+
+ private static final long serialVersionUID = 9073885283936597534L;
+
+ public SyncRequestMessage(PahoID fromID, PahoID targetID, Serializable data) {
+ super(fromID, targetID, data);
+ }
+
+}
diff --git a/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncResponseMessage.java b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncResponseMessage.java
new file mode 100644
index 000000000..7b76fb174
--- /dev/null
+++ b/incubation/bundles/org.eclipse.ecf.provider.mqtt.paho/src/org/eclipse/ecf/provider/mqtt/paho/container/SyncResponseMessage.java
@@ -0,0 +1,15 @@
+package org.eclipse.ecf.provider.mqtt.paho.container;
+
+import java.io.Serializable;
+
+import org.eclipse.ecf.provider.mqtt.paho.identity.PahoID;
+
+public class SyncResponseMessage extends PahoMessage {
+
+ private static final long serialVersionUID = 4085439339371979310L;
+
+ public SyncResponseMessage(PahoID fromID, PahoID targetID, Serializable data) {
+ super(fromID, targetID, data);
+ }
+
+}

Back to the top