Skip to main content
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristophe Munilla2019-07-15 08:52:32 -0400
committerChristophe Munilla2019-07-15 08:52:32 -0400
commit91e9633383184f9f4e5f4924b39fcd0b9043a07c (patch)
treea682c0f441f50dde526faa88a8c4d6305212e823
parentc40dd8e9f47e5eaa2126dd9048fd2a13d5dceb32 (diff)
downloadorg.eclipse.sensinact.gateway-91e9633383184f9f4e5f4924b39fcd0b9043a07c.tar.gz
org.eclipse.sensinact.gateway-91e9633383184f9f4e5f4924b39fcd0b9043a07c.tar.xz
org.eclipse.sensinact.gateway-91e9633383184f9f4e5f4924b39fcd0b9043a07c.zip
Mqtt utils memory leak fix
At each connection the MqttBroker instantiated a new client - The old one may not be deleted conducting to memory leak
-rw-r--r--platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java147
-rw-r--r--platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttTopic.java16
2 files changed, 98 insertions, 65 deletions
diff --git a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
index ef3ddb03..6115024f 100644
--- a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
+++ b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttBroker.java
@@ -13,15 +13,18 @@ package org.eclipse.sensinact.gateway.sthbnd.mqtt.util.api;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.sensinact.gateway.sthbnd.mqtt.util.listener.MqttConnectionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
public class MqttBroker {
@@ -89,17 +92,35 @@ public class MqttBroker {
}
public void subscribeToTopic(MqttTopic topic) {
- topics.add(topic);
- LOG.info("Subscription to the topic {} added to the list", topic.getTopic());
+ synchronized(this.topics) {
+ if(this.topics.indexOf(topic) < 0) {
+ topics.add(topic);
+ LOG.info("Subscription to the topic {} added to the list", topic.getTopic());
+ try {
+ if(client == null || !client.isConnected()) {
+ this.connect();
+ }
+ client.subscribe(topic.getTopic(), topic.getListener());
+ LOG.info("Subscription to the topic {} done", topic.getTopic());
+ } catch (MqttException e) {
+ LOG.error("Error when subscribing to the topic {}", topic.getTopic(),e);
+ }
+ }
+ }
}
public void unsubscribeFromTopic(MqttTopic topic) {
try {
client.unsubscribe(topic.getTopic());
- topics.remove(topic);
LOG.info("Unsubscription to the topic {} done", topic.getTopic());
- } catch (Exception e) {
- LOG.error("Unable to unsubscribe from the topic {}", topic.getTopic());
+ synchronized(this.topics) {
+ topics.remove(topic);
+ if(topics.isEmpty()) {
+ this.disconnect();
+ }
+ }
+ } catch (MqttException e) {
+ LOG.error("Error when unsubscribing from the topic {}", topic.getTopic(), e);
}
}
@@ -111,10 +132,27 @@ public class MqttBroker {
return handler;
}
- public void connect() throws Exception {
+ public void connect() throws MqttException {
+ if (client!=null && client.isConnected()) {
+ client.disconnect();
+ }
+ client = null;
final String brokerUrl = String.format("%s://%s:%d", protocol, host, port);
- LOG.info("Connecting to broker {}",brokerUrl);
- client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+ client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+ if (handler == null) {
+ LOG.info("Custom Connection Handler not defined, using default reconnection for {}", brokerUrl);
+ this.handler=new MqttConnectionHandlerImpl(this);
+ }
+ client.setCallback(handler);
+ doConnect();
+ }
+
+ public void doConnect() throws MqttException {
+ if(client == null) {
+ throw new MqttException(new NullPointerException("Valid client is required"));
+ }
+ final String brokerUrl = String.format("%s://%s:%d", protocol, host, port);
+
MqttConnectOptions connectOptions = new MqttConnectOptions();
if (session != null) {
if (session.getCleanSession() != null) {
@@ -138,70 +176,43 @@ public class MqttBroker {
connectOptions.setSSLProperties(authentication.getSslProperties());
}
}
-
- if (handler == null) {
- LOG.info("Custom Connection Handler not defined, using default reconnection for {}", brokerUrl);
- this.handler=new MqttConnectionHandlerImpl(this);
- }
-
- client.setCallback(handler);
-
LOG.info("Connecting to broker: {}", brokerUrl);
-
try {
if (!client.isConnected()) {
client.connect(connectOptions);
- if (handler != null) {
- handler.connectionEstablished(this);
- }
+ handler.connectionEstablished(this);
} else {
LOG.error("Already connected to the MQTT broker: {}", brokerUrl);
}
} catch (MqttException e) {
- if (handler != null) {
- handler.connectionFailed(this);
- }
+ handler.connectionFailed(this);
LOG.error("Failed to connect to MQTT broker", e);
}
LOG.info("Connected to broker: {}:{}", this.host, this.port);
- for (MqttTopic topic : topics) {
- try {
- client.subscribe(topic.getTopic(), topic.getListener());
- LOG.info("Subscription to the topic {} done", topic.getTopic());
- } catch (MqttException e) {
- LOG.error("Unable to subscribe to the topic {}", topic.getTopic());
- }
- }
- LOG.info("Connected to broker {}",brokerUrl);
}
- public synchronized void disconnect() throws Exception {
-
- List<MqttTopic> removalTopic=new ArrayList<>();
- //Avoid reconnection on voluntary disconnection
- setHandler(null);
- for (java.util.Iterator it = topics.iterator(); it.hasNext();) {
- MqttTopic topic=null;
- try {
- topic=(MqttTopic) it.next();
- LOG.info("Unsubscribing from topic {} done", topic.getTopic());
- client.unsubscribe(topic.getTopic());
- removalTopic.add(topic);
- LOG.info("Unsubscription to the topic {} done", topic.getTopic());
- } catch (MqttException e) {
- LOG.error("Unable to unsubscribe from the topic {}", topic.getTopic());
- }
+ public void disconnect() throws MqttException {
+ synchronized(this.topics) {
+ if(!topics.isEmpty()) {
+ for (Iterator<MqttTopic> it = topics.iterator(); it.hasNext();) {
+ MqttTopic topic=null;
+ try {
+ topic=(MqttTopic) it.next();
+ LOG.info("Unsubscribing from topic {} done", topic.getTopic());
+ client.unsubscribe(topic.getTopic());
+ it.remove();
+ LOG.info("Unsubscription to the topic {} done", topic.getTopic());
+ } catch (MqttException e) {
+ LOG.error("Unable to unsubscribe from the topic {}", topic.getTopic());
+ }
+ }
+ }
}
-
- topics.removeAll(removalTopic);
-
- if (client!=null&&client.isConnected()) {
+ if (client!=null && client.isConnected()) {
client.disconnect();
- client = null;
LOG.info("Disconnected from MQTT broker: {}", this.host);
- } else {
- LOG.error("Unable to disconnect from MQTT broker: {}", host);
}
+ client = null;
}
public MqttClient getClient() {
@@ -302,18 +313,24 @@ public class MqttBroker {
}
private class MqttConnectionHandlerImpl extends MqttConnectionHandler {
+ private Timer timer = new Timer();
+
public MqttConnectionHandlerImpl(MqttBroker broker) {
super(broker);
}
@Override
- public void connectionFailed(MqttBroker broker) {
- try {
- Thread.sleep(5000);
- broker.connect();
- } catch (Exception e) {
- LOG.debug("Connection Failed with {}://{}:{}",broker.getProtocol().name(),broker.getHost(),broker.getPort());
- }
+ public void connectionFailed(final MqttBroker broker) {
+ this.timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ broker.doConnect();
+ } catch (MqttException e) {
+ LOG.error("Connection Failed with {}://{}:{}",broker.getProtocol().name(),broker.getHost(),broker.getPort());
+ }
+ }
+ }, 5000);
}
@Override
@@ -322,7 +339,7 @@ public class MqttBroker {
for(MqttTopic topic:topics){
try {
LOG.info("Subscription to the topic {} done", topic.getTopic());
- client.subscribe(topic.getTopic(),0,topic.getListener());
+ broker.getClient().subscribe(topic.getTopic(),topic.getListener());
} catch (Exception e) {
LOG.error("Unable to subscribe to the topic {}", topic.getTopic());
}
@@ -335,7 +352,7 @@ public class MqttBroker {
public void connectionLost(MqttBroker broker) {
try {
broker.connect();
- } catch (Exception e) {
+ } catch (MqttException e) {
LOG.debug("Connection Lost with {}://{}:{}",broker.getProtocol().name(),broker.getHost(),broker.getPort());
}
}
diff --git a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttTopic.java b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttTopic.java
index 04234433..e2e3c916 100644
--- a/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttTopic.java
+++ b/platform/southbound/mqtt/mqtt-utils/src/main/java/org/eclipse/sensinact/gateway/sthbnd/mqtt/util/api/MqttTopic.java
@@ -32,4 +32,20 @@ public class MqttTopic {
public void setListener(MqttTopicMessage listener) {
this.listener = listener;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ public boolean equals(Object o) {
+ if(o == null) {
+ return false;
+ }
+ if(o.getClass() == String.class) {
+ return o.equals(this.topic);
+ }
+ if(MqttTopic.class.isAssignableFrom(o.getClass())) {
+ return ((MqttTopic)o).getTopic().equals(this.topic);
+ }
+ return false;
+ }
}

Back to the top