Skip to main content
aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndy Piper2014-02-13 01:16:09 +0000
committerAndy Piper2014-02-13 01:16:09 +0000
commitad94df19379cd0ef86f800d4d3225324ec7e8de8 (patch)
tree29ba6a118eef084c7bc4ea17f915f8b51125c9bd
downloadorg.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.tar.gz
org.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.tar.xz
org.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.zip
initial import
-rw-r--r--README.md23
-rw-r--r--src/MqttSDK/MQTTDecoder.h62
-rw-r--r--src/MqttSDK/MQTTDecoder.m147
-rw-r--r--src/MqttSDK/MQTTEncoder.h59
-rw-r--r--src/MqttSDK/MQTTEncoder.m161
-rw-r--r--src/MqttSDK/MQTTMessage.h104
-rw-r--r--src/MqttSDK/MQTTMessage.m268
-rw-r--r--src/MqttSDK/MQTTSession.h148
-rw-r--r--src/MqttSDK/MQTTSession.m630
-rw-r--r--src/MqttSDK/MQttTxFlow.h36
-rw-r--r--src/MqttSDK/MQttTxFlow.m50
11 files changed, 1688 insertions, 0 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b0a098b
--- /dev/null
+++ b/README.md
@@ -0,0 +1,23 @@
+mqttIO-iOS-SDK
+==============
+
+iOS SDK for MQTT
+
+Copyright © 2011, 2013 2lemetry, LLC
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that
+the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
+STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/MqttSDK/MQTTDecoder.h b/src/MqttSDK/MQTTDecoder.h
new file mode 100644
index 0000000..aaeb232
--- /dev/null
+++ b/src/MqttSDK/MQTTDecoder.h
@@ -0,0 +1,62 @@
+//
+// MQTTDecoder.h
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQTTDecoder : NSObject <NSStreamDelegate> {
+ NSInteger status;
+ NSInputStream* stream;
+ NSRunLoop* runLoop;
+ NSString* runLoopMode;
+ id delegate;
+ UInt8 header;
+ UInt32 length;
+ UInt32 lengthMultiplier;
+ NSMutableData* dataBuffer;
+}
+
+typedef enum {
+ MQTTDecoderEventProtocolError,
+ MQTTDecoderEventConnectionClosed,
+ MQTTDecoderEventConnectionError
+} MQTTDecoderEvent;
+
+enum {
+ MQTTDecoderStatusInitializing,
+ MQTTDecoderStatusDecodingHeader,
+ MQTTDecoderStatusDecodingLength,
+ MQTTDecoderStatusDecodingData,
+ MQTTDecoderStatusConnectionClosed,
+ MQTTDecoderStatusConnectionError,
+ MQTTDecoderStatusProtocolError
+};
+
+- (id)initWithStream:(NSInputStream*)aStream
+ runLoop:(NSRunLoop*)aRunLoop
+ runLoopMode:(NSString*)aMode;
+- (void)setDelegate:(id)aDelegate;
+- (void)open;
+- (void)close;
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode;
+@end
+
+@interface NSObject (MQTTDecoderDelegate)
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg;
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode;
+
+@end
diff --git a/src/MqttSDK/MQTTDecoder.m b/src/MqttSDK/MQTTDecoder.m
new file mode 100644
index 0000000..ea12f57
--- /dev/null
+++ b/src/MqttSDK/MQTTDecoder.m
@@ -0,0 +1,147 @@
+//
+// MQTTDecoder.m
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import "MQTTDecoder.h"
+
+@implementation MQTTDecoder
+
+- (id)initWithStream:(NSInputStream*)aStream
+ runLoop:(NSRunLoop*)aRunLoop
+ runLoopMode:(NSString*)aMode {
+ status = MQTTDecoderStatusInitializing;
+ stream = aStream;
+ [stream setDelegate:self];
+ runLoop = aRunLoop;
+ runLoopMode = aMode;
+ return self;
+}
+
+- (void)setDelegate:(id)aDelegate {
+ delegate = aDelegate;
+}
+
+- (void)open {
+ [stream setDelegate:self];
+ [stream scheduleInRunLoop:runLoop forMode:runLoopMode];
+ [stream open];
+}
+
+- (void)close {
+ [stream setDelegate:nil];
+ [stream close];
+ [stream removeFromRunLoop:runLoop forMode:runLoopMode];
+ stream = nil;
+}
+
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
+ if(stream == nil)
+ return;
+ switch (eventCode) {
+ case NSStreamEventOpenCompleted:
+ status = MQTTDecoderStatusDecodingHeader;
+ break;
+ case NSStreamEventHasBytesAvailable:
+ if (status == MQTTDecoderStatusDecodingHeader) {
+ NSInteger n = [stream read:&header maxLength:1];
+ if (n == -1) {
+ status = MQTTDecoderStatusConnectionError;
+ [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+ }
+ else if (n == 1) {
+ length = 0;
+ lengthMultiplier = 1;
+ status = MQTTDecoderStatusDecodingLength;
+ }
+ }
+ while (status == MQTTDecoderStatusDecodingLength) {
+ UInt8 digit;
+ NSInteger n = [stream read:&digit maxLength:1];
+ if (n == -1) {
+ status = MQTTDecoderStatusConnectionError;
+ [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+ break;
+ }
+ else if (n == 0) {
+ break;
+ }
+ length += (digit & 0x7f) * lengthMultiplier;
+ if ((digit & 0x80) == 0x00) {
+ dataBuffer = [NSMutableData dataWithCapacity:length];
+ status = MQTTDecoderStatusDecodingData;
+ }
+ else {
+ lengthMultiplier *= 128;
+ }
+ }
+ if (status == MQTTDecoderStatusDecodingData) {
+ if (length > 0) {
+ NSInteger n, toRead;
+ UInt8 buffer[768];
+ toRead = length - [dataBuffer length];
+ if (toRead > sizeof buffer) {
+ toRead = sizeof buffer;
+ }
+ n = [stream read:buffer maxLength:toRead];
+ if (n == -1) {
+ status = MQTTDecoderStatusConnectionError;
+ [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+ }
+ else {
+ [dataBuffer appendBytes:buffer length:n];
+ }
+ }
+ if ([dataBuffer length] == length) {
+ MQTTMessage* msg;
+ UInt8 type, qos;
+ BOOL isDuplicate, retainFlag;
+ type = (header >> 4) & 0x0f;
+ isDuplicate = NO;
+ if ((header & 0x08) == 0x08) {
+ isDuplicate = YES;
+ }
+ // XXX qos > 2
+ qos = (header >> 1) & 0x03;
+ retainFlag = NO;
+ if ((header & 0x01) == 0x01) {
+ retainFlag = YES;
+ }
+ msg = [[MQTTMessage alloc] initWithType:type
+ qos:qos
+ retainFlag:retainFlag
+ dupFlag:isDuplicate
+ data:dataBuffer];
+ [delegate decoder:self newMessage:msg];
+ dataBuffer = NULL;
+ status = MQTTDecoderStatusDecodingHeader;
+ }
+ }
+ break;
+ case NSStreamEventEndEncountered:
+ status = MQTTDecoderStatusConnectionClosed;
+ [delegate decoder:self handleEvent:MQTTDecoderEventConnectionClosed];
+ break;
+ case NSStreamEventErrorOccurred:
+ status = MQTTDecoderStatusConnectionError;
+ [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError];
+ break;
+ default:
+ NSLog(@"unhandled event code");
+ break;
+ }
+}
+
+@end
diff --git a/src/MqttSDK/MQTTEncoder.h b/src/MqttSDK/MQTTEncoder.h
new file mode 100644
index 0000000..e063fbd
--- /dev/null
+++ b/src/MqttSDK/MQTTEncoder.h
@@ -0,0 +1,59 @@
+//
+// MQTTEncoder.h
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQTTEncoder : NSObject <NSStreamDelegate> {
+ NSInteger status;
+ NSOutputStream* stream;
+ NSRunLoop* runLoop;
+ NSString* runLoopMode;
+ NSMutableData* buffer;
+ NSInteger byteIndex;
+ id delegate;
+}
+
+typedef enum {
+ MQTTEncoderEventReady,
+ MQTTEncoderEventErrorOccurred
+} MQTTEncoderEvent;
+
+typedef enum {
+ MQTTEncoderStatusInitializing,
+ MQTTEncoderStatusReady,
+ MQTTEncoderStatusSending,
+ MQTTEncoderStatusEndEncountered,
+ MQTTEncoderStatusError
+} MQTTEncoderStatus;
+
+- (id)initWithStream:(NSOutputStream*)aStream
+ runLoop:(NSRunLoop*)aRunLoop
+ runLoopMode:(NSString*)aMode;
+- (void)setDelegate:(id)aDelegate;
+- (void)open;
+- (void)close;
+- (MQTTEncoderStatus)status;
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode;
+- (void)encodeMessage:(MQTTMessage*)msg;
+
+@end
+
+@interface NSObject (MQTTEncoderDelegate)
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent)eventCode;
+
+@end
diff --git a/src/MqttSDK/MQTTEncoder.m b/src/MqttSDK/MQTTEncoder.m
new file mode 100644
index 0000000..c99e0c2
--- /dev/null
+++ b/src/MqttSDK/MQTTEncoder.m
@@ -0,0 +1,161 @@
+//
+// MQTTEncoder.m
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import "MQTTEncoder.h"
+
+@implementation MQTTEncoder
+
+- (id)initWithStream:(NSOutputStream*)aStream
+ runLoop:(NSRunLoop*)aRunLoop
+ runLoopMode:(NSString*)aMode {
+ status = MQTTEncoderStatusInitializing;
+ stream = aStream;
+ [stream setDelegate:self];
+ runLoop = aRunLoop;
+ runLoopMode = aMode;
+ return self;
+}
+
+- (void)setDelegate:(id)aDelegate {
+ delegate = aDelegate;
+}
+
+- (MQTTEncoderStatus)status {
+ return status;
+}
+
+- (void)open {
+ [stream setDelegate:self];
+ [stream scheduleInRunLoop:runLoop forMode:runLoopMode];
+ [stream open];
+}
+
+- (void)close {
+ [stream close];
+ [stream setDelegate:nil];
+ [stream removeFromRunLoop:runLoop forMode:runLoopMode];
+ stream = nil;
+}
+
+- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode {
+ if(stream == nil)
+ return;
+ assert(sender == stream);
+ switch (eventCode) {
+ case NSStreamEventOpenCompleted:
+ break;
+ case NSStreamEventHasSpaceAvailable:
+ if (status == MQTTEncoderStatusInitializing) {
+ status = MQTTEncoderStatusReady;
+ [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+ }
+ else if (status == MQTTEncoderStatusReady) {
+ [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+ }
+ else if (status == MQTTEncoderStatusSending) {
+ UInt8* ptr;
+ NSInteger n, length;
+
+ ptr = (UInt8*) [buffer bytes] + byteIndex;
+ // Number of bytes pending for transfer
+ length = [buffer length] - byteIndex;
+ n = [stream write:ptr maxLength:length];
+ if (n == -1) {
+ status = MQTTEncoderStatusError;
+ [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+ }
+ else if (n < length) {
+ byteIndex += n;
+ }
+ else {
+ buffer = NULL;
+ byteIndex = 0;
+ status = MQTTEncoderStatusReady;
+ }
+ }
+ break;
+ case NSStreamEventErrorOccurred:
+ case NSStreamEventEndEncountered:
+ if (status != MQTTEncoderStatusError) {
+ status = MQTTEncoderStatusError;
+ [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+ }
+ break;
+ default:
+ NSLog(@"Oops, event code not handled: 0x%02x", eventCode);
+ break;
+ }
+}
+
+- (void)encodeMessage:(MQTTMessage*)msg {
+ UInt8 header;
+ NSInteger n, length;
+
+ if (status != MQTTEncoderStatusReady) {
+ NSLog(@"Encoder not ready");
+ return;
+ }
+
+ assert (buffer == NULL);
+ assert (byteIndex == 0);
+
+ buffer = [[NSMutableData alloc] init];
+
+ // encode fixed header
+ header = [msg type] << 4;
+ if ([msg isDuplicate]) {
+ header |= 0x08;
+ }
+ header |= [msg qos] << 1;
+ if ([msg retainFlag]) {
+ header |= 0x01;
+ }
+ [buffer appendBytes:&header length:1];
+
+ // encode remaining length
+ length = [[msg data] length];
+ do {
+ UInt8 digit = length % 128;
+ length /= 128;
+ if (length > 0) {
+ digit |= 0x80;
+ }
+ [buffer appendBytes:&digit length:1];
+ }
+ while (length > 0);
+
+ // encode message data
+ if ([msg data] != NULL) {
+ [buffer appendData:[msg data]];
+ }
+
+ n = [stream write:[buffer bytes] maxLength:[buffer length]];
+ if (n == -1) {
+ status = MQTTEncoderStatusError;
+ [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred];
+ }
+ else if (n < [buffer length]) {
+ byteIndex += n;
+ status = MQTTEncoderStatusSending;
+ }
+ else {
+ buffer = NULL;
+ // XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady];
+ }
+}
+
+@end
diff --git a/src/MqttSDK/MQTTMessage.h b/src/MqttSDK/MQTTMessage.h
new file mode 100644
index 0000000..35f98bd
--- /dev/null
+++ b/src/MqttSDK/MQTTMessage.h
@@ -0,0 +1,104 @@
+//
+// MQTTMessage.h
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import <Foundation/Foundation.h>
+
+@interface MQTTMessage : NSObject {
+ UInt8 type;
+ UInt8 qos;
+ BOOL retainFlag;
+ BOOL dupFlag;
+}
+
+enum {
+ MQTTConnect = 1,
+ MQTTConnack = 2,
+ MQTTPublish = 3,
+ MQTTPuback = 4,
+ MQTTPubrec = 5,
+ MQTTPubrel = 6,
+ MQTTPubcomp = 7,
+ MQTTSubscribe = 8,
+ MQTTSuback = 9,
+ MQTTUnsubscribe = 10,
+ MQTTUnsuback = 11,
+ MQTTPingreq = 12,
+ MQTTPingresp = 13,
+ MQTTDisconnect = 14
+};
+
+// instance methods
++ (id)connectMessageWithClientId:(NSString*)clientId
+ userName:(NSString*)userName
+ password:(NSString*)password
+ keepAlive:(NSInteger)keeplive
+ cleanSession:(BOOL)cleanSessionFlag;
++ (id)connectMessageWithClientId:(NSString*)clientId
+ userName:(NSString*)userName
+ password:(NSString*)password
+ keepAlive:(NSInteger)keeplive
+ cleanSession:(BOOL)cleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willData
+ willQoS:(UInt8)willQoS
+ willRetain:(BOOL)willRetainFlag;
+
++ (id)pingreqMessage;
++ (id)subscribeMessageWithMessageId:(UInt16)msgId
+ topic:(NSString*)topic
+ qos:(UInt8)qos;
++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId
+ topic:(NSString*)topic;
++ (id)publishMessageWithData:(NSData*)payload
+ onTopic:(NSString*)theTopic
+ retainFlag:(BOOL)retain;
++ (id)publishMessageWithData:(NSData*)payload
+ onTopic:(NSString*)topic
+ qos:(UInt8)qosLevel
+ msgId:(UInt16)msgId
+ retainFlag:(BOOL)retain
+ dupFlag:(BOOL)dup;
++ (id)pubackMessageWithMessageId:(UInt16)msgId;
++ (id)pubrecMessageWithMessageId:(UInt16)msgId;
++ (id)pubrelMessageWithMessageId:(UInt16)msgId;
++ (id)pubcompMessageWithMessageId:(UInt16)msgId;
+
+- (id)initWithType:(UInt8)aType;
+- (id)initWithType:(UInt8)aType data:(NSData*)aData;
+- (id)initWithType:(UInt8)aType
+ qos:(UInt8)aQos
+ data:(NSData*)aData;
+- (id)initWithType:(UInt8)aType
+ qos:(UInt8)aQos
+ retainFlag:(BOOL)aRetainFlag
+ dupFlag:(BOOL)aDupFlag
+ data:(NSData*)aData;
+- (void)setDupFlag;
+- (UInt8)type;
+- (UInt8)qos;
+- (BOOL)retainFlag;
+- (BOOL)isDuplicate;
+@property (strong,nonatomic) NSData * data;
+
+@end
+
+@interface NSMutableData (MQTT)
+- (void)appendByte:(UInt8)byte;
+- (void)appendUInt16BigEndian:(UInt16)val;
+- (void)appendMQTTString:(NSString*)s;
+
+@end
diff --git a/src/MqttSDK/MQTTMessage.m b/src/MqttSDK/MQTTMessage.m
new file mode 100644
index 0000000..5871bdb
--- /dev/null
+++ b/src/MqttSDK/MQTTMessage.m
@@ -0,0 +1,268 @@
+//
+// MQTTMessage.m
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import "MQTTMessage.h"
+
+@implementation MQTTMessage
+
++ (id)connectMessageWithClientId:(NSString*)clientId
+ userName:(NSString*)userName
+ password:(NSString*)password
+ keepAlive:(NSInteger)keepAlive
+ cleanSession:(BOOL)cleanSessionFlag {
+ MQTTMessage* msg;
+ UInt8 flags = 0x00;
+
+ if (cleanSessionFlag) {
+ flags |= 0x02;
+ }
+ if ([userName length] > 0) {
+ flags |= 0x80;
+ if ([password length] > 0) {
+ flags |= 0x40;
+ }
+ }
+
+ NSMutableData* data = [NSMutableData data];
+ [data appendMQTTString:@"MQIsdp"];
+ [data appendByte:3];
+ [data appendByte:flags];
+ [data appendUInt16BigEndian:keepAlive];
+ [data appendMQTTString:clientId];
+ if ([userName length] > 0) {
+ [data appendMQTTString:userName];
+ if ([password length] > 0) {
+ [data appendMQTTString:password];
+ }
+ }
+
+ msg = [[MQTTMessage alloc] initWithType:MQTTConnect data:data];
+ return msg;
+}
+
++ (id)connectMessageWithClientId:(NSString*)clientId
+ userName:(NSString*)userName
+ password:(NSString*)password
+ keepAlive:(NSInteger)keepAlive
+ cleanSession:(BOOL)cleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willMsg
+ willQoS:(UInt8)willQoS
+ willRetain:(BOOL)willRetainFlag {
+ UInt8 flags = 0x04 | (willQoS << 4 & 0x18);
+
+ if (willRetainFlag) {
+ flags |= 0x20;
+ }
+ if (cleanSessionFlag) {
+ flags |= 0x02;
+ }
+ if ([userName length] > 0) {
+ flags |= 0x80;
+ if ([password length] > 0) {
+ flags |= 0x40;
+ }
+ }
+
+ NSMutableData* data = [NSMutableData data];
+ [data appendMQTTString:@"MQIsdp"];
+ [data appendByte:3];
+ [data appendByte:flags];
+ [data appendUInt16BigEndian:keepAlive];
+ [data appendMQTTString:clientId];
+ [data appendMQTTString:willTopic];
+ [data appendUInt16BigEndian:[willMsg length]];
+ [data appendData:willMsg];
+ if ([userName length] > 0) {
+ [data appendMQTTString:userName];
+ if ([password length] > 0) {
+ [data appendMQTTString:password];
+ }
+ }
+
+ MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTConnect
+ data:data];
+ return msg;
+}
+
++ (id)pingreqMessage {
+ return [[MQTTMessage alloc] initWithType:MQTTPingreq];
+}
+
++ (id)subscribeMessageWithMessageId:(UInt16)msgId
+ topic:(NSString*)topic
+ qos:(UInt8)qos {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ [data appendMQTTString:topic];
+ [data appendByte:qos];
+ MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTSubscribe
+ qos:1
+ data:data];
+ return msg;
+}
+
++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId
+ topic:(NSString*)topic {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ [data appendMQTTString:topic];
+ MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTUnsubscribe
+ qos:1
+ data:data];
+ return msg;
+}
+
++ (id)publishMessageWithData:(NSData*)payload
+ onTopic:(NSString*)topic
+ retainFlag:(BOOL)retain {
+ NSMutableData* data = [NSMutableData data];
+ [data appendMQTTString:topic];
+ [data appendData:payload];
+ MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish
+ qos:0
+ retainFlag:retain
+ dupFlag:false
+ data:data];
+ return msg;
+}
+
++ (id)publishMessageWithData:(NSData*)payload
+ onTopic:(NSString*)topic
+ qos:(UInt8)qosLevel
+ msgId:(UInt16)msgId
+ retainFlag:(BOOL)retain
+ dupFlag:(BOOL)dup {
+ NSMutableData* data = [NSMutableData data];
+ [data appendMQTTString:topic];
+ [data appendUInt16BigEndian:msgId];
+ [data appendData:payload];
+ MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish
+ qos:qosLevel
+ retainFlag:retain
+ dupFlag:dup
+ data:data];
+ return msg;
+}
+
++ (id)pubackMessageWithMessageId:(UInt16)msgId {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ return [[MQTTMessage alloc] initWithType:MQTTPuback
+ data:data];
+}
+
++ (id)pubrecMessageWithMessageId:(UInt16)msgId {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ return [[MQTTMessage alloc] initWithType:MQTTPubrec
+ data:data];
+}
+
++ (id)pubrelMessageWithMessageId:(UInt16)msgId {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ return [[MQTTMessage alloc] initWithType:MQTTPubrel
+ data:data];
+}
+
++ (id)pubcompMessageWithMessageId:(UInt16)msgId {
+ NSMutableData* data = [NSMutableData data];
+ [data appendUInt16BigEndian:msgId];
+ return [[MQTTMessage alloc] initWithType:MQTTPubcomp
+ data:data];
+}
+
+- (id)initWithType:(UInt8)aType {
+ type = aType;
+ self.data = nil;
+ return self;
+}
+
+- (id)initWithType:(UInt8)aType data:(NSData*)aData {
+ type = aType;
+ self.data = aData;
+ return self;
+}
+
+- (id)initWithType:(UInt8)aType
+ qos:(UInt8)aQos
+ data:(NSData*)aData {
+ type = aType;
+ qos = aQos;
+ self.data = aData;
+ return self;
+}
+
+- (id)initWithType:(UInt8)aType
+ qos:(UInt8)aQos
+ retainFlag:(BOOL)aRetainFlag
+ dupFlag:(BOOL)aDupFlag
+ data:(NSData*)aData {
+ type = aType;
+ qos = aQos;
+ retainFlag = aRetainFlag;
+ dupFlag = aDupFlag;
+ self.data = aData;
+ return self;
+}
+
+- (void)setDupFlag {
+ dupFlag = true;
+}
+
+- (UInt8)type {
+ return type;
+}
+
+- (UInt8)qos {
+ return qos;
+}
+
+- (BOOL)retainFlag {
+ return retainFlag;
+}
+
+- (BOOL)isDuplicate {
+ return dupFlag;
+}
+
+
+@end
+
+@implementation NSMutableData (MQTT)
+
+- (void)appendByte:(UInt8)byte {
+ [self appendBytes:&byte length:1];
+}
+
+- (void)appendUInt16BigEndian:(UInt16)val {
+ [self appendByte:val / 256];
+ [self appendByte:val % 256];
+}
+
+- (void)appendMQTTString:(NSString*)string {
+ UInt8 buf[2];
+ const char* utf8String = [string UTF8String];
+ int strLen = strlen(utf8String);
+ buf[0] = strLen / 256;
+ buf[1] = strLen % 256;
+ [self appendBytes:buf length:2];
+ [self appendBytes:utf8String length:strLen];
+}
+
+@end
diff --git a/src/MqttSDK/MQTTSession.h b/src/MqttSDK/MQTTSession.h
new file mode 100644
index 0000000..bb01e03
--- /dev/null
+++ b/src/MqttSDK/MQTTSession.h
@@ -0,0 +1,148 @@
+//
+// MQTTSession.h
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import <Foundation/Foundation.h>
+#import "MQTTDecoder.h"
+#import "MQTTEncoder.h"
+
+typedef enum {
+ MQTTSessionStatusCreated,
+ MQTTSessionStatusConnecting,
+ MQTTSessionStatusConnected,
+ MQTTSessionStatusError
+} MQTTSessionStatus;
+
+typedef enum {
+ MQTTSessionEventConnected,
+ MQTTSessionEventConnectionRefused,
+ MQTTSessionEventConnectionClosed,
+ MQTTSessionEventConnectionError,
+ MQTTSessionEventProtocolError
+} MQTTSessionEvent;
+
+@interface MQTTSession : NSObject {
+ MQTTSessionStatus status;
+ NSString* clientId;
+ //NSString* userName;
+ //NSString* password;
+ UInt16 keepAliveInterval;
+ BOOL cleanSessionFlag;
+ MQTTMessage* connectMessage;
+ NSRunLoop* runLoop;
+ NSString* runLoopMode;
+ NSTimer* timer;
+ NSInteger idleTimer;
+ MQTTEncoder* encoder;
+ MQTTDecoder* decoder;
+ UInt16 txMsgId;
+ id delegate;
+ NSMutableDictionary* txFlows;
+ NSMutableDictionary* rxFlows;
+ unsigned int ticks;
+}
+
+- (id)initWithClientId:(NSString*)theClientId;
+- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUsername
+ password:(NSString*)thePassword;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUsername
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)cleanSessionFlag;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUsername
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAlive
+ cleanSession:(BOOL)theCleanSessionFlag
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theMode;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willMsg
+ willQoS:(UInt8)willQoS
+ willRetainFlag:(BOOL)willRetainFlag;
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willMsg
+ willQoS:(UInt8)willQoS
+ willRetainFlag:(BOOL)willRetainFlag
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode;
+- (id)initWithClientId:(NSString*)theClientId
+ keepAlive:(UInt16)theKeepAliveInterval
+ connectMessage:(MQTTMessage*)theConnectMessage
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode;
+
+- (void)dealloc;
+- (void)close;
+- (void)setDelegate:aDelegate;
+- (void)connectToHost:(NSString*)ip port:(UInt32)port;
+- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL;
+- (void)subscribeTopic:(NSString*)theTopic;
+- (void)subscribeToTopic:(NSString*)topic atLevel:(UInt8)qosLevel;
+- (void)unsubscribeTopic:(NSString*)theTopic;
+- (void)publishData:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic;
+- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag;
+- (void)publishJson:(id)payload onTopic:(NSString*)theTopic;
+- (void)timerHandler:(NSTimer*)theTimer;
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode;
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent) eventCode;
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*) msg;
+
+// private methods
+- (void)newMessage:(MQTTMessage*)msg;
+- (void)error:(MQTTSessionEvent)event;
+- (void)handlePublish:(MQTTMessage*)msg;
+- (void)handlePuback:(MQTTMessage*)msg;
+- (void)handlePubrec:(MQTTMessage*)msg;
+- (void)handlePubrel:(MQTTMessage*)msg;
+- (void)handlePubcomp:(MQTTMessage*)msg;
+- (void)send:(MQTTMessage*)msg;
+- (UInt16)nextMsgId;
+
+@property (strong,atomic) NSMutableArray* queue;
+@property (strong,atomic) NSMutableArray* timerRing;
+
+@end
+
+@interface NSObject (MQTTSessionDelegate)
+- (void)session:(MQTTSession*)session handleEvent:(MQTTSessionEvent)eventCode;
+- (void)session:(MQTTSession*)session newMessage:(NSData*)data onTopic:(NSString*)topic;
+
+@end
diff --git a/src/MqttSDK/MQTTSession.m b/src/MqttSDK/MQTTSession.m
new file mode 100644
index 0000000..9dd1fc2
--- /dev/null
+++ b/src/MqttSDK/MQTTSession.m
@@ -0,0 +1,630 @@
+//
+// MQTTSession.m
+// MQtt Client
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import "MQTTSession.h"
+#import "MQttTxFlow.h"
+
+@implementation MQTTSession
+
+- (id)initWithClientId:(NSString*)theClientId {
+ return [self initWithClientId:theClientId userName:@"" password:@""];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword {
+ return [self initWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:60
+ cleanSession:YES];
+}
+
+- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode {
+ return [self initWithClientId:theClientId userName:@"" password:@"" runLoop:theRunLoop forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode {
+ return [self initWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:60
+ cleanSession:YES
+ runLoop:theRunLoop
+ forMode:theRunLoopMode];
+}
+
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag {
+ return [self initWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:theKeepAliveInterval
+ cleanSession:theCleanSessionFlag
+ runLoop:[NSRunLoop currentRunLoop]
+ forMode:NSDefaultRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode {
+ MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:theKeepAliveInterval
+ cleanSession:theCleanSessionFlag];
+ return [self initWithClientId:theClientId
+ keepAlive:theKeepAliveInterval
+ connectMessage:msg
+ runLoop:theRunLoop
+ forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willMsg
+ willQoS:(UInt8)willQoS
+ willRetainFlag:(BOOL)willRetainFlag {
+ return [self initWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:theKeepAliveInterval
+ cleanSession:theCleanSessionFlag
+ willTopic:willTopic
+ willMsg:willMsg
+ willQoS:willQoS
+ willRetainFlag:willRetainFlag
+ runLoop:[NSRunLoop currentRunLoop]
+ forMode:NSDefaultRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ userName:(NSString*)theUserName
+ password:(NSString*)thePassword
+ keepAlive:(UInt16)theKeepAliveInterval
+ cleanSession:(BOOL)theCleanSessionFlag
+ willTopic:(NSString*)willTopic
+ willMsg:(NSData*)willMsg
+ willQoS:(UInt8)willQoS
+ willRetainFlag:(BOOL)willRetainFlag
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode {
+ MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId
+ userName:theUserName
+ password:thePassword
+ keepAlive:theKeepAliveInterval
+ cleanSession:theCleanSessionFlag
+ willTopic:willTopic
+ willMsg:willMsg
+ willQoS:willQoS
+ willRetain:willRetainFlag];
+ return [self initWithClientId:theClientId
+ keepAlive:theKeepAliveInterval
+ connectMessage:msg
+ runLoop:theRunLoop
+ forMode:theRunLoopMode];
+}
+
+- (id)initWithClientId:(NSString*)theClientId
+ keepAlive:(UInt16)theKeepAliveInterval
+ connectMessage:(MQTTMessage*)theConnectMessage
+ runLoop:(NSRunLoop*)theRunLoop
+ forMode:(NSString*)theRunLoopMode {
+ clientId = theClientId;
+ keepAliveInterval = theKeepAliveInterval;
+ connectMessage = theConnectMessage;
+ runLoop = theRunLoop;
+ runLoopMode = theRunLoopMode;
+
+ self.queue = [NSMutableArray array];
+ txMsgId = 1;
+ txFlows = [[NSMutableDictionary alloc] init];
+ rxFlows = [[NSMutableDictionary alloc] init];
+ self.timerRing = [[NSMutableArray alloc] initWithCapacity:60];
+ int i;
+ for (i = 0; i < 60; i++) {
+ [self.timerRing addObject:[NSMutableSet set]];
+ }
+ ticks = 0;
+
+ return self;
+}
+
+- (void)dealloc {
+ [encoder close];
+ encoder = nil;
+ [decoder close];
+ decoder = nil;
+ if (timer != nil) {
+ [timer invalidate];
+ timer = nil;
+ }
+}
+
+- (void)close {
+ [encoder close];
+ [decoder close];
+ encoder = nil;
+ decoder = nil;
+ if (timer != nil) {
+ [timer invalidate];
+ timer = nil;
+ }
+ [self error:MQTTSessionEventConnectionClosed];
+}
+
+- (void)setDelegate:(id)aDelegate {
+ delegate = aDelegate;
+}
+
+- (void)connectToHost:(NSString*)ip port:(UInt32)port {
+ [self connectToHost:ip port:port usingSSL:false];
+}
+
+- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL {
+
+ status = MQTTSessionStatusCreated;
+
+ CFReadStreamRef readStream;
+ CFWriteStreamRef writeStream;
+
+ CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)ip, port, &readStream, &writeStream);
+
+ if (usingSSL) {
+ const void *keys[] = { kCFStreamSSLLevel,
+ kCFStreamSSLPeerName };
+
+ const void *vals[] = { kCFStreamSocketSecurityLevelNegotiatedSSL,
+ kCFNull };
+
+ CFDictionaryRef sslSettings = CFDictionaryCreate(kCFAllocatorDefault, keys, vals, 2,
+ &kCFTypeDictionaryKeyCallBacks,
+ &kCFTypeDictionaryValueCallBacks);
+
+ CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, sslSettings);
+ CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, sslSettings);
+
+ CFRelease(sslSettings);
+ }
+
+ encoder = [[MQTTEncoder alloc] initWithStream:(__bridge NSOutputStream*)writeStream
+ runLoop:runLoop
+ runLoopMode:runLoopMode];
+
+ decoder = [[MQTTDecoder alloc] initWithStream:(__bridge NSInputStream*)readStream
+ runLoop:runLoop
+ runLoopMode:runLoopMode];
+
+ [encoder setDelegate:self];
+ [decoder setDelegate:self];
+
+ [encoder open];
+ [decoder open];
+}
+
+- (void)subscribeTopic:(NSString*)theTopic {
+ [self subscribeToTopic:theTopic atLevel:0];
+}
+
+- (void) subscribeToTopic:(NSString*)topic
+ atLevel:(UInt8)qosLevel {
+ [self send:[MQTTMessage subscribeMessageWithMessageId:[self nextMsgId]
+ topic:topic
+ qos:qosLevel]];
+}
+
+- (void)unsubscribeTopic:(NSString*)theTopic {
+ [self send:[MQTTMessage unsubscribeMessageWithMessageId:[self nextMsgId]
+ topic:theTopic]];
+}
+
+- (void)publishData:(NSData*)data onTopic:(NSString*)topic {
+ [self publishDataAtMostOnce:data onTopic:topic];
+}
+
+- (void)publishDataAtMostOnce:(NSData*)data
+ onTopic:(NSString*)topic {
+ [self publishDataAtMostOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataAtMostOnce:(NSData*)data
+ onTopic:(NSString*)topic
+ retain:(BOOL)retainFlag {
+ [self send:[MQTTMessage publishMessageWithData:data
+ onTopic:topic
+ retainFlag:retainFlag]];
+}
+
+- (void)publishDataAtLeastOnce:(NSData*)data
+ onTopic:(NSString*)topic {
+ [self publishDataAtLeastOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataAtLeastOnce:(NSData*)data
+ onTopic:(NSString*)topic
+ retain:(BOOL)retainFlag {
+ UInt16 msgId = [self nextMsgId];
+ MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
+ onTopic:topic
+ qos:1
+ msgId:msgId
+ retainFlag:retainFlag
+ dupFlag:false];
+ MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg
+ deadline:(ticks + 60)];
+ [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
+ [self send:msg];
+}
+
+- (void)publishDataExactlyOnce:(NSData*)data
+ onTopic:(NSString*)topic {
+ [self publishDataExactlyOnce:data onTopic:topic retain:false];
+}
+
+- (void)publishDataExactlyOnce:(NSData*)data
+ onTopic:(NSString*)topic
+ retain:(BOOL)retainFlag {
+ UInt16 msgId = [self nextMsgId];
+ MQTTMessage *msg = [MQTTMessage publishMessageWithData:data
+ onTopic:topic
+ qos:2
+ msgId:msgId
+ retainFlag:retainFlag
+ dupFlag:false];
+ MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg
+ deadline:(ticks + 60)];
+ [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
+ [self send:msg];
+}
+
+- (void)publishJson:(id)payload onTopic:(NSString*)theTopic {
+ NSError * error = nil;
+ NSData * data = [NSJSONSerialization dataWithJSONObject:payload options:0 error:&error];
+ if(error!=nil){
+ //NSLog(@"Error creating JSON: %@",error.description);
+ return;
+ }
+ [self publishData:data onTopic:theTopic];
+}
+
+- (void)timerHandler:(NSTimer*)theTimer {
+ idleTimer++;
+ if (idleTimer >= keepAliveInterval) {
+ if ([encoder status] == MQTTEncoderStatusReady) {
+ //NSLog(@"sending PINGREQ");
+ [encoder encodeMessage:[MQTTMessage pingreqMessage]];
+ idleTimer = 0;
+ }
+ }
+ ticks++;
+ NSEnumerator *e = [[self.timerRing objectAtIndex:(ticks % 60)] objectEnumerator];
+ id msgId;
+
+ while ((msgId = [e nextObject])) {
+ MQttTxFlow *flow = [txFlows objectForKey:msgId];
+ MQTTMessage *msg = [flow msg];
+ [flow setDeadline:(ticks + 60)];
+ [msg setDupFlag];
+ [self send:msg];
+ }
+}
+
+- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode {
+ // NSLog(@"encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode ");
+ if(sender == encoder) {
+ switch (eventCode) {
+ case MQTTEncoderEventReady:
+ switch (status) {
+ case MQTTSessionStatusCreated:
+ //NSLog(@"Encoder has been created. Sending Auth Message");
+ [sender encodeMessage:connectMessage];
+ status = MQTTSessionStatusConnecting;
+ break;
+ case MQTTSessionStatusConnecting:
+ break;
+ case MQTTSessionStatusConnected:
+ if ([self.queue count] > 0) {
+ MQTTMessage *msg = [self.queue objectAtIndex:0];
+ [self.queue removeObjectAtIndex:0];
+ [encoder encodeMessage:msg];
+ }
+ break;
+ case MQTTSessionStatusError:
+ break;
+ }
+ break;
+ case MQTTEncoderEventErrorOccurred:
+ [self error:MQTTSessionEventConnectionError];
+ break;
+ }
+ }
+}
+
+- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode {
+ //NSLog(@"decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode");
+ if(sender == decoder) {
+ MQTTSessionEvent event;
+ switch (eventCode) {
+ case MQTTDecoderEventConnectionClosed:
+ event = MQTTSessionEventConnectionError;
+ break;
+ case MQTTDecoderEventConnectionError:
+ event = MQTTSessionEventConnectionError;
+ break;
+ case MQTTDecoderEventProtocolError:
+ event = MQTTSessionEventProtocolError;
+ break;
+ }
+ [self error:event];
+ }
+}
+
+- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg {
+ //NSLog(@"decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg ");
+ if(sender == decoder){
+ switch (status) {
+ case MQTTSessionStatusConnecting:
+ switch ([msg type]) {
+ case MQTTConnack:
+ if ([[msg data] length] != 2) {
+ [self error:MQTTSessionEventProtocolError];
+ }
+ else {
+ const UInt8 *bytes = [[msg data] bytes];
+ if (bytes[1] == 0) {
+ status = MQTTSessionStatusConnected;
+ timer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:1.0]
+ interval:1.0
+ target:self
+ selector:@selector(timerHandler:)
+ userInfo:nil
+ repeats:YES];
+ if ([delegate respondsToSelector:@selector(session:handleEvent:)]) {
+ [delegate session:self handleEvent:MQTTSessionEventConnected];
+ }
+ [runLoop addTimer:timer forMode:runLoopMode];
+ }
+ else {
+ [self error:MQTTSessionEventConnectionRefused];
+ }
+ }
+ break;
+ default:
+ [self error:MQTTSessionEventProtocolError];
+ break;
+ }
+ break;
+ case MQTTSessionStatusConnected:
+ [self newMessage:msg];
+ break;
+ default:
+ break;
+ }
+ }
+}
+
+- (void)newMessage:(MQTTMessage*)msg {
+ switch ([msg type]) {
+ case MQTTPublish:
+ [self handlePublish:msg];
+ break;
+ case MQTTPuback:
+ [self handlePuback:msg];
+ break;
+ case MQTTPubrec:
+ [self handlePubrec:msg];
+ break;
+ case MQTTPubrel:
+ [self handlePubrel:msg];
+ break;
+ case MQTTPubcomp:
+ [self handlePubcomp:msg];
+ break;
+ default:
+ return;
+ }
+}
+
+- (void)handlePublish:(MQTTMessage*)msg {
+ if (![delegate respondsToSelector:@selector(session:newMessage:onTopic:)]) {
+ return;
+ }
+ NSData *data = [msg data];
+ if ([data length] < 2) {
+ return;
+ }
+ UInt8 const *bytes = [data bytes];
+ UInt16 topicLength = 256 * bytes[0] + bytes[1];
+ if ([data length] < 2 + topicLength) {
+ return;
+ }
+ NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)];
+ NSString *topic = [[NSString alloc] initWithData:topicData
+ encoding:NSUTF8StringEncoding];
+ NSRange range = NSMakeRange(2 + topicLength, [data length] - topicLength - 2);
+ data = [data subdataWithRange:range];
+ if ([msg qos] == 0) {
+ [delegate session:self newMessage:data onTopic:topic];
+ }
+ else {
+ if ([data length] < 2) {
+ return;
+ }
+ bytes = [data bytes];
+ UInt16 msgId = 256 * bytes[0] + bytes[1];
+ if (msgId == 0) {
+ return;
+ }
+ data = [data subdataWithRange:NSMakeRange(2, [data length] - 2)];
+ if ([msg qos] == 1) {
+ [delegate session:self newMessage:data onTopic:topic];
+ [self send:[MQTTMessage pubackMessageWithMessageId:msgId]];
+ }
+ else {
+ NSDictionary *dict = [NSDictionary dictionaryWithObjectsAndKeys:
+ data, @"data", topic, @"topic", nil];
+ [rxFlows setObject:dict forKey:[NSNumber numberWithUnsignedInt:msgId]];
+ [self send:[MQTTMessage pubrecMessageWithMessageId:msgId]];
+ }
+ }
+}
+
+- (void)handlePuback:(MQTTMessage*)msg {
+ if ([[msg data] length] != 2) {
+ return;
+ }
+ UInt8 const *bytes = [[msg data] bytes];
+ NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+ if ([msgId unsignedIntValue] == 0) {
+ return;
+ }
+ MQttTxFlow *flow = [txFlows objectForKey:msgId];
+ if (flow == nil) {
+ return;
+ }
+
+ if ([[flow msg] type] != MQTTPublish || [[flow msg] qos] != 1) {
+ return;
+ }
+
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+ [txFlows removeObjectForKey:msgId];
+}
+
+- (void)handlePubrec:(MQTTMessage*)msg {
+ if ([[msg data] length] != 2) {
+ return;
+ }
+ UInt8 const *bytes = [[msg data] bytes];
+ NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+ if ([msgId unsignedIntValue] == 0) {
+ return;
+ }
+ MQttTxFlow *flow = [txFlows objectForKey:msgId];
+ if (flow == nil) {
+ return;
+ }
+ msg = [flow msg];
+ if ([msg type] != MQTTPublish || [msg qos] != 2) {
+ return;
+ }
+ msg = [MQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]];
+ [flow setMsg:msg];
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+ [flow setDeadline:(ticks + 60)];
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId];
+
+ [self send:msg];
+}
+
+- (void)handlePubrel:(MQTTMessage*)msg {
+ if ([[msg data] length] != 2) {
+ return;
+ }
+ UInt8 const *bytes = [[msg data] bytes];
+ NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+ if ([msgId unsignedIntValue] == 0) {
+ return;
+ }
+ NSDictionary *dict = [rxFlows objectForKey:msgId];
+ if (dict != nil) {
+ [delegate session:self
+ newMessage:[dict valueForKey:@"data"]
+ onTopic:[dict valueForKey:@"topic"]];
+ [rxFlows removeObjectForKey:msgId];
+ }
+ [self send:[MQTTMessage pubcompMessageWithMessageId:[msgId unsignedIntegerValue]]];
+}
+
+- (void)handlePubcomp:(MQTTMessage*)msg {
+ if ([[msg data] length] != 2) {
+ return;
+ }
+ UInt8 const *bytes = [[msg data] bytes];
+ NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])];
+ if ([msgId unsignedIntValue] == 0) {
+ return;
+ }
+ MQttTxFlow *flow = [txFlows objectForKey:msgId];
+ if (flow == nil || [[flow msg] type] != MQTTPubrel) {
+ return;
+ }
+
+ [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
+ [txFlows removeObjectForKey:msgId];
+}
+
+- (void)error:(MQTTSessionEvent)eventCode {
+
+ [encoder close];
+ encoder = nil;
+
+ [decoder close];
+ decoder = nil;
+
+ if (timer != nil) {
+ [timer invalidate];
+
+ timer = nil;
+ }
+ status = MQTTSessionStatusError;
+
+ usleep(1000000); // 1 sec delay
+
+ if ([delegate respondsToSelector:@selector(session:handleEvent:)]) {
+ [delegate session:self handleEvent:eventCode];
+ }
+
+}
+
+- (void)send:(MQTTMessage*)msg {
+ if ([encoder status] == MQTTEncoderStatusReady) {
+ [encoder encodeMessage:msg];
+ }
+ else {
+ [self.queue addObject:msg];
+ }
+}
+
+- (UInt16)nextMsgId {
+ txMsgId++;
+ while (txMsgId == 0 || [txFlows objectForKey:[NSNumber numberWithUnsignedInt:txMsgId]] != nil) {
+ txMsgId++;
+ }
+ return txMsgId;
+}
+
+@end
diff --git a/src/MqttSDK/MQttTxFlow.h b/src/MqttSDK/MQttTxFlow.h
new file mode 100644
index 0000000..59d9986
--- /dev/null
+++ b/src/MqttSDK/MQttTxFlow.h
@@ -0,0 +1,36 @@
+//
+// MQttTxFlow.h
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import <Foundation/Foundation.h>
+#import "MQTTMessage.h"
+
+@interface MQttTxFlow : NSObject {
+ MQTTMessage * msg;
+ unsigned int deadline;
+}
+
++ (id)flowWithMsg:(MQTTMessage*)aMsg
+ deadline:(unsigned int)aDeadline;
+
+- (id)initWithMsg:(MQTTMessage*)aMsg deadline:(unsigned int)aDeadline;
+
+- (void)setMsg:(MQTTMessage*)aMsg;
+- (void)setDeadline:(unsigned int)newValue;
+- (MQTTMessage*)msg;
+- (unsigned int)deadline;
+
+@end
+
diff --git a/src/MqttSDK/MQttTxFlow.m b/src/MqttSDK/MQttTxFlow.m
new file mode 100644
index 0000000..80604f6
--- /dev/null
+++ b/src/MqttSDK/MQttTxFlow.m
@@ -0,0 +1,50 @@
+//
+// MQtttTxFlow.m
+//
+// Copyright (c) 2011, 2013, 2lemetry LLC
+//
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Eclipse Distribution License v. 1.0 which accompanies this distribution.
+// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
+// and the Eclipse Distribution License is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// Contributors:
+// Kyle Roche - initial API and implementation and/or initial documentation
+//
+
+#import "MQttTxFlow.h"
+
+@implementation MQttTxFlow
+
++ (id)flowWithMsg:(MQTTMessage*)msg
+ deadline:(unsigned int)deadline {
+ return [[MQttTxFlow alloc] initWithMsg:msg deadline:deadline];
+}
+
+- (id)initWithMsg:(MQTTMessage*)aMsg
+ deadline:(unsigned int)aDeadline {
+ msg = aMsg;
+ deadline = aDeadline;
+ return self;
+}
+
+- (void)setMsg:(MQTTMessage*)aMsg {
+ msg = aMsg;
+}
+
+- (void)setDeadline:(unsigned int)newDeadline {
+ deadline = newDeadline;
+}
+
+- (MQTTMessage*)msg {
+ return msg;
+}
+
+- (unsigned int) deadline {
+ return deadline;
+}
+
+
+@end

Back to the top