内容简介:EMQ 项目设计目标是承载移动终端或物联网终端海量 MQTT 连接,并实现在海量物联网设备间快速低延时消息路由:以下介绍其中的两种
EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。
EMQ 项目设计目标是承载移动终端或物联网终端海量 MQTT 连接,并实现在海量物联网设备间快速低延时消息路由:
- 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
- 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
- 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
- 完整物联网协议支持,MQTT、MQTT-SN、CoAP、WebSocket 或私有协议支持。
选择 MQTT SDK 分为多种
以下介绍其中的两种 MQTTKit 和 MQTT-Client-Framework 这两种都是OC 使用
Swift 版本可参考 CocoaMQTT
1、 ** MQTTKit **
已经不更新 但是基本使用没问题
pod 'MQTTKit'
头文件
#import <MQTTKit.h> #define WEAKSELF __typeof(&*self) __weak weakSelf = self; @property (nonatomic, strong) MQTTClient *client;
初始化 链接
WEAKSELF NSString *clientID = @"测试client - 必须是全局唯一的id "; MQTTClient *client = [[MQTTClient alloc] initWithClientId:StrFormat(@"%@", clientID)]; client.username = @"username"; client.password = @"password"; client.cleanSession = false; client.keepAlive = 20; client.port = 11883;// 端口号 根据服务端 选择 self.client = client; // 链接MQTT [client connectToHost:@"链接的MQTT的URL" completionHandler:^(MQTTConnectionReturnCode code) { if (code == ConnectionAccepted) { NSLog(@"链接MQTT 成功 :grin::grin::grin:"); // 链接成功 订阅相对应的主题 [weakSelf.client subscribe:@"你需要订阅的主题" withQos:AtLeastOnce completionHandler:^(NSArray *grantedQos) { DLog(@"订阅 返回 %@",grantedQos); }]; }else if (code == ConnectionRefusedBadUserNameOrPassword){ NSLog(@"MQTT 账号或验证码错误"); } else if (code == ConnectionRefusedUnacceptableProtocolVersion){ NSLog(@"MQTT 不可接受的协议"); }else if (code == ConnectionRefusedIdentiferRejected){ NSLog(@"MQTT不认可"); }else if (code == ConnectionRefusedServerUnavailable){ NSLog(@"MQTT拒绝链接"); }else { NSLog(@"MQTT 未授权"); } }]; // 接收消息体 client.messageHandler = ^(MQTTMessage *message) { NSString *jsonStr = [[NSString alloc] initWithData:message.payload encoding:NSUTF8StringEncoding]; NSLog(@"EasyMqttService mqtt connect success %@",jsonStr); }; 复制代码
订阅主题
// 方法 封装 可外部调用 -(void)subscribeType:(NSString *)example{ // 订阅主题 [self.client subscribe:@"你需要订阅的主题" withQos:AtMostOnce completionHandler:^(NSArray *grantedQos) { NSLog(@"订阅 返回 %@",grantedQos); }]; } 复制代码
关闭MQTTKit
-(void)closeMQTTClient{ WEAKSELF [self.client disconnectWithCompletionHandler:^(NSUInteger code) { // The client is disconnected when this completion handler is called NSLog(@"MQTT client is disconnected"); [weakSelf.client unsubscribe:@"已经订阅的主题" withCompletionHandler:^{ NSLog(@"取消订阅"); }]; }]; } 复制代码
发送消息
[self.client publishString:postMsg toTopic:@"发送消息的主题 根据服务端定" withQos:AtLeastOnce retain:NO completionHandler:^(int mid) { if (cmd != METHOD_SOCKET_CHAT_TO) { NSLog(@"发送消息 返回 %d",mid); } }]; 复制代码
2、 ** MQTTClient ** MQTTClient 配置更多 是可持续更新,可配置 SSL
基本使用
pod 'MQTTClient' websocket
方式连接
pod 'MQTTClient/MinL'
pod 'MQTTClient/ManagerL'
pod 'MQTTClient/WebsocketL'
头文件
基本使用
#import <MQTTClient.h> websocket
需要添加的头文件
#import <MQTTWebsocketTransport.h>
#define WEAKSELF __typeof(&*self) __weak weakSelf = self;
@property (nonatomic, strong) MQTTSession *mySession; 需要添加协议头
<MQTTSessionDelegate,MQTTSessionManagerDelegate>
初始化 链接
基本使用
#import "MQTTClient.h" \@interface MyDelegate : ... <MQTTSessionDelegate> ... MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init]; transport.host = @"localhost"; transport.port = 1883; MQTTSession *session = [[MQTTSession alloc] init]; session.transport = transport; session.delegate = self; [session connectAndWaitTimeout:30]; //this is part of the synchronous API 复制代码
websocket 连接
WEAKSELF NSString *clientID = @"测试client - 必须是全局唯一的id "; MQTTWebsocketTransport *transport = [[MQTTWebsocketTransport alloc] init]; transport.host = @"连接MQTT 地址"; transport.port = 8083; // 端口号 transport.tls = YES; // 根据需要配置 YES 开起 SSL 验证 此处为单向验证 双向验证 根据SDK 提供方法直接添加 MQTTSession *session = [[MQTTSession alloc] init]; NSString *linkUserName = @"username"; NSString *linkPassWord = @"password"; [session setUserName:linkUserName]; [session setClientId:clientID]; [session setPassword:linkPassWord]; [session setKeepAliveInterval:5]; session.transport = transport; session.delegate = self; self.mySession = session; [self reconnect]; [self.mySession addObserver:self forKeyPath:@"state" options:NSKeyValueObservingOptionNew|NSKeyValueObservingOptionOld context:nil]; //添加事件监听 复制代码
websocket
监听 响应事件
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context { switch (self.mySession.status) { case MQTTSessionManagerStateClosed: NSLog(@"连接已经关闭"); break; case MQTTSessionManagerStateClosing: NSLog(@"连接正在关闭"); break; case MQTTSessionManagerStateConnected: NSLog(@"已经连接"); break; case MQTTSessionManagerStateConnecting: NSLog(@"正在连接中"); break; case MQTTSessionManagerStateError: { // NSString *errorCode = self.mySession.lastErrorCode.localizedDescription; NSString *errorCode = self.mySession.description; NSLog(@"连接异常 ----- %@",errorCode); } break; case MQTTSessionManagerStateStarting: NSLog(@"开始连接"); break; default: break; } } 复制代码
session Delegate 协议
连接 返回状态
-(void)handleEvent:(MQTTSession *)session event:(MQTTSessionEvent)eventCode error:(NSError *)error{ if (eventCode == MQTTSessionEventConnected) { NSLog(@"2222222 链接MQTT 成功"); }else if (eventCode == MQTTSessionEventConnectionRefused) { NSLog(@"MQTT拒绝链接"); }else if (eventCode == MQTTSessionEventConnectionClosed){ NSLog(@"MQTT链接关闭"); }else if (eventCode == MQTTSessionEventConnectionError){ NSLog(@"MQTT 链接错误"); }else if (eventCode == MQTTSessionEventProtocolError){ NSLog(@"MQTT 不可接受的协议"); }else{//MQTTSessionEventConnectionClosedByBroker NSLog(@"MQTT链接 其他错误"); } if (error) { NSLog(@"链接报错 -- %@",error); } } 复制代码
收到发送的消息
-(void)newMessage:(MQTTSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid { NSDictionary *dic = [NSJSONSerialization JSONObjectWithData:data options:NSJSONReadingMutableContainers error:nil]; NSLog(@"EasyMqttService mqtt connect success %@",dic); // 做相对应的操作 } 复制代码
订阅主题
基本使用
// 方法 封装 可外部调用 [session subscribeToTopic:@"example/#" atLevel:2 subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss){ if (error) { NSLog(@"Subscription failed %@", error.localizedDescription); } else { NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss); } }]; // this is part of the block API 复制代码
websocket
- (void)subscibeToTopicAction { dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ dispatch_async(dispatch_get_main_queue(), ^{ [self subscibeToTopic:@"你要订阅的主题"]; }); }); } -(void)subscibeToTopic:(NSString *)topicUrl { // self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelAtMostOnce] forKey:topicUrl]; [self.mySession subscribeToTopic:topicUrl atLevel:MQTTQosLevelAtMostOnce subscribeHandler:^(NSError *error, NSArray<NSNumber *> *gQoss) { if (error) { NSLog(@"订阅 %@ 失败 原因 %@",topicUrl,error); }else { NSLog(@"订阅 %@ 成功 g1oss %@",topicUrl,gQoss); dispatch_async(dispatch_get_main_queue(), ^{ // 操作 }); }; }]; } 复制代码
关闭MQTT-Client
-(void)closeMQTTClient{ [self.mySession disconnect]; [self.mySession unsubscribeTopics:@[@"已经订阅的主题"] unsubscribeHandler:^(NSError *error) { if (error) { DLog(@"取消订阅失败"); }else{ DLog(@"取消订阅成功"); } }]; } 复制代码
发送消息
[self.mySession publishData:jsonData onTopic:@"发送消息的主题 服务端定义" retain:NO qos:MQTTQosLevelAtMostOnce publishHandler:^(NSError *error) { if (error) { NSLog(@"发送失败 - %@",error); }else{ NSLog(@"发送成功"); } }]; 复制代码
以上所述就是小编给大家介绍的《iOS MQTT 简单使用流程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- MUI使用个推推送流程分析
- Goland使用初探以及运行流程浅析
- freeeyes - TarvisCI 全流程使用实践(一)
- 如何使用 CODING 实践 DevOps 全流程
- 【git】前端使用git分支的开发流程
- Kafka简介、基本原理、执行流程与使用场景
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java JDK6学习笔记
林信良 / 清华大学出版社 / 2007-4 / 59.90元
《Java JDK6学习笔记》是作者良葛格本人近几年来学习Java的心得笔记,结构按照作者的学习脉络依次展开,从什么是Java、如何配置Java开发环境、基本的Java语法到程序流程控制、管理类文件、异常处理、枚举类型、泛型、J2SE中标准的API等均进行了详细介绍。本书还安排了一个“文字编辑器”的专题制作。此外,Java SE6的新功能,对Java lang等套件的功能加强,以及JDBC4.0、......一起来看看 《Java JDK6学习笔记》 这本书的介绍吧!