diff options
author | Andy Piper | 2014-02-13 01:16:09 +0000 |
---|---|---|
committer | Andy Piper | 2014-02-13 01:16:09 +0000 |
commit | ad94df19379cd0ef86f800d4d3225324ec7e8de8 (patch) | |
tree | 29ba6a118eef084c7bc4ea17f915f8b51125c9bd | |
download | org.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.tar.gz org.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.tar.xz org.eclipse.paho.mqtt.objc-ad94df19379cd0ef86f800d4d3225324ec7e8de8.zip |
initial import
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | src/MqttSDK/MQTTDecoder.h | 62 | ||||
-rw-r--r-- | src/MqttSDK/MQTTDecoder.m | 147 | ||||
-rw-r--r-- | src/MqttSDK/MQTTEncoder.h | 59 | ||||
-rw-r--r-- | src/MqttSDK/MQTTEncoder.m | 161 | ||||
-rw-r--r-- | src/MqttSDK/MQTTMessage.h | 104 | ||||
-rw-r--r-- | src/MqttSDK/MQTTMessage.m | 268 | ||||
-rw-r--r-- | src/MqttSDK/MQTTSession.h | 148 | ||||
-rw-r--r-- | src/MqttSDK/MQTTSession.m | 630 | ||||
-rw-r--r-- | src/MqttSDK/MQttTxFlow.h | 36 | ||||
-rw-r--r-- | src/MqttSDK/MQttTxFlow.m | 50 |
11 files changed, 1688 insertions, 0 deletions
diff --git a/README.md b/README.md new file mode 100644 index 0000000..b0a098b --- /dev/null +++ b/README.md @@ -0,0 +1,23 @@ +mqttIO-iOS-SDK +============== + +iOS SDK for MQTT + +Copyright © 2011, 2013 2lemetry, LLC + +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided that +the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. +Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/src/MqttSDK/MQTTDecoder.h b/src/MqttSDK/MQTTDecoder.h new file mode 100644 index 0000000..aaeb232 --- /dev/null +++ b/src/MqttSDK/MQTTDecoder.h @@ -0,0 +1,62 @@ +// +// MQTTDecoder.h +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import <Foundation/Foundation.h> +#import "MQTTMessage.h" + +@interface MQTTDecoder : NSObject <NSStreamDelegate> { + NSInteger status; + NSInputStream* stream; + NSRunLoop* runLoop; + NSString* runLoopMode; + id delegate; + UInt8 header; + UInt32 length; + UInt32 lengthMultiplier; + NSMutableData* dataBuffer; +} + +typedef enum { + MQTTDecoderEventProtocolError, + MQTTDecoderEventConnectionClosed, + MQTTDecoderEventConnectionError +} MQTTDecoderEvent; + +enum { + MQTTDecoderStatusInitializing, + MQTTDecoderStatusDecodingHeader, + MQTTDecoderStatusDecodingLength, + MQTTDecoderStatusDecodingData, + MQTTDecoderStatusConnectionClosed, + MQTTDecoderStatusConnectionError, + MQTTDecoderStatusProtocolError +}; + +- (id)initWithStream:(NSInputStream*)aStream + runLoop:(NSRunLoop*)aRunLoop + runLoopMode:(NSString*)aMode; +- (void)setDelegate:(id)aDelegate; +- (void)open; +- (void)close; +- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode; +@end + +@interface NSObject (MQTTDecoderDelegate) +- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg; +- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode; + +@end diff --git a/src/MqttSDK/MQTTDecoder.m b/src/MqttSDK/MQTTDecoder.m new file mode 100644 index 0000000..ea12f57 --- /dev/null +++ b/src/MqttSDK/MQTTDecoder.m @@ -0,0 +1,147 @@ +// +// MQTTDecoder.m +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import "MQTTDecoder.h" + +@implementation MQTTDecoder + +- (id)initWithStream:(NSInputStream*)aStream + runLoop:(NSRunLoop*)aRunLoop + runLoopMode:(NSString*)aMode { + status = MQTTDecoderStatusInitializing; + stream = aStream; + [stream setDelegate:self]; + runLoop = aRunLoop; + runLoopMode = aMode; + return self; +} + +- (void)setDelegate:(id)aDelegate { + delegate = aDelegate; +} + +- (void)open { + [stream setDelegate:self]; + [stream scheduleInRunLoop:runLoop forMode:runLoopMode]; + [stream open]; +} + +- (void)close { + [stream setDelegate:nil]; + [stream close]; + [stream removeFromRunLoop:runLoop forMode:runLoopMode]; + stream = nil; +} + +- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { + if(stream == nil) + return; + switch (eventCode) { + case NSStreamEventOpenCompleted: + status = MQTTDecoderStatusDecodingHeader; + break; + case NSStreamEventHasBytesAvailable: + if (status == MQTTDecoderStatusDecodingHeader) { + NSInteger n = [stream read:&header maxLength:1]; + if (n == -1) { + status = MQTTDecoderStatusConnectionError; + [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError]; + } + else if (n == 1) { + length = 0; + lengthMultiplier = 1; + status = MQTTDecoderStatusDecodingLength; + } + } + while (status == MQTTDecoderStatusDecodingLength) { + UInt8 digit; + NSInteger n = [stream read:&digit maxLength:1]; + if (n == -1) { + status = MQTTDecoderStatusConnectionError; + [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError]; + break; + } + else if (n == 0) { + break; + } + length += (digit & 0x7f) * lengthMultiplier; + if ((digit & 0x80) == 0x00) { + dataBuffer = [NSMutableData dataWithCapacity:length]; + status = MQTTDecoderStatusDecodingData; + } + else { + lengthMultiplier *= 128; + } + } + if (status == MQTTDecoderStatusDecodingData) { + if (length > 0) { + NSInteger n, toRead; + UInt8 buffer[768]; + toRead = length - [dataBuffer length]; + if (toRead > sizeof buffer) { + toRead = sizeof buffer; + } + n = [stream read:buffer maxLength:toRead]; + if (n == -1) { + status = MQTTDecoderStatusConnectionError; + [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError]; + } + else { + [dataBuffer appendBytes:buffer length:n]; + } + } + if ([dataBuffer length] == length) { + MQTTMessage* msg; + UInt8 type, qos; + BOOL isDuplicate, retainFlag; + type = (header >> 4) & 0x0f; + isDuplicate = NO; + if ((header & 0x08) == 0x08) { + isDuplicate = YES; + } + // XXX qos > 2 + qos = (header >> 1) & 0x03; + retainFlag = NO; + if ((header & 0x01) == 0x01) { + retainFlag = YES; + } + msg = [[MQTTMessage alloc] initWithType:type + qos:qos + retainFlag:retainFlag + dupFlag:isDuplicate + data:dataBuffer]; + [delegate decoder:self newMessage:msg]; + dataBuffer = NULL; + status = MQTTDecoderStatusDecodingHeader; + } + } + break; + case NSStreamEventEndEncountered: + status = MQTTDecoderStatusConnectionClosed; + [delegate decoder:self handleEvent:MQTTDecoderEventConnectionClosed]; + break; + case NSStreamEventErrorOccurred: + status = MQTTDecoderStatusConnectionError; + [delegate decoder:self handleEvent:MQTTDecoderEventConnectionError]; + break; + default: + NSLog(@"unhandled event code"); + break; + } +} + +@end diff --git a/src/MqttSDK/MQTTEncoder.h b/src/MqttSDK/MQTTEncoder.h new file mode 100644 index 0000000..e063fbd --- /dev/null +++ b/src/MqttSDK/MQTTEncoder.h @@ -0,0 +1,59 @@ +// +// MQTTEncoder.h +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import <Foundation/Foundation.h> +#import "MQTTMessage.h" + +@interface MQTTEncoder : NSObject <NSStreamDelegate> { + NSInteger status; + NSOutputStream* stream; + NSRunLoop* runLoop; + NSString* runLoopMode; + NSMutableData* buffer; + NSInteger byteIndex; + id delegate; +} + +typedef enum { + MQTTEncoderEventReady, + MQTTEncoderEventErrorOccurred +} MQTTEncoderEvent; + +typedef enum { + MQTTEncoderStatusInitializing, + MQTTEncoderStatusReady, + MQTTEncoderStatusSending, + MQTTEncoderStatusEndEncountered, + MQTTEncoderStatusError +} MQTTEncoderStatus; + +- (id)initWithStream:(NSOutputStream*)aStream + runLoop:(NSRunLoop*)aRunLoop + runLoopMode:(NSString*)aMode; +- (void)setDelegate:(id)aDelegate; +- (void)open; +- (void)close; +- (MQTTEncoderStatus)status; +- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode; +- (void)encodeMessage:(MQTTMessage*)msg; + +@end + +@interface NSObject (MQTTEncoderDelegate) +- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent)eventCode; + +@end diff --git a/src/MqttSDK/MQTTEncoder.m b/src/MqttSDK/MQTTEncoder.m new file mode 100644 index 0000000..c99e0c2 --- /dev/null +++ b/src/MqttSDK/MQTTEncoder.m @@ -0,0 +1,161 @@ +// +// MQTTEncoder.m +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import "MQTTEncoder.h" + +@implementation MQTTEncoder + +- (id)initWithStream:(NSOutputStream*)aStream + runLoop:(NSRunLoop*)aRunLoop + runLoopMode:(NSString*)aMode { + status = MQTTEncoderStatusInitializing; + stream = aStream; + [stream setDelegate:self]; + runLoop = aRunLoop; + runLoopMode = aMode; + return self; +} + +- (void)setDelegate:(id)aDelegate { + delegate = aDelegate; +} + +- (MQTTEncoderStatus)status { + return status; +} + +- (void)open { + [stream setDelegate:self]; + [stream scheduleInRunLoop:runLoop forMode:runLoopMode]; + [stream open]; +} + +- (void)close { + [stream close]; + [stream setDelegate:nil]; + [stream removeFromRunLoop:runLoop forMode:runLoopMode]; + stream = nil; +} + +- (void)stream:(NSStream*)sender handleEvent:(NSStreamEvent)eventCode { + if(stream == nil) + return; + assert(sender == stream); + switch (eventCode) { + case NSStreamEventOpenCompleted: + break; + case NSStreamEventHasSpaceAvailable: + if (status == MQTTEncoderStatusInitializing) { + status = MQTTEncoderStatusReady; + [delegate encoder:self handleEvent:MQTTEncoderEventReady]; + } + else if (status == MQTTEncoderStatusReady) { + [delegate encoder:self handleEvent:MQTTEncoderEventReady]; + } + else if (status == MQTTEncoderStatusSending) { + UInt8* ptr; + NSInteger n, length; + + ptr = (UInt8*) [buffer bytes] + byteIndex; + // Number of bytes pending for transfer + length = [buffer length] - byteIndex; + n = [stream write:ptr maxLength:length]; + if (n == -1) { + status = MQTTEncoderStatusError; + [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred]; + } + else if (n < length) { + byteIndex += n; + } + else { + buffer = NULL; + byteIndex = 0; + status = MQTTEncoderStatusReady; + } + } + break; + case NSStreamEventErrorOccurred: + case NSStreamEventEndEncountered: + if (status != MQTTEncoderStatusError) { + status = MQTTEncoderStatusError; + [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred]; + } + break; + default: + NSLog(@"Oops, event code not handled: 0x%02x", eventCode); + break; + } +} + +- (void)encodeMessage:(MQTTMessage*)msg { + UInt8 header; + NSInteger n, length; + + if (status != MQTTEncoderStatusReady) { + NSLog(@"Encoder not ready"); + return; + } + + assert (buffer == NULL); + assert (byteIndex == 0); + + buffer = [[NSMutableData alloc] init]; + + // encode fixed header + header = [msg type] << 4; + if ([msg isDuplicate]) { + header |= 0x08; + } + header |= [msg qos] << 1; + if ([msg retainFlag]) { + header |= 0x01; + } + [buffer appendBytes:&header length:1]; + + // encode remaining length + length = [[msg data] length]; + do { + UInt8 digit = length % 128; + length /= 128; + if (length > 0) { + digit |= 0x80; + } + [buffer appendBytes:&digit length:1]; + } + while (length > 0); + + // encode message data + if ([msg data] != NULL) { + [buffer appendData:[msg data]]; + } + + n = [stream write:[buffer bytes] maxLength:[buffer length]]; + if (n == -1) { + status = MQTTEncoderStatusError; + [delegate encoder:self handleEvent:MQTTEncoderEventErrorOccurred]; + } + else if (n < [buffer length]) { + byteIndex += n; + status = MQTTEncoderStatusSending; + } + else { + buffer = NULL; + // XXX [delegate encoder:self handleEvent:MQTTEncoderEventReady]; + } +} + +@end diff --git a/src/MqttSDK/MQTTMessage.h b/src/MqttSDK/MQTTMessage.h new file mode 100644 index 0000000..35f98bd --- /dev/null +++ b/src/MqttSDK/MQTTMessage.h @@ -0,0 +1,104 @@ +// +// MQTTMessage.h +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import <Foundation/Foundation.h> + +@interface MQTTMessage : NSObject { + UInt8 type; + UInt8 qos; + BOOL retainFlag; + BOOL dupFlag; +} + +enum { + MQTTConnect = 1, + MQTTConnack = 2, + MQTTPublish = 3, + MQTTPuback = 4, + MQTTPubrec = 5, + MQTTPubrel = 6, + MQTTPubcomp = 7, + MQTTSubscribe = 8, + MQTTSuback = 9, + MQTTUnsubscribe = 10, + MQTTUnsuback = 11, + MQTTPingreq = 12, + MQTTPingresp = 13, + MQTTDisconnect = 14 +}; + +// instance methods ++ (id)connectMessageWithClientId:(NSString*)clientId + userName:(NSString*)userName + password:(NSString*)password + keepAlive:(NSInteger)keeplive + cleanSession:(BOOL)cleanSessionFlag; ++ (id)connectMessageWithClientId:(NSString*)clientId + userName:(NSString*)userName + password:(NSString*)password + keepAlive:(NSInteger)keeplive + cleanSession:(BOOL)cleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willData + willQoS:(UInt8)willQoS + willRetain:(BOOL)willRetainFlag; + ++ (id)pingreqMessage; ++ (id)subscribeMessageWithMessageId:(UInt16)msgId + topic:(NSString*)topic + qos:(UInt8)qos; ++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId + topic:(NSString*)topic; ++ (id)publishMessageWithData:(NSData*)payload + onTopic:(NSString*)theTopic + retainFlag:(BOOL)retain; ++ (id)publishMessageWithData:(NSData*)payload + onTopic:(NSString*)topic + qos:(UInt8)qosLevel + msgId:(UInt16)msgId + retainFlag:(BOOL)retain + dupFlag:(BOOL)dup; ++ (id)pubackMessageWithMessageId:(UInt16)msgId; ++ (id)pubrecMessageWithMessageId:(UInt16)msgId; ++ (id)pubrelMessageWithMessageId:(UInt16)msgId; ++ (id)pubcompMessageWithMessageId:(UInt16)msgId; + +- (id)initWithType:(UInt8)aType; +- (id)initWithType:(UInt8)aType data:(NSData*)aData; +- (id)initWithType:(UInt8)aType + qos:(UInt8)aQos + data:(NSData*)aData; +- (id)initWithType:(UInt8)aType + qos:(UInt8)aQos + retainFlag:(BOOL)aRetainFlag + dupFlag:(BOOL)aDupFlag + data:(NSData*)aData; +- (void)setDupFlag; +- (UInt8)type; +- (UInt8)qos; +- (BOOL)retainFlag; +- (BOOL)isDuplicate; +@property (strong,nonatomic) NSData * data; + +@end + +@interface NSMutableData (MQTT) +- (void)appendByte:(UInt8)byte; +- (void)appendUInt16BigEndian:(UInt16)val; +- (void)appendMQTTString:(NSString*)s; + +@end diff --git a/src/MqttSDK/MQTTMessage.m b/src/MqttSDK/MQTTMessage.m new file mode 100644 index 0000000..5871bdb --- /dev/null +++ b/src/MqttSDK/MQTTMessage.m @@ -0,0 +1,268 @@ +// +// MQTTMessage.m +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import "MQTTMessage.h" + +@implementation MQTTMessage + ++ (id)connectMessageWithClientId:(NSString*)clientId + userName:(NSString*)userName + password:(NSString*)password + keepAlive:(NSInteger)keepAlive + cleanSession:(BOOL)cleanSessionFlag { + MQTTMessage* msg; + UInt8 flags = 0x00; + + if (cleanSessionFlag) { + flags |= 0x02; + } + if ([userName length] > 0) { + flags |= 0x80; + if ([password length] > 0) { + flags |= 0x40; + } + } + + NSMutableData* data = [NSMutableData data]; + [data appendMQTTString:@"MQIsdp"]; + [data appendByte:3]; + [data appendByte:flags]; + [data appendUInt16BigEndian:keepAlive]; + [data appendMQTTString:clientId]; + if ([userName length] > 0) { + [data appendMQTTString:userName]; + if ([password length] > 0) { + [data appendMQTTString:password]; + } + } + + msg = [[MQTTMessage alloc] initWithType:MQTTConnect data:data]; + return msg; +} + ++ (id)connectMessageWithClientId:(NSString*)clientId + userName:(NSString*)userName + password:(NSString*)password + keepAlive:(NSInteger)keepAlive + cleanSession:(BOOL)cleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willMsg + willQoS:(UInt8)willQoS + willRetain:(BOOL)willRetainFlag { + UInt8 flags = 0x04 | (willQoS << 4 & 0x18); + + if (willRetainFlag) { + flags |= 0x20; + } + if (cleanSessionFlag) { + flags |= 0x02; + } + if ([userName length] > 0) { + flags |= 0x80; + if ([password length] > 0) { + flags |= 0x40; + } + } + + NSMutableData* data = [NSMutableData data]; + [data appendMQTTString:@"MQIsdp"]; + [data appendByte:3]; + [data appendByte:flags]; + [data appendUInt16BigEndian:keepAlive]; + [data appendMQTTString:clientId]; + [data appendMQTTString:willTopic]; + [data appendUInt16BigEndian:[willMsg length]]; + [data appendData:willMsg]; + if ([userName length] > 0) { + [data appendMQTTString:userName]; + if ([password length] > 0) { + [data appendMQTTString:password]; + } + } + + MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTConnect + data:data]; + return msg; +} + ++ (id)pingreqMessage { + return [[MQTTMessage alloc] initWithType:MQTTPingreq]; +} + ++ (id)subscribeMessageWithMessageId:(UInt16)msgId + topic:(NSString*)topic + qos:(UInt8)qos { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + [data appendMQTTString:topic]; + [data appendByte:qos]; + MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTSubscribe + qos:1 + data:data]; + return msg; +} + ++ (id)unsubscribeMessageWithMessageId:(UInt16)msgId + topic:(NSString*)topic { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + [data appendMQTTString:topic]; + MQTTMessage* msg = [[MQTTMessage alloc] initWithType:MQTTUnsubscribe + qos:1 + data:data]; + return msg; +} + ++ (id)publishMessageWithData:(NSData*)payload + onTopic:(NSString*)topic + retainFlag:(BOOL)retain { + NSMutableData* data = [NSMutableData data]; + [data appendMQTTString:topic]; + [data appendData:payload]; + MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish + qos:0 + retainFlag:retain + dupFlag:false + data:data]; + return msg; +} + ++ (id)publishMessageWithData:(NSData*)payload + onTopic:(NSString*)topic + qos:(UInt8)qosLevel + msgId:(UInt16)msgId + retainFlag:(BOOL)retain + dupFlag:(BOOL)dup { + NSMutableData* data = [NSMutableData data]; + [data appendMQTTString:topic]; + [data appendUInt16BigEndian:msgId]; + [data appendData:payload]; + MQTTMessage *msg = [[MQTTMessage alloc] initWithType:MQTTPublish + qos:qosLevel + retainFlag:retain + dupFlag:dup + data:data]; + return msg; +} + ++ (id)pubackMessageWithMessageId:(UInt16)msgId { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + return [[MQTTMessage alloc] initWithType:MQTTPuback + data:data]; +} + ++ (id)pubrecMessageWithMessageId:(UInt16)msgId { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + return [[MQTTMessage alloc] initWithType:MQTTPubrec + data:data]; +} + ++ (id)pubrelMessageWithMessageId:(UInt16)msgId { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + return [[MQTTMessage alloc] initWithType:MQTTPubrel + data:data]; +} + ++ (id)pubcompMessageWithMessageId:(UInt16)msgId { + NSMutableData* data = [NSMutableData data]; + [data appendUInt16BigEndian:msgId]; + return [[MQTTMessage alloc] initWithType:MQTTPubcomp + data:data]; +} + +- (id)initWithType:(UInt8)aType { + type = aType; + self.data = nil; + return self; +} + +- (id)initWithType:(UInt8)aType data:(NSData*)aData { + type = aType; + self.data = aData; + return self; +} + +- (id)initWithType:(UInt8)aType + qos:(UInt8)aQos + data:(NSData*)aData { + type = aType; + qos = aQos; + self.data = aData; + return self; +} + +- (id)initWithType:(UInt8)aType + qos:(UInt8)aQos + retainFlag:(BOOL)aRetainFlag + dupFlag:(BOOL)aDupFlag + data:(NSData*)aData { + type = aType; + qos = aQos; + retainFlag = aRetainFlag; + dupFlag = aDupFlag; + self.data = aData; + return self; +} + +- (void)setDupFlag { + dupFlag = true; +} + +- (UInt8)type { + return type; +} + +- (UInt8)qos { + return qos; +} + +- (BOOL)retainFlag { + return retainFlag; +} + +- (BOOL)isDuplicate { + return dupFlag; +} + + +@end + +@implementation NSMutableData (MQTT) + +- (void)appendByte:(UInt8)byte { + [self appendBytes:&byte length:1]; +} + +- (void)appendUInt16BigEndian:(UInt16)val { + [self appendByte:val / 256]; + [self appendByte:val % 256]; +} + +- (void)appendMQTTString:(NSString*)string { + UInt8 buf[2]; + const char* utf8String = [string UTF8String]; + int strLen = strlen(utf8String); + buf[0] = strLen / 256; + buf[1] = strLen % 256; + [self appendBytes:buf length:2]; + [self appendBytes:utf8String length:strLen]; +} + +@end diff --git a/src/MqttSDK/MQTTSession.h b/src/MqttSDK/MQTTSession.h new file mode 100644 index 0000000..bb01e03 --- /dev/null +++ b/src/MqttSDK/MQTTSession.h @@ -0,0 +1,148 @@ +// +// MQTTSession.h +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import <Foundation/Foundation.h> +#import "MQTTDecoder.h" +#import "MQTTEncoder.h" + +typedef enum { + MQTTSessionStatusCreated, + MQTTSessionStatusConnecting, + MQTTSessionStatusConnected, + MQTTSessionStatusError +} MQTTSessionStatus; + +typedef enum { + MQTTSessionEventConnected, + MQTTSessionEventConnectionRefused, + MQTTSessionEventConnectionClosed, + MQTTSessionEventConnectionError, + MQTTSessionEventProtocolError +} MQTTSessionEvent; + +@interface MQTTSession : NSObject { + MQTTSessionStatus status; + NSString* clientId; + //NSString* userName; + //NSString* password; + UInt16 keepAliveInterval; + BOOL cleanSessionFlag; + MQTTMessage* connectMessage; + NSRunLoop* runLoop; + NSString* runLoopMode; + NSTimer* timer; + NSInteger idleTimer; + MQTTEncoder* encoder; + MQTTDecoder* decoder; + UInt16 txMsgId; + id delegate; + NSMutableDictionary* txFlows; + NSMutableDictionary* rxFlows; + unsigned int ticks; +} + +- (id)initWithClientId:(NSString*)theClientId; +- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUsername + password:(NSString*)thePassword; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUsername + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)cleanSessionFlag; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUsername + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAlive + cleanSession:(BOOL)theCleanSessionFlag + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theMode; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willMsg + willQoS:(UInt8)willQoS + willRetainFlag:(BOOL)willRetainFlag; +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willMsg + willQoS:(UInt8)willQoS + willRetainFlag:(BOOL)willRetainFlag + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode; +- (id)initWithClientId:(NSString*)theClientId + keepAlive:(UInt16)theKeepAliveInterval + connectMessage:(MQTTMessage*)theConnectMessage + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode; + +- (void)dealloc; +- (void)close; +- (void)setDelegate:aDelegate; +- (void)connectToHost:(NSString*)ip port:(UInt32)port; +- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL; +- (void)subscribeTopic:(NSString*)theTopic; +- (void)subscribeToTopic:(NSString*)topic atLevel:(UInt8)qosLevel; +- (void)unsubscribeTopic:(NSString*)theTopic; +- (void)publishData:(NSData*)theData onTopic:(NSString*)theTopic; +- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic; +- (void)publishDataAtLeastOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag; +- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic; +- (void)publishDataAtMostOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag; +- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic; +- (void)publishDataExactlyOnce:(NSData*)theData onTopic:(NSString*)theTopic retain:(BOOL)retainFlag; +- (void)publishJson:(id)payload onTopic:(NSString*)theTopic; +- (void)timerHandler:(NSTimer*)theTimer; +- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode; +- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent) eventCode; +- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*) msg; + +// private methods +- (void)newMessage:(MQTTMessage*)msg; +- (void)error:(MQTTSessionEvent)event; +- (void)handlePublish:(MQTTMessage*)msg; +- (void)handlePuback:(MQTTMessage*)msg; +- (void)handlePubrec:(MQTTMessage*)msg; +- (void)handlePubrel:(MQTTMessage*)msg; +- (void)handlePubcomp:(MQTTMessage*)msg; +- (void)send:(MQTTMessage*)msg; +- (UInt16)nextMsgId; + +@property (strong,atomic) NSMutableArray* queue; +@property (strong,atomic) NSMutableArray* timerRing; + +@end + +@interface NSObject (MQTTSessionDelegate) +- (void)session:(MQTTSession*)session handleEvent:(MQTTSessionEvent)eventCode; +- (void)session:(MQTTSession*)session newMessage:(NSData*)data onTopic:(NSString*)topic; + +@end diff --git a/src/MqttSDK/MQTTSession.m b/src/MqttSDK/MQTTSession.m new file mode 100644 index 0000000..9dd1fc2 --- /dev/null +++ b/src/MqttSDK/MQTTSession.m @@ -0,0 +1,630 @@ +// +// MQTTSession.m +// MQtt Client +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import "MQTTSession.h" +#import "MQttTxFlow.h" + +@implementation MQTTSession + +- (id)initWithClientId:(NSString*)theClientId { + return [self initWithClientId:theClientId userName:@"" password:@""]; +} + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword { + return [self initWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:60 + cleanSession:YES]; +} + +- (id)initWithClientId:(NSString*)theClientId runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode { + return [self initWithClientId:theClientId userName:@"" password:@"" runLoop:theRunLoop forMode:theRunLoopMode]; +} + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode { + return [self initWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:60 + cleanSession:YES + runLoop:theRunLoop + forMode:theRunLoopMode]; +} + + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag { + return [self initWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:theKeepAliveInterval + cleanSession:theCleanSessionFlag + runLoop:[NSRunLoop currentRunLoop] + forMode:NSDefaultRunLoopMode]; +} + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode { + MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:theKeepAliveInterval + cleanSession:theCleanSessionFlag]; + return [self initWithClientId:theClientId + keepAlive:theKeepAliveInterval + connectMessage:msg + runLoop:theRunLoop + forMode:theRunLoopMode]; +} + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willMsg + willQoS:(UInt8)willQoS + willRetainFlag:(BOOL)willRetainFlag { + return [self initWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:theKeepAliveInterval + cleanSession:theCleanSessionFlag + willTopic:willTopic + willMsg:willMsg + willQoS:willQoS + willRetainFlag:willRetainFlag + runLoop:[NSRunLoop currentRunLoop] + forMode:NSDefaultRunLoopMode]; +} + +- (id)initWithClientId:(NSString*)theClientId + userName:(NSString*)theUserName + password:(NSString*)thePassword + keepAlive:(UInt16)theKeepAliveInterval + cleanSession:(BOOL)theCleanSessionFlag + willTopic:(NSString*)willTopic + willMsg:(NSData*)willMsg + willQoS:(UInt8)willQoS + willRetainFlag:(BOOL)willRetainFlag + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode { + MQTTMessage *msg = [MQTTMessage connectMessageWithClientId:theClientId + userName:theUserName + password:thePassword + keepAlive:theKeepAliveInterval + cleanSession:theCleanSessionFlag + willTopic:willTopic + willMsg:willMsg + willQoS:willQoS + willRetain:willRetainFlag]; + return [self initWithClientId:theClientId + keepAlive:theKeepAliveInterval + connectMessage:msg + runLoop:theRunLoop + forMode:theRunLoopMode]; +} + +- (id)initWithClientId:(NSString*)theClientId + keepAlive:(UInt16)theKeepAliveInterval + connectMessage:(MQTTMessage*)theConnectMessage + runLoop:(NSRunLoop*)theRunLoop + forMode:(NSString*)theRunLoopMode { + clientId = theClientId; + keepAliveInterval = theKeepAliveInterval; + connectMessage = theConnectMessage; + runLoop = theRunLoop; + runLoopMode = theRunLoopMode; + + self.queue = [NSMutableArray array]; + txMsgId = 1; + txFlows = [[NSMutableDictionary alloc] init]; + rxFlows = [[NSMutableDictionary alloc] init]; + self.timerRing = [[NSMutableArray alloc] initWithCapacity:60]; + int i; + for (i = 0; i < 60; i++) { + [self.timerRing addObject:[NSMutableSet set]]; + } + ticks = 0; + + return self; +} + +- (void)dealloc { + [encoder close]; + encoder = nil; + [decoder close]; + decoder = nil; + if (timer != nil) { + [timer invalidate]; + timer = nil; + } +} + +- (void)close { + [encoder close]; + [decoder close]; + encoder = nil; + decoder = nil; + if (timer != nil) { + [timer invalidate]; + timer = nil; + } + [self error:MQTTSessionEventConnectionClosed]; +} + +- (void)setDelegate:(id)aDelegate { + delegate = aDelegate; +} + +- (void)connectToHost:(NSString*)ip port:(UInt32)port { + [self connectToHost:ip port:port usingSSL:false]; +} + +- (void)connectToHost:(NSString*)ip port:(UInt32)port usingSSL:(BOOL)usingSSL { + + status = MQTTSessionStatusCreated; + + CFReadStreamRef readStream; + CFWriteStreamRef writeStream; + + CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)ip, port, &readStream, &writeStream); + + if (usingSSL) { + const void *keys[] = { kCFStreamSSLLevel, + kCFStreamSSLPeerName }; + + const void *vals[] = { kCFStreamSocketSecurityLevelNegotiatedSSL, + kCFNull }; + + CFDictionaryRef sslSettings = CFDictionaryCreate(kCFAllocatorDefault, keys, vals, 2, + &kCFTypeDictionaryKeyCallBacks, + &kCFTypeDictionaryValueCallBacks); + + CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, sslSettings); + CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, sslSettings); + + CFRelease(sslSettings); + } + + encoder = [[MQTTEncoder alloc] initWithStream:(__bridge NSOutputStream*)writeStream + runLoop:runLoop + runLoopMode:runLoopMode]; + + decoder = [[MQTTDecoder alloc] initWithStream:(__bridge NSInputStream*)readStream + runLoop:runLoop + runLoopMode:runLoopMode]; + + [encoder setDelegate:self]; + [decoder setDelegate:self]; + + [encoder open]; + [decoder open]; +} + +- (void)subscribeTopic:(NSString*)theTopic { + [self subscribeToTopic:theTopic atLevel:0]; +} + +- (void) subscribeToTopic:(NSString*)topic + atLevel:(UInt8)qosLevel { + [self send:[MQTTMessage subscribeMessageWithMessageId:[self nextMsgId] + topic:topic + qos:qosLevel]]; +} + +- (void)unsubscribeTopic:(NSString*)theTopic { + [self send:[MQTTMessage unsubscribeMessageWithMessageId:[self nextMsgId] + topic:theTopic]]; +} + +- (void)publishData:(NSData*)data onTopic:(NSString*)topic { + [self publishDataAtMostOnce:data onTopic:topic]; +} + +- (void)publishDataAtMostOnce:(NSData*)data + onTopic:(NSString*)topic { + [self publishDataAtMostOnce:data onTopic:topic retain:false]; +} + +- (void)publishDataAtMostOnce:(NSData*)data + onTopic:(NSString*)topic + retain:(BOOL)retainFlag { + [self send:[MQTTMessage publishMessageWithData:data + onTopic:topic + retainFlag:retainFlag]]; +} + +- (void)publishDataAtLeastOnce:(NSData*)data + onTopic:(NSString*)topic { + [self publishDataAtLeastOnce:data onTopic:topic retain:false]; +} + +- (void)publishDataAtLeastOnce:(NSData*)data + onTopic:(NSString*)topic + retain:(BOOL)retainFlag { + UInt16 msgId = [self nextMsgId]; + MQTTMessage *msg = [MQTTMessage publishMessageWithData:data + onTopic:topic + qos:1 + msgId:msgId + retainFlag:retainFlag + dupFlag:false]; + MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg + deadline:(ticks + 60)]; + [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]]; + [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]]; + [self send:msg]; +} + +- (void)publishDataExactlyOnce:(NSData*)data + onTopic:(NSString*)topic { + [self publishDataExactlyOnce:data onTopic:topic retain:false]; +} + +- (void)publishDataExactlyOnce:(NSData*)data + onTopic:(NSString*)topic + retain:(BOOL)retainFlag { + UInt16 msgId = [self nextMsgId]; + MQTTMessage *msg = [MQTTMessage publishMessageWithData:data + onTopic:topic + qos:2 + msgId:msgId + retainFlag:retainFlag + dupFlag:false]; + MQttTxFlow *flow = [MQttTxFlow flowWithMsg:msg + deadline:(ticks + 60)]; + [txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]]; + [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]]; + [self send:msg]; +} + +- (void)publishJson:(id)payload onTopic:(NSString*)theTopic { + NSError * error = nil; + NSData * data = [NSJSONSerialization dataWithJSONObject:payload options:0 error:&error]; + if(error!=nil){ + //NSLog(@"Error creating JSON: %@",error.description); + return; + } + [self publishData:data onTopic:theTopic]; +} + +- (void)timerHandler:(NSTimer*)theTimer { + idleTimer++; + if (idleTimer >= keepAliveInterval) { + if ([encoder status] == MQTTEncoderStatusReady) { + //NSLog(@"sending PINGREQ"); + [encoder encodeMessage:[MQTTMessage pingreqMessage]]; + idleTimer = 0; + } + } + ticks++; + NSEnumerator *e = [[self.timerRing objectAtIndex:(ticks % 60)] objectEnumerator]; + id msgId; + + while ((msgId = [e nextObject])) { + MQttTxFlow *flow = [txFlows objectForKey:msgId]; + MQTTMessage *msg = [flow msg]; + [flow setDeadline:(ticks + 60)]; + [msg setDupFlag]; + [self send:msg]; + } +} + +- (void)encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode { + // NSLog(@"encoder:(MQTTEncoder*)sender handleEvent:(MQTTEncoderEvent) eventCode "); + if(sender == encoder) { + switch (eventCode) { + case MQTTEncoderEventReady: + switch (status) { + case MQTTSessionStatusCreated: + //NSLog(@"Encoder has been created. Sending Auth Message"); + [sender encodeMessage:connectMessage]; + status = MQTTSessionStatusConnecting; + break; + case MQTTSessionStatusConnecting: + break; + case MQTTSessionStatusConnected: + if ([self.queue count] > 0) { + MQTTMessage *msg = [self.queue objectAtIndex:0]; + [self.queue removeObjectAtIndex:0]; + [encoder encodeMessage:msg]; + } + break; + case MQTTSessionStatusError: + break; + } + break; + case MQTTEncoderEventErrorOccurred: + [self error:MQTTSessionEventConnectionError]; + break; + } + } +} + +- (void)decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode { + //NSLog(@"decoder:(MQTTDecoder*)sender handleEvent:(MQTTDecoderEvent)eventCode"); + if(sender == decoder) { + MQTTSessionEvent event; + switch (eventCode) { + case MQTTDecoderEventConnectionClosed: + event = MQTTSessionEventConnectionError; + break; + case MQTTDecoderEventConnectionError: + event = MQTTSessionEventConnectionError; + break; + case MQTTDecoderEventProtocolError: + event = MQTTSessionEventProtocolError; + break; + } + [self error:event]; + } +} + +- (void)decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg { + //NSLog(@"decoder:(MQTTDecoder*)sender newMessage:(MQTTMessage*)msg "); + if(sender == decoder){ + switch (status) { + case MQTTSessionStatusConnecting: + switch ([msg type]) { + case MQTTConnack: + if ([[msg data] length] != 2) { + [self error:MQTTSessionEventProtocolError]; + } + else { + const UInt8 *bytes = [[msg data] bytes]; + if (bytes[1] == 0) { + status = MQTTSessionStatusConnected; + timer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:1.0] + interval:1.0 + target:self + selector:@selector(timerHandler:) + userInfo:nil + repeats:YES]; + if ([delegate respondsToSelector:@selector(session:handleEvent:)]) { + [delegate session:self handleEvent:MQTTSessionEventConnected]; + } + [runLoop addTimer:timer forMode:runLoopMode]; + } + else { + [self error:MQTTSessionEventConnectionRefused]; + } + } + break; + default: + [self error:MQTTSessionEventProtocolError]; + break; + } + break; + case MQTTSessionStatusConnected: + [self newMessage:msg]; + break; + default: + break; + } + } +} + +- (void)newMessage:(MQTTMessage*)msg { + switch ([msg type]) { + case MQTTPublish: + [self handlePublish:msg]; + break; + case MQTTPuback: + [self handlePuback:msg]; + break; + case MQTTPubrec: + [self handlePubrec:msg]; + break; + case MQTTPubrel: + [self handlePubrel:msg]; + break; + case MQTTPubcomp: + [self handlePubcomp:msg]; + break; + default: + return; + } +} + +- (void)handlePublish:(MQTTMessage*)msg { + if (![delegate respondsToSelector:@selector(session:newMessage:onTopic:)]) { + return; + } + NSData *data = [msg data]; + if ([data length] < 2) { + return; + } + UInt8 const *bytes = [data bytes]; + UInt16 topicLength = 256 * bytes[0] + bytes[1]; + if ([data length] < 2 + topicLength) { + return; + } + NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)]; + NSString *topic = [[NSString alloc] initWithData:topicData + encoding:NSUTF8StringEncoding]; + NSRange range = NSMakeRange(2 + topicLength, [data length] - topicLength - 2); + data = [data subdataWithRange:range]; + if ([msg qos] == 0) { + [delegate session:self newMessage:data onTopic:topic]; + } + else { + if ([data length] < 2) { + return; + } + bytes = [data bytes]; + UInt16 msgId = 256 * bytes[0] + bytes[1]; + if (msgId == 0) { + return; + } + data = [data subdataWithRange:NSMakeRange(2, [data length] - 2)]; + if ([msg qos] == 1) { + [delegate session:self newMessage:data onTopic:topic]; + [self send:[MQTTMessage pubackMessageWithMessageId:msgId]]; + } + else { + NSDictionary *dict = [NSDictionary dictionaryWithObjectsAndKeys: + data, @"data", topic, @"topic", nil]; + [rxFlows setObject:dict forKey:[NSNumber numberWithUnsignedInt:msgId]]; + [self send:[MQTTMessage pubrecMessageWithMessageId:msgId]]; + } + } +} + +- (void)handlePuback:(MQTTMessage*)msg { + if ([[msg data] length] != 2) { + return; + } + UInt8 const *bytes = [[msg data] bytes]; + NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])]; + if ([msgId unsignedIntValue] == 0) { + return; + } + MQttTxFlow *flow = [txFlows objectForKey:msgId]; + if (flow == nil) { + return; + } + + if ([[flow msg] type] != MQTTPublish || [[flow msg] qos] != 1) { + return; + } + + [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + [txFlows removeObjectForKey:msgId]; +} + +- (void)handlePubrec:(MQTTMessage*)msg { + if ([[msg data] length] != 2) { + return; + } + UInt8 const *bytes = [[msg data] bytes]; + NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])]; + if ([msgId unsignedIntValue] == 0) { + return; + } + MQttTxFlow *flow = [txFlows objectForKey:msgId]; + if (flow == nil) { + return; + } + msg = [flow msg]; + if ([msg type] != MQTTPublish || [msg qos] != 2) { + return; + } + msg = [MQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]]; + [flow setMsg:msg]; + [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + [flow setDeadline:(ticks + 60)]; + [[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId]; + + [self send:msg]; +} + +- (void)handlePubrel:(MQTTMessage*)msg { + if ([[msg data] length] != 2) { + return; + } + UInt8 const *bytes = [[msg data] bytes]; + NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])]; + if ([msgId unsignedIntValue] == 0) { + return; + } + NSDictionary *dict = [rxFlows objectForKey:msgId]; + if (dict != nil) { + [delegate session:self + newMessage:[dict valueForKey:@"data"] + onTopic:[dict valueForKey:@"topic"]]; + [rxFlows removeObjectForKey:msgId]; + } + [self send:[MQTTMessage pubcompMessageWithMessageId:[msgId unsignedIntegerValue]]]; +} + +- (void)handlePubcomp:(MQTTMessage*)msg { + if ([[msg data] length] != 2) { + return; + } + UInt8 const *bytes = [[msg data] bytes]; + NSNumber *msgId = [NSNumber numberWithUnsignedInt:(256 * bytes[0] + bytes[1])]; + if ([msgId unsignedIntValue] == 0) { + return; + } + MQttTxFlow *flow = [txFlows objectForKey:msgId]; + if (flow == nil || [[flow msg] type] != MQTTPubrel) { + return; + } + + [[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId]; + [txFlows removeObjectForKey:msgId]; +} + +- (void)error:(MQTTSessionEvent)eventCode { + + [encoder close]; + encoder = nil; + + [decoder close]; + decoder = nil; + + if (timer != nil) { + [timer invalidate]; + + timer = nil; + } + status = MQTTSessionStatusError; + + usleep(1000000); // 1 sec delay + + if ([delegate respondsToSelector:@selector(session:handleEvent:)]) { + [delegate session:self handleEvent:eventCode]; + } + +} + +- (void)send:(MQTTMessage*)msg { + if ([encoder status] == MQTTEncoderStatusReady) { + [encoder encodeMessage:msg]; + } + else { + [self.queue addObject:msg]; + } +} + +- (UInt16)nextMsgId { + txMsgId++; + while (txMsgId == 0 || [txFlows objectForKey:[NSNumber numberWithUnsignedInt:txMsgId]] != nil) { + txMsgId++; + } + return txMsgId; +} + +@end diff --git a/src/MqttSDK/MQttTxFlow.h b/src/MqttSDK/MQttTxFlow.h new file mode 100644 index 0000000..59d9986 --- /dev/null +++ b/src/MqttSDK/MQttTxFlow.h @@ -0,0 +1,36 @@ +// +// MQttTxFlow.h +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import <Foundation/Foundation.h> +#import "MQTTMessage.h" + +@interface MQttTxFlow : NSObject { + MQTTMessage * msg; + unsigned int deadline; +} + ++ (id)flowWithMsg:(MQTTMessage*)aMsg + deadline:(unsigned int)aDeadline; + +- (id)initWithMsg:(MQTTMessage*)aMsg deadline:(unsigned int)aDeadline; + +- (void)setMsg:(MQTTMessage*)aMsg; +- (void)setDeadline:(unsigned int)newValue; +- (MQTTMessage*)msg; +- (unsigned int)deadline; + +@end + diff --git a/src/MqttSDK/MQttTxFlow.m b/src/MqttSDK/MQttTxFlow.m new file mode 100644 index 0000000..80604f6 --- /dev/null +++ b/src/MqttSDK/MQttTxFlow.m @@ -0,0 +1,50 @@ +// +// MQtttTxFlow.m +// +// Copyright (c) 2011, 2013, 2lemetry LLC +// +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Eclipse Distribution License v. 1.0 which accompanies this distribution. +// The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html +// and the Eclipse Distribution License is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// Contributors: +// Kyle Roche - initial API and implementation and/or initial documentation +// + +#import "MQttTxFlow.h" + +@implementation MQttTxFlow + ++ (id)flowWithMsg:(MQTTMessage*)msg + deadline:(unsigned int)deadline { + return [[MQttTxFlow alloc] initWithMsg:msg deadline:deadline]; +} + +- (id)initWithMsg:(MQTTMessage*)aMsg + deadline:(unsigned int)aDeadline { + msg = aMsg; + deadline = aDeadline; + return self; +} + +- (void)setMsg:(MQTTMessage*)aMsg { + msg = aMsg; +} + +- (void)setDeadline:(unsigned int)newDeadline { + deadline = newDeadline; +} + +- (MQTTMessage*)msg { + return msg; +} + +- (unsigned int) deadline { + return deadline; +} + + +@end |