summaryrefslogtreecommitdiffstatsabout
diff options
context:
space:
mode:
authorAllan Stockdill-Mander2013-07-18 05:43:42 (EDT)
committer Ian Craggs2013-07-18 05:57:02 (EDT)
commit6183c7572bb2e73ef2ebd2112e83de6e50677563 (patch)
tree1c5b7f0d70fccc9662a6f2acaebf4e19143ca76b
parent99c930107eeff4c056700ed79a2abad5c77c49e2 (diff)
downloadorg.eclipse.paho.mqtt.java-6183c7572bb2e73ef2ebd2112e83de6e50677563.zip
org.eclipse.paho.mqtt.java-6183c7572bb2e73ef2ebd2112e83de6e50677563.tar.gz
org.eclipse.paho.mqtt.java-6183c7572bb2e73ef2ebd2112e83de6e50677563.tar.bz2
New and revised tests
Signed-off-by: Allan Stockdill-Mander <asm@uk.ibm.com>
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java2
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/LiveTakeOverTest.java290
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/ModelTestCase.java685
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java510
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveTest.java5
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ConsoleHandler.java50
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/DetailFormatter.java180
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/HumanFormatter.java65
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerDumper.java133
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerNode.java71
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggingUtilities.java96
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ObjectFormatter.java273
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/TraceFormatter.java135
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/Utility.java2
-rw-r--r--org.eclipse.paho.client.mqttv3/src/test/resources/logging.properties8
15 files changed, 2496 insertions, 9 deletions
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java
index 071354c..ff7b756 100644
--- a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/BasicTest.java
@@ -26,7 +26,7 @@ import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho;
-import org.eclipse.paho.client.mqttv3.test.log.LoggingUtilities;
+import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
import org.junit.AfterClass;
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/LiveTakeOverTest.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/LiveTakeOverTest.java
new file mode 100644
index 0000000..145a040
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/LiveTakeOverTest.java
@@ -0,0 +1,290 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test;
+
+import java.net.URI;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttTopic;
+import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho;
+import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
+import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
+import org.eclipse.paho.client.mqttv3.test.utilities.MqttV3Receiver;
+import org.eclipse.paho.client.mqttv3.test.utilities.MqttV3Receiver.ReceivedMessage;
+import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class LiveTakeOverTest {
+
+ private static final Class<?> cclass = LiveTakeOverTest.class;
+ private static final String className = cclass.getName();
+ private static final Logger log = Logger.getLogger(className);
+
+ private static URI serverURI;
+ private static MqttClientFactoryPaho clientFactory;
+
+ static enum FirstClientState {
+ INITIAL,
+ READY,
+ RUNNING,
+ FINISHED,
+ ERROR
+ }
+
+ private static String ClientId = "TakeOverClient";
+ private static String FirstSubTopicString = "FirstClient/Topic";
+
+ /**
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ try {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ serverURI = TestProperties.getServerURI();
+ clientFactory = new MqttClientFactoryPaho();
+ clientFactory.open();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ throw exception;
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ try {
+ if (clientFactory != null) {
+ clientFactory.close();
+ clientFactory.disconnect();
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ /**
+ * Test that a client actively doing work can be taken over
+ * @throws Exception
+ */
+ @Test
+ public void testLiveTakeOver() throws Exception {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ IMqttClient mqttClient = null;
+ try {
+ FirstClient firstClient = new FirstClient();
+ Thread firstClientThread = new Thread(firstClient);
+ log.info("Starting the firstClient thread");
+ firstClientThread.start();
+ log.info("firstClientThread Started");
+
+ firstClient.waitForState(FirstClientState.READY);
+
+ log.fine("telling the 1st client to go and let it publish for 2 seconds");
+ //Tell the first client to go and let it publish for a couple of seconds
+ firstClient.setState(FirstClientState.RUNNING);
+ Thread.sleep(2000);
+
+ log.fine("Client has been run for 2 seconds, now taking over connection");
+
+ //Now lets take over the connection
+ // Create a second MQTT client connection with the same clientid. The
+ // server should spot this and kick the first client connection off.
+ // To do this from the same box the 2nd client needs to use either
+ // a different form of persistent store or a different locaiton for
+ // the store to the first client.
+ // MqttClientPersistence persist = new MemoryPersistence();
+ mqttClient = clientFactory.createMqttClient(serverURI, ClientId, null);
+
+ MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ mqttClient.setCallback(mqttV3Receiver);
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setCleanSession(false);
+ mqttConnectOptions.setWill("WillTopic", "payload".getBytes(), 2, true);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + ClientId);
+ mqttClient.connect(mqttConnectOptions);
+
+ //We should have taken over the first Client's subscription...we may have some
+ //of his publishes arrive.
+ // NOTE: as a different persistence is used for the second client any inflight
+ // publications from the client will not be recovered / restarted. This will
+ // leave debris on the server.
+ log.fine("We should have taken over the first Client's subscription...we may have some of his publishes arrive.");
+ //Ignore his publishes that arrive...
+ ReceivedMessage oldMsg;
+ do {
+ oldMsg = mqttV3Receiver.receiveNext(1000);
+ }
+ while (oldMsg != null);
+
+ log.fine("Now check we have grabbed his subscription by publishing..");
+ //Now check we have grabbed his subscription by publishing..
+ byte[] payload = ("Message payload from second client " + getClass().getName() + "." + methodName).getBytes();
+ MqttTopic mqttTopic = mqttClient.getTopic(FirstSubTopicString);
+ log.info("Publishing to..." + FirstSubTopicString);
+ mqttTopic.publish(payload, 1, false);
+ log.info("Publish sent, checking for receipt...");
+
+ boolean ok = mqttV3Receiver.validateReceipt(FirstSubTopicString, 1, payload);
+ if (!ok) {
+ throw new Exception("Receive failed");
+ }
+ }
+ catch (Exception exception) {
+ log.throwing(className, methodName, exception);
+ throw exception;
+ }
+ finally {
+ try {
+ mqttClient.disconnect();
+ log.info("Disconnecting...");
+ mqttClient.close();
+ log.info("Close...");
+ }
+ catch (Exception exception) {
+ log.throwing(className, methodName, exception);
+ throw exception;
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+
+ class FirstClient implements Runnable {
+
+ private FirstClientState state = FirstClientState.INITIAL;
+ public Object stateLock = new Object();
+ IMqttClient mqttClient = null;
+ MqttV3Receiver mqttV3Receiver = null;
+
+ void waitForState(FirstClientState desiredState) throws InterruptedException {
+ final String methodName = "waitForState";
+ synchronized (stateLock) {
+ while ((state != desiredState) && (state != FirstClientState.ERROR)) {
+ try {
+ stateLock.wait();
+ }
+ catch (InterruptedException exception) {
+ log.throwing(className, methodName, exception);
+ throw exception;
+ }
+ }
+
+ if (state == FirstClientState.ERROR) {
+ Assert.fail("Firstclient entered an ERROR state");
+ }
+ }
+ log.exiting(className, methodName);
+ }
+
+ void setState(FirstClientState newState) {
+ synchronized (stateLock) {
+ state = newState;
+ stateLock.notifyAll();
+ }
+ }
+
+ void connectAndSub() {
+ String methodName = Utility.getMethodName();
+ try {
+ mqttClient = clientFactory.createMqttClient(serverURI, ClientId);
+ mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ mqttV3Receiver.setReportConnectionLoss(false);
+ mqttClient.setCallback(mqttV3Receiver);
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setCleanSession(false);
+ mqttConnectOptions.setWill("WillTopic", "payload".getBytes(), 2, true);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + ClientId);
+ mqttClient.connect(mqttConnectOptions);
+ log.info("Subscribing to..." + FirstSubTopicString);
+ mqttClient.subscribe(FirstSubTopicString, 2);
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caugh exception:" + exception);
+ setState(FirstClientState.ERROR);
+ Assert.fail("Failed ConnectAndSub exception=" + exception);
+ }
+ }
+
+ void repeatedlyPub() {
+ String methodName = Utility.getMethodName();
+
+ int i = 0;
+ while (mqttClient.isConnected()) {
+ try {
+ if (i > 999999) {
+ i = 0;
+ }
+ byte[] payload = ("Message payload " + getClass().getName() + ".publish" + (i++)).getBytes();
+ MqttTopic mqttTopic = mqttClient.getTopic(FirstSubTopicString);
+ log.info("Publishing to..." + FirstSubTopicString);
+ mqttTopic.publish(payload, 1, false);
+
+ }
+ catch (Exception exception) {
+ log.fine("Caught exception:" + exception);
+ // Don't fail - we are going to get an exception as we disconnected during takeOver
+ // Its likely the publish rate is too high i.e. inflight window is full
+ }
+ }
+ }
+
+ public void run() {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ connectAndSub();
+ try {
+ setState(FirstClientState.READY);
+ waitForState(FirstClientState.RUNNING);
+ repeatedlyPub();
+ log.info("FirstClient exiting...");
+ log.exiting(className, methodName);
+
+ mqttClient.close();
+
+ }
+ catch (InterruptedException exception) {
+ setState(FirstClientState.ERROR);
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ catch (MqttException exception) {
+ setState(FirstClientState.ERROR);
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/ModelTestCase.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/ModelTestCase.java
new file mode 100644
index 0000000..2d75b42
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/ModelTestCase.java
@@ -0,0 +1,685 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho;
+import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
+import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
+import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Client test which rapidly tries random combinations of connect,
+ * disconnect, publish and subscribe on a single thread with varying options
+ * (retained, qos etc) and verifies the results. A log is produced
+ * of the history of commands tried which can be fed back into the
+ * test to re-produce a previous run (See run(String filename)
+ */
+
+public class ModelTestCase implements MqttCallback {
+
+ private static final Class<?> cclass = ModelTestCase.class;
+ private static final String className = cclass.getName();
+ private static final Logger log = Logger.getLogger(className);
+
+ private static URI serverURI;
+ private static MqttClientFactoryPaho clientFactory;
+
+ public static final String LOGDIR = "./";
+ public static final String CLIENTID = "mqttv3.ModelTestCase";
+
+ public String logFilename = null;
+ public File logDirectory = null;
+ public PrintWriter logout = null;
+ public HashMap<String, Integer> subscribedTopics;
+ public Object lock;
+ public ArrayList<MqttMessage> messages;
+ public ArrayList<String> topics;
+ public Random random;
+ public IMqttClient client;
+ public HashMap<String, MqttMessage> retainedPublishes;
+ public HashMap<MqttDeliveryToken, String> currentTokens;
+ public boolean cleanSession;
+
+ private int numOfIterations = 500;
+
+ /**
+ * Constructor
+ **/
+ public ModelTestCase() {
+ logDirectory = new File(LOGDIR);
+ if (logDirectory.exists()) {
+ deleteLogFiles();
+ logFilename = "mqttv3.ModelTestCase." + System.currentTimeMillis() + ".log";
+
+ File logfile = new File(logDirectory, logFilename);
+ try {
+ logout = new PrintWriter(new FileWriter(logfile));
+ }
+ catch (IOException e) {
+ logout = null;
+ }
+ }
+ }
+
+ /**
+ * @throws IOException
+ */
+ private void deleteLogFiles() {
+ log.info("Deleting log files");
+ File[] files = logDirectory.listFiles(new FilenameFilter() {
+
+ public boolean accept(File dir, String name) {
+ return name.matches("mqttv3\\.ModelTestCase\\..*\\.log");
+ }
+ });
+
+ for (File file : files) {
+ boolean isDeleted = file.delete();
+ if (isDeleted == false) {
+ log.info(" failed to delete: " + file.getAbsolutePath());
+ file.deleteOnExit();
+ }
+ }
+ }
+
+ /**
+ * Test definitions
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ try {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ serverURI = TestProperties.getServerURI();
+ clientFactory = new MqttClientFactoryPaho();
+ clientFactory.open();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ throw exception;
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ try {
+ if (clientFactory != null) {
+ clientFactory.close();
+ clientFactory.disconnect();
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testRunModel() throws Exception {
+ log.info("Test core operations and parameters by random selection");
+ log.info("See file: " + logFilename + " for details of selected test sequence");
+ initialise();
+ try {
+ this.run(numOfIterations);
+ }
+ finally {
+ finish();
+ }
+ }
+
+ /**
+ * @param msg
+ */
+ public void logToFile(String msg) {
+
+ DateFormat df = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS");
+ Date d = new Date();
+ String tsMsg = df.format(d) + " " + msg;
+
+ if (logout != null) {
+ logout.println(tsMsg);
+ }
+ else {
+ System.out.println(tsMsg);
+ }
+ }
+
+ /**
+ * @param e
+ */
+ public void logToFile(Throwable e) {
+ e.printStackTrace(System.out);
+ if (logout != null) {
+ e.printStackTrace(logout);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void initialise() throws Exception {
+ random = new Random();
+ subscribedTopics = new HashMap<String, Integer>();
+ messages = new ArrayList<MqttMessage>();
+ topics = new ArrayList<String>();
+ lock = new Object();
+ retainedPublishes = new HashMap<String, MqttMessage>();
+ currentTokens = new HashMap<MqttDeliveryToken, String>();
+
+ client = clientFactory.createMqttClient(serverURI, CLIENTID);
+ client.setCallback(this);
+ // Clean any hungover state
+ MqttConnectOptions connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ client.connect(connOpts);
+ client.disconnect();
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void finish() throws Exception {
+ if (logout != null) {
+ logout.flush();
+ logout.close();
+ }
+ client.close();
+ }
+
+ private final double[] CONNECTED_TABLE = new double[]{0.05, // disconnect
+ 0.2, // subscribe
+ 0.2, // unsubscribe
+ 0.5, // publish
+ 0.05 // pendingDeliveryTokens
+ };
+ private final double[] DISCONNECTED_TABLE = new double[]{0.5, // connect
+ 0.2, // pendingDeliveryTokens
+ 0.3, // disconnect
+ };
+
+ /**
+ * @param table
+ */
+ private int getOption(double[] table) {
+ double n = random.nextDouble();
+ double c = 0;
+ for (int i = 0; i < table.length; i++) {
+ c += table[i];
+ if (c > n) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * @param iterations
+ * @throws Exception
+ */
+ public void run(int iterations) throws Exception {
+ try {
+ for (int i = 0; i < iterations; i++) {
+ if (client.isConnected()) {
+ int o = getOption(CONNECTED_TABLE);
+ switch (o) {
+ case 0 :
+ disconnect(true);
+ break;
+ case 1 :
+ subscribe();
+ break;
+ case 2 :
+ unsubscribe();
+ break;
+ case 3 :
+ publish();
+ break;
+ case 4 :
+ pendingDeliveryTokens();
+ break;
+ }
+ }
+ else {
+ int o = getOption(DISCONNECTED_TABLE);
+ switch (o) {
+ case 0 :
+ connect();
+ break;
+ case 1 :
+ pendingDeliveryTokens();
+ break;
+ case 2 :
+ disconnect(false);
+ break;
+ }
+ }
+ }
+ }
+ catch (Exception e) {
+ logToFile(e);
+ throw (e);
+ }
+ finally {
+ try {
+ if (client.isConnected()) {
+ client.disconnect();
+ }
+
+ client.close();
+ }
+ catch (Exception e) {
+ // ignore - cleanup for error cases, allow any previous exception to be seen
+ }
+ }
+ }
+
+ // TODO:
+ /**
+ * @param filename
+ * @throws Exception
+ */
+ public void run(String filename) throws Exception {
+ /*
+ 26/07/2010 16:28:46.972 connect [cleanSession:false]
+ 26/07/2010 16:28:46.972 disconnect [cleanSession:false][isConnected:true]
+ 26/07/2010 16:28:46.972 subscribe [topic:5f00344b-530a-414a-91e6-57f7d8662b80][qos:2][expectRetained:true]
+ 26/07/2010 16:28:46.972 unsubscribe [topic:0c94dacd-71b0-4692-8b49-4cb225e5f505][existing:false]
+ 26/07/2010 16:28:46.972 publish [topic:5f00344b-530a-414a-91e6-57f7d8662b80][payload:0dbc3ef9-85b3-4cf0-b1f3-e086289d8152][qos:2][retained:true][subscribed:false][waitForCompletion:false]
+ 26/07/2010 16:28:46.972 pendingDeliveryTokens [count:0]
+ */
+ Pattern pConnect = Pattern.compile("^.*? connect \\[cleanSession:(.+)\\]$");
+ Pattern pDisconnect = Pattern
+ .compile("^.*? disconnect \\[cleanSession:(.+)\\]\\[isConnected:(.+)\\]$");
+ Pattern pSubscribe = Pattern
+ .compile("^.*? subscribe \\[topic:(.+)\\]\\[qos:(.+)\\]\\[expectRetained:(.+)\\]$");
+ Pattern pUnsubscribe = Pattern.compile("^.*? unsubscribe \\[topic:(.+)\\]\\[existing:(.+)\\]$");
+ Pattern pPublish = Pattern
+ .compile("^.*? publish \\[topic:(.+)\\]\\[payload:(.+)\\]\\[qos:(.+)\\]\\[retained:(.+)\\]\\[subscribed:(.+)\\]\\[waitForCompletion:(.+)\\]$");
+ Pattern pPendingDeliveryTokens = Pattern.compile("^.*? pendingDeliveryTokens \\[count:(.+)\\]$");
+ BufferedReader in = new BufferedReader(new FileReader(filename));
+ String line;
+ try {
+ while ((line = in.readLine()) != null) {
+ Matcher m = pConnect.matcher(line);
+ if (m.matches()) {
+ connect(Boolean.parseBoolean(m.group(1)));
+ }
+ else if ((m = pDisconnect.matcher(line)).matches()) {
+ disconnect(Boolean.parseBoolean(m.group(1)), Boolean.parseBoolean(m.group(2)));
+ }
+ else if ((m = pSubscribe.matcher(line)).matches()) {
+ subscribe(m.group(1), Integer.parseInt(m.group(2)), Boolean.parseBoolean(m.group(3)));
+ }
+ else if ((m = pUnsubscribe.matcher(line)).matches()) {
+ unsubscribe(m.group(1), Boolean.parseBoolean(m.group(2)));
+ }
+ else if ((m = pPublish.matcher(line)).matches()) {
+ publish(m.group(1), m.group(2), Integer.parseInt(m.group(3)), Boolean.parseBoolean(m
+ .group(4)), Boolean.parseBoolean(m.group(5)), Boolean.parseBoolean(m.group(6)));
+ }
+ else if ((m = pPendingDeliveryTokens.matcher(line)).matches()) {
+ pendingDeliveryTokens(Integer.parseInt(m.group(1)));
+ }
+ }
+ }
+ catch (Exception e) {
+ if (client.isConnected()) {
+ client.disconnect();
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void connect() throws Exception {
+ cleanSession = random.nextBoolean();
+ connect(cleanSession);
+ }
+
+ /**
+ * Connects the client.
+ * @param cleanSession1 whether to connect clean session.
+ * @throws Exception
+ */
+ public void connect(boolean cleanSession1) throws Exception {
+ logToFile("connect [cleanSession:" + cleanSession1 + "]");
+ if (cleanSession1) {
+ subscribedTopics.clear();
+ }
+ MqttConnectOptions opts = new MqttConnectOptions();
+ opts.setCleanSession(cleanSession1);
+ client.connect(opts);
+ }
+
+ /**
+ * @param connected
+ * @throws Exception
+ */
+ public void disconnect(boolean connected) throws Exception {
+ disconnect(cleanSession, connected);
+ }
+
+ /**
+ * Disconnects the client
+ * @param cleanSession1 whether this is a clean session being disconnected
+ * @param isConnected whether we think the client is currently connected
+ * @throws Exception
+ */
+ public void disconnect(boolean cleanSession1, boolean isConnected) throws Exception {
+ logToFile("disconnect [cleanSession:" + cleanSession1 + "][isConnected:" + client.isConnected()
+ + "]");
+ if (isConnected != client.isConnected()) {
+ throw new Exception("Client state mismatch [expected:" + isConnected + "][actual:"
+ + client.isConnected() + "]");
+ }
+ if (isConnected && cleanSession1) {
+ subscribedTopics.clear();
+ }
+ try {
+ client.disconnect();
+ }
+ catch (MqttException e) {
+ if (((e.getReasonCode() != 6) && (e.getReasonCode() != 32101)) || isConnected) {
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void subscribe() throws Exception {
+ String topic;
+ boolean expectRetained;
+ if (!retainedPublishes.isEmpty() && (random.nextInt(5) == 0)) {
+ Object[] topics1 = retainedPublishes.keySet().toArray();
+ topic = (String) topics1[random.nextInt(topics1.length)];
+
+ expectRetained = true;
+ }
+ else {
+ topic = UUID.randomUUID().toString();
+ expectRetained = false;
+ }
+ int qos = random.nextInt(3);
+ subscribe(topic, qos, expectRetained);
+ }
+
+ /**
+ * Subscribes to a given topic at the given qos
+ * @param topic the topic to subscribe to
+ * @param qos the qos to subscribe at
+ * @param expectRetained whether a retained message is expected to exist on this topic
+ * @throws Exception
+ */
+ public void subscribe(String topic, int qos, boolean expectRetained) throws Exception {
+ logToFile("subscribe [topic:" + topic + "][qos:" + qos + "][expectRetained:" + expectRetained
+ + "]");
+ subscribedTopics.put(topic, new Integer(qos));
+ client.subscribe(topic, qos);
+ if (expectRetained) {
+ waitForMessage(topic, retainedPublishes.get(topic), true);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void unsubscribe() throws Exception {
+ String topic;
+ boolean existing = false;
+ if (random.nextBoolean() && (subscribedTopics.size() > 0)) {
+ Object[] topics1 = subscribedTopics.keySet().toArray();
+ topic = (String) topics1[random.nextInt(topics1.length)];
+ existing = true;
+ }
+ else {
+ topic = UUID.randomUUID().toString();
+ }
+ unsubscribe(topic, existing);
+ }
+
+ /**
+ * Unsubscribes the given topic
+ * @param topic the topic to unsubscribe from
+ * @param existing whether we think we're currently subscribed to the topic
+ * @throws Exception
+ */
+ public void unsubscribe(String topic, boolean existing) throws Exception {
+ logToFile("unsubscribe [topic:" + topic + "][existing:" + existing + "]");
+ client.unsubscribe(topic);
+ Object o = subscribedTopics.remove(topic);
+ if (existing == (o == null)) {
+ throw new Exception("Subscription state mismatch [topic:" + topic + "][expected:" + existing
+ + "]");
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void publish() throws Exception {
+ String topic;
+ boolean subscribed = false;
+ if (random.nextBoolean() && (subscribedTopics.size() > 0)) {
+ Object[] topics1 = subscribedTopics.keySet().toArray();
+ topic = (String) topics1[random.nextInt(topics1.length)];
+ subscribed = true;
+ }
+ else {
+ topic = UUID.randomUUID().toString();
+ }
+ String payload = UUID.randomUUID().toString();
+ int qos = random.nextInt(3);
+ boolean retained = random.nextInt(3) == 0;
+
+ // If the message is retained then we should wait for completion. If this isn't done there
+ // is a risk that a subsequent subscriber could be created to receive the message before it
+ // has been fully delivered and hence would not see the retained flag.
+ boolean waitForCompletion = (retained || (random.nextInt(1000) == 1));
+ publish(topic, payload, qos, retained, subscribed, waitForCompletion);
+
+ // For QoS0 messages, wait for completion takes no effect as there is no feedback from
+ // the server, and so even though not very deterministic, a small sleep is taken.
+ if (waitForCompletion && retained && (qos == 0)) {
+ Thread.sleep(50);
+ }
+ }
+
+ /**
+ * Publishes to the given topic
+ * @param topic the topic to publish to
+ * @param payload the payload to publish
+ * @param qos the qos to publish at
+ * @param retained whether to publish retained
+ * @param subscribed whether we think we're currently subscribed to the topic
+ * @param waitForCompletion whether we should wait for the message to complete delivery
+ * @throws Exception
+ */
+ public void publish(String topic, String payload, int qos, boolean retained, boolean subscribed,
+ boolean waitForCompletion) throws Exception {
+ logToFile("publish [topic:" + topic + "][payload:" + payload + "][qos:" + qos + "][retained:"
+ + retained + "][subscribed:" + subscribed + "][waitForCompletion:" + waitForCompletion
+ + "]");
+ if (subscribed != subscribedTopics.containsKey(topic)) {
+ throw new Exception("Subscription state mismatch [topic:" + topic + "][expected:"
+ + subscribed + "]");
+ }
+ MqttMessage msg = new MqttMessage(payload.getBytes());
+ msg.setQos(qos);
+ msg.setRetained(retained);
+ if (retained) {
+ retainedPublishes.put(topic, msg);
+ }
+ MqttDeliveryToken token = client.getTopic(topic).publish(msg);
+ synchronized (currentTokens) {
+ if (!token.isComplete()) {
+ currentTokens.put(token, "[" + topic + "][" + msg.toString() + "]");
+ }
+ }
+
+ if (retained || waitForCompletion) {
+ token.waitForCompletion();
+ synchronized (currentTokens) {
+ currentTokens.remove(token);
+ }
+ }
+ if (subscribed) {
+ waitForMessage(topic, msg, false);
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ public void pendingDeliveryTokens() throws Exception {
+ IMqttDeliveryToken[] tokens = client.getPendingDeliveryTokens();
+
+ }
+
+ /**
+ * Checks the pending delivery tokens
+ * @param count the expected number of tokens to be returned
+ * @throws Exception
+ */
+ public void pendingDeliveryTokens(int count) throws Exception {
+ IMqttDeliveryToken[] tokens = client.getPendingDeliveryTokens();
+ logToFile("pendingDeliveryTokens [count:" + tokens.length + "]");
+ if (!client.isConnected() && (tokens.length != count)) {
+ throw new Exception("Unexpected pending tokens [expected:" + count + "][actual:"
+ + tokens.length + "]");
+ }
+ }
+
+ /**
+ * @param cause
+ */
+ public void connectionLost(Throwable cause) {
+ logToFile("Connection Lost:");
+ logToFile(cause);
+ }
+
+ /**
+ * @param token
+ */
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ synchronized (currentTokens) {
+ currentTokens.remove(token);
+ }
+ }
+
+ /**
+ * Waits for the next message to arrive and checks it's values
+ * @param topic the topic expected
+ * @param message the message expected
+ * @param expectRetained whether the retain flag is expected to be set
+ * @throws Exception
+ */
+ public void waitForMessage(String topic, MqttMessage message, boolean expectRetained)
+ throws Exception {
+ synchronized (lock) {
+ if (messages.size() == 0) {
+ lock.wait(1000);
+ }
+ if (messages.size() == 0) {
+ throw new Exception("message timeout [topic:" + topic + "]");
+ }
+ String rtopic = topics.remove(0);
+ MqttMessage rmessage = messages.remove(0);
+ if (!rtopic.equals(topic)) {
+ if (rmessage.isRetained() && !expectRetained) {
+ throw new Exception("pre-existing retained message [expectedTopic:" + topic
+ + "][expectedPayload:" + message.toString() + "] [receivedTopic:" + rtopic
+ + "][receivedPayload:" + rmessage.toString() + "]");
+ }
+ throw new Exception("message topic mismatch [expectedTopic:" + topic
+ + "][expectedPayload:" + message.toString() + "] [receivedTopic:" + rtopic
+ + "][receivedPayload:" + rmessage.toString() + "]");
+ }
+ if (!rmessage.toString().equals(message.toString())) {
+ if (rmessage.isRetained() && !expectRetained) {
+ throw new Exception("pre-existing retained message [expectedTopic:" + topic
+ + "][expectedPayload:" + message.toString() + "] [receivedTopic:" + rtopic
+ + "][receivedPayload:" + rmessage.toString() + "]");
+ }
+ throw new Exception("message payload mismatch [expectedTopic:" + topic
+ + "][expectedPayload:" + message.toString() + "] [receivedTopic:" + rtopic
+ + "][receivedPayload:" + rmessage.toString() + "]");
+ }
+ if (expectRetained && !rmessage.isRetained()) {
+ throw new Exception("message not retained [topic:" + topic + "]");
+ }
+ else if (!expectRetained && rmessage.isRetained()) {
+ throw new Exception("message retained [topic:" + topic + "]");
+ }
+ }
+ }
+
+ /**
+ * @param topic
+ * @param message
+ * @throws Exception
+ */
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ synchronized (lock) {
+ messages.add(message);
+ topics.add(topic);
+ lock.notifyAll();
+ }
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java
new file mode 100644
index 0000000..1cbf4ac
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveAsyncTest.java
@@ -0,0 +1,510 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test;
+
+import java.net.URI;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho;
+import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
+import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
+import org.eclipse.paho.client.mqttv3.test.utilities.MqttV3Receiver;
+import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class SendReceiveAsyncTest {
+
+ static final Class<?> cclass = SendReceiveAsyncTest.class;
+ static final String className = cclass.getName();
+ static final Logger log = Logger.getLogger(className);
+
+ private static URI serverURI;
+ private static MqttClientFactoryPaho clientFactory;
+
+ /**
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ try {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ serverURI = TestProperties.getServerURI();
+ clientFactory = new MqttClientFactoryPaho();
+ clientFactory.open();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ throw exception;
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+
+ try {
+ if (clientFactory != null) {
+ clientFactory.close();
+ clientFactory.disconnect();
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ /**
+ * Tests that a client can be constructed and that it can connect to and
+ * disconnect from the service
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testConnect() throws Exception {
+ final String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ IMqttAsyncClient mqttClient = null;
+ try {
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
+ IMqttToken connectToken = null;
+ IMqttToken disconnectToken = null;
+
+ connectToken = mqttClient.connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName);
+ connectToken.waitForCompletion();
+
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+
+ connectToken = mqttClient.connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName);
+ connectToken.waitForCompletion();
+
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ Assert.fail("Failed:" + methodName + " exception=" + exception);
+ }
+ finally {
+ if (mqttClient != null) {
+ log.info("Close...");
+ mqttClient.close();
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+
+ /**
+ * Test connection using a remote host name for the local host.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRemoteConnect() throws Exception {
+ final String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ IMqttAsyncClient mqttClient = null;
+ try {
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
+ IMqttToken connectToken = null;
+ IMqttToken subToken = null;
+ IMqttDeliveryToken pubToken = null;
+ IMqttToken disconnectToken = null;
+
+ connectToken = mqttClient.connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName);
+ connectToken.waitForCompletion();
+
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+
+ MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttClient.setCallback(mqttV3Receiver);
+
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setCleanSession(false);
+
+ connectToken = mqttClient.connect(mqttConnectOptions, null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName + ", cleanSession: false");
+ connectToken.waitForCompletion();
+
+ String[] topicNames = new String[]{methodName + "/Topic"};
+ int[] topicQos = {0};
+ subToken = mqttClient.subscribe(topicNames, topicQos, null, null);
+ log.info("Subscribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+
+ byte[] payload = ("Message payload " + className + "." + methodName).getBytes();
+ pubToken = mqttClient.publish(topicNames[0], payload, 1, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+
+ boolean ok = mqttV3Receiver.validateReceipt(topicNames[0], 0,
+ payload);
+ if (!ok) {
+ Assert.fail("Receive failed");
+ }
+
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ Assert.fail("Failed:" + methodName + " exception=" + exception);
+ }
+ finally {
+ if (mqttClient != null) {
+ log.info("Close...");
+ mqttClient.close();
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+
+ /**
+ * Test client pubSub using very large messages
+ */
+ @Test
+ public void testLargeMessage() {
+ final String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ IMqttAsyncClient mqttClient = null;
+ try {
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
+ IMqttToken connectToken;
+ IMqttToken subToken;
+ IMqttToken unsubToken;
+ IMqttDeliveryToken pubToken;
+
+ MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttClient.setCallback(mqttV3Receiver);
+
+ connectToken = mqttClient.connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName);
+ connectToken.waitForCompletion();
+
+ int largeSize = 1000;
+ String[] topicNames = new String[]{methodName + "/Topic"};
+ int[] topicQos = {0};
+ byte[] message = new byte[largeSize];
+
+ java.util.Arrays.fill(message, (byte) 's');
+
+ subToken = mqttClient.subscribe(topicNames, topicQos, null, null);
+ log.info("Subscribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+
+ unsubToken = mqttClient.unsubscribe(topicNames, null, null);
+ log.info("Unsubscribing from..." + topicNames[0]);
+ unsubToken.waitForCompletion();
+
+ subToken = mqttClient.subscribe(topicNames, topicQos, null, null);
+ log.info("Subscribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+
+ pubToken = mqttClient.publish(topicNames[0], message, 0, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+
+ boolean ok = mqttV3Receiver.validateReceipt(topicNames[0], 0,
+ message);
+ if (!ok) {
+ Assert.fail("Receive failed");
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ Assert.fail("Failed to instantiate:" + methodName + " exception="+ exception);
+ }
+ finally {
+ try {
+ IMqttToken disconnectToken;
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+ log.info("Close...");
+ mqttClient.close();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+
+ /**
+ * Multiple publishers and subscribers.
+ */
+ @Test
+ public void testMultipleClients() {
+ final String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ int publishers = 2;
+ int subscribers = 10;
+
+ IMqttAsyncClient[] mqttPublisher = new IMqttAsyncClient[publishers];
+ IMqttAsyncClient[] mqttSubscriber = new IMqttAsyncClient[subscribers];
+
+ IMqttToken connectToken;
+ IMqttToken subToken;
+ IMqttDeliveryToken pubToken;
+ IMqttToken disconnectToken;
+
+ try {
+ String[] topicNames = new String[]{methodName + "/Topic"};
+ int[] topicQos = {0};
+
+ for (int i = 0; i < mqttPublisher.length; i++) {
+ mqttPublisher[i] = clientFactory.createMqttAsyncClient(serverURI, "MultiPub" + i);
+ connectToken = mqttPublisher[i].connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId: MultiPub" + i);
+ connectToken.waitForCompletion();
+ } // for...
+
+ MqttV3Receiver[] mqttV3Receiver = new MqttV3Receiver[mqttSubscriber.length];
+ for (int i = 0; i < mqttSubscriber.length; i++) {
+ mqttSubscriber[i] = clientFactory.createMqttAsyncClient(serverURI, "MultiSubscriber" + i);
+ mqttV3Receiver[i] = new MqttV3Receiver(mqttSubscriber[i], LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttSubscriber[i].setCallback(mqttV3Receiver[i]);
+ connectToken = mqttSubscriber[i].connect(null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId: MultiSubscriber" + i);
+ connectToken.waitForCompletion();
+ subToken = mqttSubscriber[i].subscribe(topicNames, topicQos, null, null);
+ log.info("Subcribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+ } // for...
+
+ for (int iMessage = 0; iMessage < 10; iMessage++) {
+ byte[] payload = ("Message " + iMessage).getBytes();
+ for (int i = 0; i < mqttPublisher.length; i++) {
+ pubToken = mqttPublisher[i].publish(topicNames[0], payload, 0, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+ }
+
+ for (int i = 0; i < mqttSubscriber.length; i++) {
+ for (int ii = 0; ii < mqttPublisher.length; ii++) {
+ boolean ok = mqttV3Receiver[i].validateReceipt(
+ topicNames[0], 0, payload);
+ if (!ok) {
+ Assert.fail("Receive failed");
+ }
+ } // for publishers...
+ } // for subscribers...
+ } // for messages...
+
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ Assert.fail("Failed to instantiate:" + methodName + " exception="+ exception);
+ }
+ finally {
+ try {
+ for (int i = 0; i < mqttPublisher.length; i++) {
+ disconnectToken = mqttPublisher[i].disconnect(null, null);
+ log.info("Disconnecting...MultiPub" + i);
+ disconnectToken.waitForCompletion();
+ log.info("Close...");
+ mqttPublisher[i].close();
+ }
+ for (int i = 0; i < mqttSubscriber.length; i++) {
+ disconnectToken = mqttSubscriber[i].disconnect(null, null);
+ log.info("Disconnecting...MultiSubscriber" + i);
+ disconnectToken.waitForCompletion();
+ log.info("Close...");
+ mqttSubscriber[i].close();
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+
+ /**
+ * Test the behaviour of the cleanStart flag, used to clean up before
+ * re-connecting.
+ */
+ @Test
+ public void testCleanStart() throws Exception {
+ final String methodName = Utility.getMethodName();
+ LoggingUtilities.banner(log, cclass, methodName);
+ log.entering(className, methodName);
+
+ IMqttAsyncClient mqttClient = null;
+
+ IMqttToken connectToken;
+ IMqttToken subToken;
+ IMqttDeliveryToken pubToken;
+ IMqttToken disconnectToken;
+
+ try {
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
+ MqttV3Receiver mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttClient.setCallback(mqttV3Receiver);
+
+ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
+ // Clean start: true - The broker cleans up all client state, including subscriptions, when the client is disconnected.
+ // Clean start: false - The broker remembers all client state, including subscriptions, when the client is disconnected.
+ // Matching publications will get queued in the broker whilst the client is disconnected.
+ // For Mqtt V3 cleanSession=false, implies new subscriptions are durable.
+ mqttConnectOptions.setCleanSession(false);
+ connectToken = mqttClient.connect(mqttConnectOptions, null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName + ", cleanSession: false");
+ connectToken.waitForCompletion();
+
+ String[] topicNames = new String[]{methodName + "/Topic"};
+ int[] topicQos = {0};
+ subToken = mqttClient.subscribe(topicNames, topicQos, null, null);
+ log.info("Subscribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+
+ byte[] payload = ("Message payload " + className + "." + methodName + " First").getBytes();
+ pubToken = mqttClient.publish(topicNames[0], payload, 1, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+ boolean ok = mqttV3Receiver.validateReceipt(topicNames[0], 0,
+ payload);
+ if (!ok) {
+ Assert.fail("Receive failed");
+ }
+
+ // Disconnect and reconnect to make sure the subscription and all queued messages are cleared.
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+ log.info("Close");
+ mqttClient.close();
+
+ // Send a message from another client, to our durable subscription.
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName + "Other");
+ mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttClient.setCallback(mqttV3Receiver);
+
+ mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setCleanSession(true);
+ connectToken = mqttClient.connect(mqttConnectOptions, null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName + "Other, cleanSession: true");
+ connectToken.waitForCompletion();
+
+ // Receive the publication so that we can be sure the first client has also received it.
+ // Otherwise the first client may reconnect with its clean session before the message has arrived.
+ subToken = mqttClient.subscribe(topicNames, topicQos, null, null);
+ log.info("Subscribing to..." + topicNames[0]);
+ subToken.waitForCompletion();
+ payload = ("Message payload " + className + "." + methodName + " Other client").getBytes();
+ pubToken = mqttClient.publish(topicNames[0], payload, 1, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+ ok = mqttV3Receiver.validateReceipt(topicNames[0], 0, payload);
+ if (!ok) {
+ Assert.fail("Receive failed");
+ }
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+ log.info("Close...");
+ mqttClient.close();
+
+ // Reconnect and check we have no messages.
+ mqttClient = clientFactory.createMqttAsyncClient(serverURI, methodName);
+ mqttV3Receiver = new MqttV3Receiver(mqttClient, LoggingUtilities.getPrintStream());
+ log.info("Assigning callback...");
+ mqttClient.setCallback(mqttV3Receiver);
+ mqttConnectOptions = new MqttConnectOptions();
+ mqttConnectOptions.setCleanSession(true);
+ connectToken = mqttClient.connect(mqttConnectOptions, null, null);
+ log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + methodName + ", cleanSession: true");
+ connectToken.waitForCompletion();
+ MqttV3Receiver.ReceivedMessage receivedMessage = mqttV3Receiver.receiveNext(100);
+ if (receivedMessage != null) {
+ Assert.fail("Receive messaqe:" + new String(receivedMessage.message.getPayload()));
+ }
+
+ // Also check that subscription is cancelled.
+ payload = ("Message payload " + className + "." + methodName + " Cancelled Subscription").getBytes();
+ pubToken = mqttClient.publish(topicNames[0], payload, 1, false, null, null);
+ log.info("Publishing to..." + topicNames[0]);
+ pubToken.waitForCompletion();
+
+ receivedMessage = mqttV3Receiver.receiveNext(100);
+ if (receivedMessage != null) {
+ log.fine("Message I shouldn't have: " + new String(receivedMessage.message.getPayload()));
+ Assert.fail("Receive messaqe:" + new String(receivedMessage.message.getPayload()));
+ }
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ Assert.fail("Failed:" + methodName + " exception=" + exception);
+ }
+ finally {
+ try {
+ disconnectToken = mqttClient.disconnect(null, null);
+ log.info("Disconnecting...");
+ disconnectToken.waitForCompletion();
+ log.info("Close...");
+ mqttClient.close();
+ }
+ catch (Exception exception) {
+ log.log(Level.SEVERE, "caught exception:", exception);
+ }
+ }
+
+ log.exiting(className, methodName);
+ }
+} \ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveTest.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveTest.java
index c8c1cf0..bf122b2 100644
--- a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveTest.java
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/SendReceiveTest.java
@@ -22,7 +22,7 @@ import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.test.client.MqttClientFactoryPaho;
-import org.eclipse.paho.client.mqttv3.test.log.LoggingUtilities;
+import org.eclipse.paho.client.mqttv3.test.logging.LoggingUtilities;
import org.eclipse.paho.client.mqttv3.test.properties.TestProperties;
import org.eclipse.paho.client.mqttv3.test.utilities.MqttV3Receiver;
import org.eclipse.paho.client.mqttv3.test.utilities.Utility;
@@ -222,7 +222,7 @@ public class SendReceiveTest {
}
finally {
try {
- log.info("Disconnecting...");
+ log.info("Disconnecting...");
mqttClient.disconnect();
log.info("Close...");
mqttClient.close();
@@ -314,7 +314,6 @@ public class SendReceiveTest {
log.info("Connecting...(serverURI:" + serverURI + ", ClientId: MultiPub" + i);
mqttPublisher[i].connect();
mqttTopic[i] = mqttPublisher[i].getTopic(topicNames[0]);
-
} // for...
MqttV3Receiver[] mqttV3Receiver = new MqttV3Receiver[mqttSubscriber.length];
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ConsoleHandler.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ConsoleHandler.java
new file mode 100644
index 0000000..75bee83
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ConsoleHandler.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.util.logging.LogRecord;
+import java.util.logging.StreamHandler;
+
+/**
+ * Write console output to stdout (rather the default implementation which writes to stderr)
+ */
+public class ConsoleHandler extends StreamHandler {
+
+ /**
+ * Constructs a <code>ConsoleHandler</code> object.
+ */
+ public ConsoleHandler() {
+ super();
+ setOutputStream(System.out);
+ }
+
+ /**
+ * Logs a record if necessary. A flush operation will be done.
+ * @param record
+ */
+ @Override
+ public void publish(LogRecord record) {
+ super.publish(record);
+ super.flush();
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void close() {
+ // Do nothing (the default implementation would close the stream!)
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/DetailFormatter.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/DetailFormatter.java
new file mode 100644
index 0000000..e6b1ecb
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/DetailFormatter.java
@@ -0,0 +1,180 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+
+import org.eclipse.paho.client.mqttv3.test.utilities.StringUtilities;
+
+/**
+ * A log formatter which formats most of the LogRecord fields.
+ */
+public class DetailFormatter extends Formatter {
+
+ private final static SimpleDateFormat formater = new SimpleDateFormat("yyyyMMdd kkmmss.SSS");
+ private String NL = StringUtilities.NL;
+ private Date date = new Date();
+
+ /**
+ * Format the given LogRecord.
+ * @param record the log record to be formatted.
+ * @return a formatted log record
+ */
+ @Override
+ public synchronized String format(LogRecord record) {
+ StringBuffer sb = new StringBuffer();
+
+ String[] array = parseLogRecord(record);
+ String type = array[0];
+ String text = array[1];
+
+ addTimeStamp(record, sb);
+ addClassName(record, sb);
+ addTypeName(record, sb, type);
+ addMethodName(record, sb);
+ addText(record, sb, text);
+ addThrown(record, sb);
+
+ return sb.toString();
+ }
+
+ /**
+ * @param record
+ * @param sb
+ */
+ public void addThrown(LogRecord record, StringBuffer sb) {
+ Throwable thrown = record.getThrown();
+ if (thrown != null) {
+ try {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ thrown.printStackTrace(pw);
+ pw.close();
+ sb.append(sw.toString());
+ }
+ catch (Exception ex) {
+ // do nothing
+ }
+ }
+ }
+
+ /**
+ * @param record
+ * @param sb
+ * @param text
+ */
+ public void addText(LogRecord record, StringBuffer sb, String text) {
+ sb.append(text);
+ sb.append(NL);
+ }
+
+ /**
+ * @param record
+ * @param sb
+ */
+ public void addMethodName(LogRecord record, StringBuffer sb) {
+ sb.append(formatJavaName(record.getSourceMethodName(), 30));
+ sb.append(" ");
+ }
+
+ /**
+ * @param record
+ * @param sb
+ * @param type
+ */
+ public void addTypeName(LogRecord record, StringBuffer sb, String type) {
+ sb.append(type);
+ sb.append(" ");
+ }
+
+ /**
+ * @param record
+ * @param sb
+ */
+ public void addClassName(LogRecord record, StringBuffer sb) {
+ sb.append(formatJavaName(record.getSourceClassName(), 60));
+ sb.append(" ");
+ }
+
+ /**
+ * @param record
+ * @param sb
+ */
+ public void addTimeStamp(LogRecord record, StringBuffer sb) {
+ date.setTime(record.getMillis());
+ sb.append(formater.format(date));
+ sb.append(" ");
+ }
+
+ /**
+ * @param width
+ * @param n
+ * @return string
+ */
+ public String formatJavaName(String n, int width) {
+ String string = (n == null) ? "" : n;
+ return StringUtilities.left(string, width);
+ }
+
+ /**
+ * @param r
+ * @return string
+ */
+ public String[] parseLogRecord(LogRecord r) {
+
+ String type = " ";
+ String text = "";
+
+ String message = r.getMessage();
+ Throwable throwable = r.getThrown();
+ if (message != null) {
+ if (message.startsWith("ENTRY")) {
+ type = "-->";
+ text = formatParameters(r);
+ }
+ else if (message.startsWith("RETURN")) {
+ type = "<--";
+ text = formatParameters(r);
+ }
+ else if ((throwable != null) && ("THROW".equals(message))) {
+ text = "";
+ }
+ else {
+ text = message;
+ }
+ }
+
+ return new String[]{type, text};
+ }
+
+ /**
+ * @param r
+ * @return string
+ */
+ public String formatParameters(LogRecord r) {
+ String string = "";
+ Object[] parameters = r.getParameters();
+ if (parameters != null) {
+ string = ObjectFormatter.format(parameters);
+ }
+ return string;
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/HumanFormatter.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/HumanFormatter.java
new file mode 100644
index 0000000..1991e0f
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/HumanFormatter.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+
+/**
+ * A log formatter which formats a reduced selection of the LogRecord fields.
+ */
+public class HumanFormatter extends DetailFormatter {
+
+ /**
+ * @param record
+ * @param sb
+ */
+ @Override
+ public void addClassName(LogRecord record, StringBuffer sb) {
+ // do nothing
+ }
+
+ /**
+ * @param sb
+ * @param type
+ */
+ @Override
+ public void addTypeName(LogRecord record, StringBuffer sb, String type) {
+
+ int intLevel = record.getLevel().intValue();
+ int intFINER = Level.FINER.intValue();
+
+ if (intLevel <= intFINER) {
+ sb.append(type);
+ sb.append(" ");
+ }
+ }
+
+ /**
+ * @param record
+ * @param sb
+ */
+ @Override
+ public void addMethodName(LogRecord record, StringBuffer sb) {
+
+ int intLevel = record.getLevel().intValue();
+ int intFINER = Level.FINER.intValue();
+
+ if (intLevel <= intFINER) {
+ sb.append(formatJavaName(record.getSourceMethodName(), 30));
+ sb.append(" ");
+ }
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerDumper.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerDumper.java
new file mode 100644
index 0000000..5b5a1dd
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerDumper.java
@@ -0,0 +1,133 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+
+import org.eclipse.paho.client.mqttv3.test.utilities.StringUtilities;
+
+/**
+ * Utility class which dumps the formatters and handlers under a log manager
+ * <p>
+ * The Java Util Logger package does not provide public method to dump its handlers and formatters in a useful way
+ * so the LogDumper class builds up a collection of the handlers using this simple container class which it can then dump
+ * in a human readable way.
+ */
+public class LoggerDumper {
+
+ static final Class<?> cclass = LoggerDumper.class;
+
+ private LoggerNode rootNode = null;
+
+ /**
+ * @throws Exception
+ *
+ */
+ public LoggerDumper() throws Exception {
+ LogManager mgr = LogManager.getLogManager();
+ Enumeration<String> enumeration = mgr.getLoggerNames();
+ while (enumeration.hasMoreElements()) {
+ String name = enumeration.nextElement();
+ Logger log = mgr.getLogger(name);
+ findParentNode(log);
+ }
+ }
+
+ /**
+ * @param logger
+ * @throws Exception
+ */
+ private LoggerNode findParentNode(Logger logger) throws Exception {
+ LoggerNode parentNode = null;
+ Logger parent = logger.getParent();
+
+ if (parent == null) {
+ if (rootNode == null) {
+ parentNode = rootNode = new LoggerNode(null, logger);
+ }
+ else if (rootNode.getLogger() == logger) {
+ parentNode = rootNode;
+ }
+ else {
+ throw new Exception("duplicate root");
+ }
+ }
+ else {
+ parentNode = findParentNode(parent);
+
+ LoggerNode found = null;
+ for (Iterator<LoggerNode> iterator = parentNode.getChildren().iterator(); iterator.hasNext();) {
+ LoggerNode childNode = iterator.next();
+ if (childNode.getLogger() == logger) {
+ found = childNode;
+ break;
+ }
+ }
+ if (found == null) {
+ LoggerNode node = new LoggerNode(parentNode, logger);
+ parentNode.getChildren().add(node);
+ }
+ }
+
+ return parentNode;
+ }
+
+ /**
+ *
+ */
+ public void dump() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("-----------------------------------------------------------------" + StringUtilities.NL);
+ dumpLoggerNode(rootNode, 0, sb);
+ sb.append("-----------------------------------------------------------------");
+ System.out.println(sb.toString());
+ }
+
+ /**
+ * @param node
+ * @param sb
+ */
+ private void dumpLoggerNode(LoggerNode node, int indent, StringBuilder sb) {
+ Logger l = node.getLogger();
+ String padding = StringUtilities.left("", indent * 2);
+
+ sb.append(padding);
+ // sb.append("@" + Integer.toHexString(System.identityHashCode(l)) + " ");
+ sb.append("\"" + l.getName() + "\" ");
+ sb.append(l.getLevel());
+ sb.append(StringUtilities.NL);
+
+ Handler[] handlers = l.getHandlers();
+ for (Handler h : handlers) {
+ Formatter f = h.getFormatter();
+ sb.append(padding);
+ sb.append(" Handler = ");
+ sb.append(StringUtilities.left(h.getClass().getName(), 40));
+ sb.append("Formatter = ");
+ sb.append(StringUtilities.left(f.getClass().getName(), 40));
+ sb.append(StringUtilities.NL);
+ }
+
+ for (Iterator<LoggerNode> iterator = node.getChildren().iterator(); iterator.hasNext();) {
+ LoggerNode child = iterator.next();
+ dumpLoggerNode(child, indent + 1, sb);
+ }
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerNode.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerNode.java
new file mode 100644
index 0000000..e75647b
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggerNode.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+/**
+ * The Java Util Logger package does not provide public method to dump its handlers and formatters in a useful way
+ * so the LogDumper class builds up a collection of the handlers using this simple container class which it can then dump
+ * in a human readable way.
+ */
+public class LoggerNode {
+
+ private LoggerNode parent;
+ private Logger logger;
+ private Set<LoggerNode> children;
+
+ /**
+ * @param p
+ * @param l
+ */
+ public LoggerNode(LoggerNode p, Logger l) {
+ parent = p;
+ logger = l;
+ children = new HashSet<LoggerNode>();
+ }
+
+ /**
+ * @return the parent
+ */
+ public LoggerNode getParent() {
+ return parent;
+ }
+
+ /**
+ * @return the logger
+ */
+ public Logger getLogger() {
+ return logger;
+ }
+
+ /**
+ * @return the children
+ */
+ public Collection<LoggerNode> getChildren() {
+ return children;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return logger.getName();
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggingUtilities.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggingUtilities.java
new file mode 100644
index 0000000..b41b32d
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/LoggingUtilities.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.logging.LogManager;
+import java.util.logging.Logger;
+
+/**
+ * Class containing logging utility methods
+ */
+public class LoggingUtilities {
+
+ /**
+ * Configure logging by loading the logging.properties file
+ */
+ public static final Class<?> cclass = LoggingUtilities.class;
+
+ static {
+ String configClass = System.getProperty("java.util.logging.config.class");
+ String configFile = System.getProperty("java.util.logging.config.file");
+
+ if ((configClass == null) && (configFile == null)) {
+ try {
+ InputStream inputStream = cclass.getClassLoader().getResourceAsStream("logging.properties");
+ LogManager manager = LogManager.getLogManager();
+ manager.readConfiguration(inputStream);
+ inputStream.close();
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * @return logStream
+ */
+ public static PrintStream getPrintStream() {
+ return System.out;
+ }
+
+ /**
+ * Log a banner containing the class and method name
+ *
+ * @param logger
+ * @param clazz
+ * @param methodName
+ */
+ public static void banner(Logger logger, Class<?> clazz, String methodName) {
+ banner(logger, clazz, methodName, null);
+ }
+
+ /**
+ * Log a banner containing the class and method name and text
+ *
+ * @param logger
+ * @param clazz
+ * @param methodName
+ * @param text
+ */
+ public static void banner(Logger logger, Class<?> clazz, String methodName, String text) {
+ String string = clazz.getSimpleName() + "." + methodName;
+ if (text != null) {
+ string += " " + text;
+ }
+
+ logger.info("");
+ logger.info("*************************************************************");
+ logger.info("* " + string);
+ logger.info("*************************************************************");
+ }
+
+ /**
+ * Dump the configuration of the log manager
+ *
+ * @throws Exception
+ */
+ public static void dump() throws Exception {
+ LoggerDumper loggerDumper = new LoggerDumper();
+ loggerDumper.dump();
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ObjectFormatter.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ObjectFormatter.java
new file mode 100644
index 0000000..9d80007
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/ObjectFormatter.java
@@ -0,0 +1,273 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Array;
+
+import org.eclipse.paho.client.mqttv3.test.utilities.StringUtilities;
+import org.xml.sax.Attributes;
+
+/**
+ * Utility class used by the framework logger to format an arbitrary object
+ */
+public class ObjectFormatter {
+
+ private StringBuffer buffer;
+ private int width1;
+ private int width2;
+ private String separator;
+
+ /**
+ * @param width1
+ * @param width2
+ * @param separator
+ */
+ public ObjectFormatter(int width1, int width2, String separator) {
+ buffer = new StringBuffer();
+ this.width1 = width1;
+ this.width2 = width2;
+ this.separator = separator;
+ }
+
+ private void addField(String lquote, String rquote, String name, String value) {
+ buffer.append(StringUtilities.left(name, width1));
+ buffer.append(StringUtilities.left(":", width2));
+ buffer.append(lquote);
+ buffer.append(value);
+ buffer.append(rquote);
+ buffer.append(separator);
+ }
+
+ private void addField(String lquote, String rquote, int name, String value) {
+ buffer.append(StringUtilities.right(Integer.toString(name), width1));
+ buffer.append(StringUtilities.left(":", width2));
+ buffer.append(lquote);
+ buffer.append(value);
+ buffer.append(rquote);
+ buffer.append(separator);
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, int value) {
+ addField("", "", name, Integer.toString(value));
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, long value) {
+ addField("", "", name, Long.toString(value));
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, boolean value) {
+ addField("", "", name, Boolean.toString(value));
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, String value) {
+ addField("'", "'", name, StringUtilities.safeString(value));
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, String[] value) {
+ if (value != null) {
+ buffer.append(StringUtilities.left(name, width1));
+ buffer.append(":");
+ buffer.append(separator);
+ for (int i = 0; i < value.length; i++) {
+ addField("(", ")", i, StringUtilities.safeString(value[i]));
+ }
+ }
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, int[] value) {
+ buffer.append(StringUtilities.left(name, width1));
+ buffer.append(StringUtilities.left(":", width2));
+ buffer.append("(");
+ for (int i = 0; i < value.length; i++) {
+ if (i > 0) {
+ buffer.append(',');
+ }
+ buffer.append(value[i]);
+ }
+ buffer.append(")");
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, byte[] value) {
+ addField("", "", name, StringUtilities.arrayToHexString(value));
+ }
+
+ /**
+ * @param name
+ * @param value
+ */
+ public void add(String name, Object value) {
+ addField("[", "]", name, StringUtilities.safeString(value));
+ }
+
+ /**
+ * @param value
+ * @return this instance
+ */
+ public ObjectFormatter append(String value) {
+ buffer.append(value);
+ buffer.append(separator);
+ return this;
+ }
+
+ /**
+ * @return string
+ */
+ @Override
+ public String toString() {
+ return buffer.toString();
+ }
+
+ /**
+ * @param object
+ * @return string
+ */
+ public static String format(Object object) {
+ StringBuilder sb = new StringBuilder();
+
+ if (object == null) {
+ sb.append("(null)");
+ }
+ else if (object instanceof Attributes) {
+ sb.append(format((Attributes) object));
+ }
+ else if (object instanceof String) {
+ sb.append(format((String) object));
+ }
+ else if (object instanceof StringBuffer) {
+ sb.append(format((StringBuffer) object));
+ }
+ else if (object instanceof StringBuilder) {
+ sb.append(format((StringBuilder) object));
+ }
+ else if (object instanceof Throwable) {
+ sb.append(format((Throwable) object));
+ }
+ else {
+ boolean isArray = object.getClass().isArray();
+ if (isArray) {
+ int arrayLength = Array.getLength(object);
+ if (arrayLength > 1) {
+ sb.append("[");
+ }
+ for (int i = 0; i < arrayLength; i++) {
+ Object element = Array.get(object, i);
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(format(element));
+ }
+ if (arrayLength > 1) {
+ sb.append("]");
+ }
+ }
+ else {
+ sb.append(object);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * @param attributes
+ * @return string
+ */
+ public static String format(Attributes attributes) {
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < attributes.getLength(); i++) {
+ if (i > 0) {
+ sb.append(" ");
+ }
+ sb.append("[");
+ String name = attributes.getQName(i);
+ String value = attributes.getValue(i);
+ sb.append(name);
+ sb.append("=\"");
+ sb.append(value);
+ sb.append("\"");
+ sb.append("]");
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * @param text
+ * @return string
+ */
+ public static String format(String text) {
+ return StringUtilities.toJavaString(text);
+ }
+
+ /**
+ * @param sb
+ * @return string
+ */
+ public static String format(StringBuffer sb) {
+ return format(sb.toString());
+ }
+
+ /**
+ * @param sb
+ * @return string
+ */
+ public static String format(StringBuilder sb) {
+ return format(sb.toString());
+ }
+
+ /**
+ * @param t
+ * @return string
+ */
+ public static String format(Throwable t) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw, true);
+ t.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+ return sw.toString();
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/TraceFormatter.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/TraceFormatter.java
new file mode 100644
index 0000000..2eb3dc7
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/logging/TraceFormatter.java
@@ -0,0 +1,135 @@
+/*******************************************************************************
+ * Copyright (c) 2009, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ *******************************************************************************/
+
+package org.eclipse.paho.client.mqttv3.test.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+
+import org.eclipse.paho.client.mqttv3.test.utilities.StringUtilities;
+
+/**
+ * A log formatter which formats the LogRecord fields in a way which is suitable for tracing
+ */
+public class TraceFormatter extends Formatter {
+
+ private final static SimpleDateFormat formater = new SimpleDateFormat("kk:mm:ss.SSS");
+ private String NL = StringUtilities.NL;
+ private Date date = new Date();
+
+ /**
+ *
+ */
+ public TraceFormatter() {
+ System.out.println("");
+ }
+
+ /**
+ * Format the given LogRecord.
+ * @param record the log record to be formatted.
+ * @return a formatted log record
+ */
+ @Override
+ public synchronized String format(LogRecord record) {
+ StringBuffer sb = new StringBuffer();
+
+ String[] array = parseLogRecord(record);
+ String type = array[0];
+ String text = array[1];
+
+ date.setTime(record.getMillis());
+ sb.append(formater.format(date));
+ sb.append(" ");
+
+ sb.append(formatJavaName(record.getSourceClassName(), 60));
+ sb.append(" ");
+ sb.append(type);
+ sb.append(" ");
+ sb.append(formatJavaName(record.getSourceMethodName(), 30));
+ sb.append(" ");
+ sb.append(text);
+ sb.append(NL);
+
+ Throwable thrown = record.getThrown();
+ if (thrown != null) {
+ try {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ thrown.printStackTrace(pw);
+ pw.close();
+ sb.append(sw.toString());
+ }
+ catch (Exception ex) {
+ // do nothing
+ }
+ }
+
+ return sb.toString();
+ }
+
+ /**
+ * @param width
+ * @param n
+ * @return string
+ */
+ private String formatJavaName(String n, int width) {
+ String string = (n == null) ? "" : n;
+ return StringUtilities.left(string, width);
+ }
+
+ /**
+ * @param r
+ * @return string
+ */
+ public String[] parseLogRecord(LogRecord r) {
+
+ String string = " ";
+ String text = "";
+
+ String message = r.getMessage();
+ if (message != null) {
+ if (message.startsWith("ENTRY")) {
+ string = "-->";
+ text = formatParameters(r);
+ }
+ else if (message.startsWith("RETURN")) {
+ string = "<--";
+ text = formatParameters(r);
+ }
+ else {
+ text = message;
+ }
+ }
+
+ return new String[]{string, text};
+ }
+
+ /**
+ * @param r
+ * @return string
+ */
+ public String formatParameters(LogRecord r) {
+ String string = "";
+ Object[] parameters = r.getParameters();
+ if (parameters != null) {
+ string = ObjectFormatter.format(parameters);
+ }
+ return string;
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/Utility.java b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/Utility.java
index 4267e9d..4873cb9 100644
--- a/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/Utility.java
+++ b/org.eclipse.paho.client.mqttv3/src/test/java/org/eclipse/paho/client/mqttv3/test/utilities/Utility.java
@@ -95,7 +95,7 @@ public class Utility {
try {
if (handler == null) {
handler = new FileHandler("framework.log", true);
- handler.setFormatter(new org.eclipse.paho.client.mqttv3.test.log.HumanFormatter());
+ handler.setFormatter(new org.eclipse.paho.client.mqttv3.test.logging.HumanFormatter());
}
}
catch (IOException exception) {
diff --git a/org.eclipse.paho.client.mqttv3/src/test/resources/logging.properties b/org.eclipse.paho.client.mqttv3/src/test/resources/logging.properties
index eebf677..1d66cc3 100644
--- a/org.eclipse.paho.client.mqttv3/src/test/resources/logging.properties
+++ b/org.eclipse.paho.client.mqttv3/src/test/resources/logging.properties
@@ -4,7 +4,7 @@
# ***********************************************************************
# The set of handlers to be loaded upon startup.
-handlers=java.util.logging.FileHandler, org.eclipse.paho.client.mqttv3.test.log.ConsoleHandler
+handlers=java.util.logging.FileHandler, org.eclipse.paho.client.mqttv3.test.logging.ConsoleHandler
# Default global logging level.
.level=INFO
@@ -20,13 +20,13 @@ utility.level=ALL
# -----------------------------------------
# --- ConsoleHandler ---
-org.eclipse.paho.client.mqttv3.test.log.ConsoleHandler.level=INFO
-org.eclipse.paho.client.mqttv3.test.log.ConsoleHandler.formatter=org.eclipse.paho.client.mqttv3.test.log.HumanFormatter
+org.eclipse.paho.client.mqttv3.test.logging.ConsoleHandler.level=INFO
+org.eclipse.paho.client.mqttv3.test.logging.ConsoleHandler.formatter=org.eclipse.paho.client.mqttv3.test.logging.HumanFormatter
# --- FileHandler ---
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=framework.log
-java.util.logging.FileHandler.formatter=org.eclipse.paho.client.mqttv3.test.log.DetailFormatter
+java.util.logging.FileHandler.formatter=org.eclipse.paho.client.mqttv3.test.logging.DetailFormatter
java.util.logging.FileHandler.append=false