Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'apps/MQTTSN-UDP-Client/samples/MqttsSampleConsole.java')
-rw-r--r--apps/MQTTSN-UDP-Client/samples/MqttsSampleConsole.java518
1 files changed, 518 insertions, 0 deletions
diff --git a/apps/MQTTSN-UDP-Client/samples/MqttsSampleConsole.java b/apps/MQTTSN-UDP-Client/samples/MqttsSampleConsole.java
new file mode 100644
index 0000000..1ab75cb
--- /dev/null
+++ b/apps/MQTTSN-UDP-Client/samples/MqttsSampleConsole.java
@@ -0,0 +1,518 @@
+/*******************************************************************************
+ * Copyright (c) 2010, 2013 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and Eclipse Distribution License v1.0 which accompany this distribution.
+ *
+ * The Eclipse Public License is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ * and the Eclipse Distribution License is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * Contributors:
+ * Ian Craggs - initial API and implementation and/or initial documentation
+ *******************************************************************************/
+
+/*
+ * Sample application for demonstrating how to use the java MQTT-S client
+ *
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+
+import org.eclipse.paho.mqttsn.udpclient.*;
+import org.eclipse.paho.mqttsn.udpclient.exceptions.MqttsException;
+import org.eclipse.paho.mqttsn.udpclient.utils.*;
+
+
+public class MqttsSampleConsole implements MqttsCallback {
+
+ private MqttsClient mqClient; // client
+
+ protected String server; // name of server hosting the broker
+ protected int port; // broker's port
+ protected String mqttsClientId; // client id
+ private boolean mqttsCleanStart=false;
+ private short mqttsKeepAliveDuration = 600; // seconds
+
+ private int maxMqttsMsgLength; //bytes
+ private int minMqttsMsgLength; //bytes
+ private int maxRetries;
+ private int ackTime; //seconds
+
+ protected boolean connected; // true if connected to a broker
+ protected Hashtable<Integer, String> topicTable;
+ private String tName;
+
+ private boolean pubFlag; //indicates a pub has to be sent when REGACK is received
+ private String pubTopic;
+ private byte[] pubMsg;
+ private int pubQos;
+ private boolean pubRetained;
+
+ private boolean autoReconnect=false;
+
+ /*
+ * Constructor
+ * initialize fields and connect to broker
+ */
+
+ public MqttsSampleConsole(String server, int port, String clientId, boolean cleanStart,
+ int maxMqttsMsgLength, int minMqttsMsgLength,
+ int maxRetries, int ackWaitingTime, boolean autoReconnect) {
+
+ this.topicTable = new Hashtable<Integer, String>();
+ this.pubFlag = false; this.pubTopic = null;
+ this.server = server;
+ this.port = port;
+ this.mqttsClientId = clientId;
+ this.mqttsCleanStart= cleanStart;
+
+ this.maxMqttsMsgLength= maxMqttsMsgLength;
+ this.minMqttsMsgLength= minMqttsMsgLength;
+ this.maxRetries= maxRetries;
+ this.ackTime= ackWaitingTime;
+
+ this.autoReconnect=autoReconnect;
+
+ this.connected = false;
+
+ mqClient = new MqttsClient (this.server ,this.port,
+ this.maxMqttsMsgLength, this.minMqttsMsgLength,
+ this.maxRetries, this.ackTime, this.autoReconnect);
+ mqClient.registerHandler(this);
+
+ System.out.print("** mqtts java client version "+
+ MqttsClient.version + " started, ");
+ if (autoReconnect) System.out.println("autoreconnect= true");
+ else System.out.println("autoreconnect= false");
+ System.out.println("");
+
+ connect();
+
+ }
+
+ public static void main(String[] args) {
+ String srv = "localhost"; // default gateway
+ int port = 20000; // default port
+ String clientId = "mqtts_console_" + System.currentTimeMillis(); // default client id
+ boolean cleanStart=false;
+
+ int maxMqttsMsgLength=60;
+ int minMqttsMsgLength=2;
+ int maxRetries=2;
+ int ackTime=3;
+ boolean autoReconnect=true;
+ // parse command line arguments -s server -p port -id clientId
+ // and overwrite default values if present
+ int i = 0;
+ String arg;
+ while (i < args.length && args[i].startsWith("-")) {
+ arg = args[i++];
+ if (arg.equals("-s")) {
+ srv = args[i++];
+ }
+ if (arg.equals("-p")) {
+ port = Integer.parseInt(args[i++]);
+ }
+ if (arg.equals("-id")) {
+ clientId = args[i++];
+ }
+ if (arg.equals("-cs")) {
+ int cs=Integer.parseInt(args[i++]);
+ if(cs==0) cleanStart=false; else cleanStart=true;
+ }
+ if (arg.equals("-log")) {
+ try {
+ ClientLogger.setLogFile(args[i++]);
+ } catch (MqttsException e) {
+ e.printStackTrace();
+ }
+ }
+ if (arg.equals("-level")) {
+ ClientLogger.setLogLevel(Integer.parseInt(args[i++]));
+ }
+ if (arg.equals("-auto")) {
+ if (args[i++].equals("0")) autoReconnect=false;
+ else autoReconnect=true;
+ }
+ }
+
+ System.out.println("");
+ System.out.println("** Starting MQTT-S console ... ");
+ // create console and launch the thread
+ MqttsSampleConsole console = new MqttsSampleConsole(srv,port,clientId,cleanStart,
+ maxMqttsMsgLength,minMqttsMsgLength,maxRetries,ackTime,autoReconnect);
+ console.run();
+ }
+
+ public void run() {
+ while (true) {
+ //System.out.println("");
+
+ // read command line from system.in
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String line="";
+ try {
+ line = in.readLine();
+ }
+ catch (IOException e) { // that should never happen
+ System.out.println("** IOException: " + e);
+ System.exit(0);
+ }
+
+ StringTokenizer st = new StringTokenizer(line);
+ String token[]= new String[10];
+ int i=0;
+ while (st.hasMoreTokens()) {
+ token[i] = st.nextToken(); i++;
+ }
+
+ if (i == 0) {
+ printHelp();
+ continue;
+ }
+
+ if (token[0].equals("exit")) terminate();
+ if (token[0].equals("t")) terminate();
+ if (token[0].equals("help")) printHelp();
+ if (token[0].equals("h")) printHelp();
+ if (token[0].equals("d")) disconnect();
+ if (token[0].equals("print")) printTopicTable();
+ if (token[0].equals("c")) {
+ if (!connected) {
+ connect();
+ } else {
+ System.out.println("** already connected to " + server +
+ " as " + mqttsClientId);
+ }
+ }
+
+
+ if (token[0].equals("s") ) {
+ if (connected) {
+ if (token[1] != null) {
+ subscribe(token[1]);
+ } else {
+ System.out.println(">> error: missing topic");
+ }
+ continue;
+ } else {
+ System.out.println(">> disconnected, subscribe not possible!");
+ }
+ }
+
+ if (token[0].equals("u")) {
+ if (connected) {
+ if (token[1] != null) {
+ unsubscribe(token[1]);
+ } else {
+ System.out.println(">> error: missing topic");
+ }
+ continue;
+ } else {
+ System.out.println(">> disconnected, unsubscribe not possible!");
+ }
+ }
+
+ if (token[0].equals("r")) {
+ if (connected) {
+ if (token[1] != null) {
+ register(token[1]);
+ } else {
+ System.out.println(">> error: missing topic");
+ }
+ continue;
+ } else {
+ System.out.println(">> disconnected, register not possible!");
+ }
+ }
+
+
+ if (token[0].equals("p")) {
+ if (connected) {
+ if (token[1]!=null && token[2]!=null) {
+ boolean retained=false;
+ if (token[3] == null) {
+ publish(token[1],token[2],0,retained);
+ } else {
+ publish(token[1],token[2],1,retained);
+ }
+ } else {
+ System.out.println(">> error, pub format is \"p topic msg \"");
+ }
+ }
+ else System.out.println(">> disconnected, publish not possible!");
+ }
+ } //end while
+ } //end run method
+
+ public void connect() {
+ try {
+ if (mqClient == null) {
+ System.out.println("** Starting MQTTS-S java client version "+
+ MqttsClient.version);
+ mqClient = new MqttsClient (this.server ,this.port,
+ maxMqttsMsgLength, minMqttsMsgLength, maxRetries,
+ ackTime);
+ mqClient.registerHandler(this);
+ }
+ // cleanStart= false;
+ //mqClient.connect(this.mqttsClientId,mqttsCleanStart,mqttsKeepAliveDuration);
+ mqClient.connect(this.mqttsClientId,mqttsCleanStart,mqttsKeepAliveDuration,
+ "down",1,this.mqttsClientId,true);
+ } catch (Exception e){
+ connected = false;
+ System.out.println("** connection to " + server + " failed!");
+ System.out.println("** exception: " + e);
+ //System.out.println("Exiting ... ");
+ //System.exit(0);
+ }
+ }
+
+ public void register(String topicName) {
+ mqClient.register(topicName);
+ this.tName = topicName;
+ }
+
+ public void disconnect() {
+ System.out.println("** disconnecting...");
+ if (connected) {
+ try {
+ mqClient.disconnect();
+ //System.out.println("** connection to gateway closed");
+ connected = false;
+ }
+ catch (Exception e) {
+ System.out.println("** cannot disconnect, exception: " + e);
+ }
+ } else {
+ System.out.println("** already disconnected ... ");
+ }
+ }
+
+ public boolean publish(String topic, String msg, int qos, boolean retained) {
+ byte[] message = msg.getBytes();
+
+ return publish(topic, message, qos, retained);
+ }
+
+ public boolean publish(String topic, byte[] msg, int qos, boolean retained) {
+
+ boolean retVal = false;
+
+ Iterator<Integer> iter = topicTable.keySet().iterator();
+ Iterator<String> iterVal = topicTable.values().iterator();
+ Integer ret = new Integer(-1);
+ while (iter.hasNext()) { //check whether topic is in topicTable
+ Integer topicId = (Integer)iter.next();
+ String tname = (String)iterVal.next();
+ if(tname.equals(topic)) {
+ ret = topicId;
+ break;
+ }
+ }
+ int topicID = ret.intValue();
+ if (topicID == -1) { //topic not in topicTable, have to register it
+ register(topic);
+ pubFlag = true; //set the flag and wait for REG ACK
+ pubTopic= topic; //store the values for later publish
+ pubMsg = msg;
+ pubQos = qos;
+ pubRetained = retained;
+ //System.out.println("** topic not in table, have to register it first");
+ } else {
+ try {
+ retVal = mqClient.publish(topicID, msg, qos, retained);
+ //System.out.println("** published: \"" + topic + ": " +
+ // Utils.hexString(msg) + "\"");
+ } catch (Exception e) {
+ System.out.println("** publish exception: " + e);
+ }
+ }
+ return retVal;
+ }
+
+ public void subscribe(String topic) {
+ if (topic != null) {
+ try {
+ mqClient.subscribe(topic,1,0); //topic name
+ }
+ catch (Exception e) {
+ System.out.println("** sub exception: " + e);
+ }
+ }
+ else {
+ System.out.println("** sub error: topic missing!");
+ }
+
+ this.tName = topic;
+
+ }
+
+ public void unsubscribe(String topic) {
+ if (topic != null) {
+ try {
+ mqClient.unSubscribe(topic,0); //topic name
+ }
+ catch (Exception e) {
+ System.out.println("** unsub exception: " + e);
+ }
+ }
+ else {
+ System.out.println("** unsub error: topic missing!");
+ }
+ this.tName = topic;
+ }
+
+ public void terminate() {
+ disconnect();
+ if (mqClient != null) {
+ mqClient.terminate();
+ mqClient = null;
+ System.out.println("** client terminated!");
+ }
+ System.out.println("** exiting ...");
+ System.exit(0);
+ }
+
+ // callback: publishArrived
+ public int publishArrived(boolean dup, int qos, int topicId, byte[] msg ){
+
+ DateFormat df = new SimpleDateFormat("dd.MM HH:mm:ss.SSS");
+ String topic = (String)topicTable.get(new Integer(topicId));
+ //String message = new String(msg);
+
+ if (topic == null) {
+ System.out.println("** PUB with invalid topic id " +topicId +" rejected ...");
+ return 2; //return invalid topic id
+ }
+
+ System.out.print(df.format(new Date()) + " " + topic +": ");
+ //System.out.println(df.format(new Date()) +" >>> " + topic + " msg= " + Utils.hexString(msg));
+ System.out.println(Utils.hexString(msg));
+
+ return 0;
+ }
+
+ private void printHelp() {
+ System.out.println("");
+ System.out.println("Type c for connect, d for disconnect, " +
+ "p for publish, s for subscribe, u for unsubscribe, " +
+ "and t for terminate.");
+ System.out.println("");
+ }
+
+ //call back: CONNECT is sent to broker, waiting for CONNACK
+ public void connectSent() {
+ System.out.println("** CONNECT sent to " + server +" ...");
+ }
+
+ //call back: CONNACK received
+ public void connected() {
+ connected = true;
+ System.out.println("** connected to " + server + ":" + port + " as " + mqttsClientId);
+ }
+
+ //call back: DISCONNECT received
+ public void disconnected(int returnType) {
+ connected= false;
+ switch(returnType) {
+ case MqttsCallback.MQTTS_OK:
+ System.out.println("** disconnected");
+ break;
+ case MqttsCallback.MQTTS_LOST_GATEWAY:
+ System.out.println("** disconnected, no answer from gateway/broker!");
+ break;
+ default:
+ System.out.println("** disconnected, unknown cause= " + returnType);
+ }
+ }
+
+ //call back: PUBACK received
+ public void pubAckReceived(int topicId, int returnCode) {
+ if (returnCode != 0) {
+ System.out.println("** WARNING: puback received with rc="+returnCode);
+ topicTable.clear();
+ } else {
+ System.out.println("** puback received.");
+ }
+ }
+
+ //call back: PUBCOMP received
+ public void pubCompReceived() {
+ System.out.println("** pubcomp received.");
+
+ }
+
+ //callback: REGACK received
+ public void regAckReceived(int topicId, int returnCode) {
+
+ // System.out.println("** registered: topic= " + tName +
+ // " topicId= "+ topicId + " rc= " + returnCode);
+ topicTable.put(new Integer(topicId), tName);
+ tName=null;
+
+ if (pubFlag) {
+ publish(pubTopic, pubMsg, pubQos, pubRetained);
+ pubFlag = false;
+ }
+
+ }
+
+ //callback: REGISTER received
+ public void registerReceived(int topicId, String topicName) {
+ // System.out.println("** REG received: topic= " + topicName +
+ // " topicId= "+ topicId);
+ topicTable.put(new Integer (topicId), topicName);
+ }
+
+ //callback: SUBACK received
+ public void subackReceived(int grantedQos, int topicId, int rc) {
+ if (rc == 0) {
+ System.out.println("** subscribed: topic= " + tName + " qos= "+
+ grantedQos+ " topicId= " +topicId + " rc= " + rc);
+ topicTable.put(new Integer(topicId), tName);
+ tName=null;
+ } else {
+ System.err.println("** SUB rejected: topic= " + tName + " qos= "+
+ grantedQos+ " topicId= " +topicId + " rc= " + rc);
+ }
+ }
+
+ //callback: UNSUBACK received
+ public void unsubackReceived() {
+ System.out.println("** Unsubscribed: topic= "+this.tName);
+ Iterator<Integer> iter = topicTable.keySet().iterator();
+ Iterator<String> iterVal = topicTable.values().iterator();
+ while (iter.hasNext()) {
+ Integer topicId = (Integer)iter.next();
+ String topic = (String)iterVal.next();
+ if (this.tName.equals(topic)) {
+ topicTable.remove(topicId);
+ break;
+ }
+ }
+
+ }
+
+ public void printTopicTable() {
+ Iterator<Integer> iter = topicTable.keySet().iterator();
+ Iterator<String> iterVal = topicTable.values().iterator();
+ int n = 0;
+ System.out.println("");
+ while (iter.hasNext()) {
+ Integer topicId = (Integer)iter.next();
+ String topicName = (String)iterVal.next();
+ System.out.println("n= " + ++n + ", topicId= "+ topicId +
+ ", topic= " + topicName);
+ }
+ }
+
+}

Back to the top