内容简介:在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议
最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。
目录
- 一、本教程中说明的内容
- 三、在ARM mbed中使用MQTT
一、本教程中说明的内容
先说说本文化的适用范围吧:
一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。
二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。
三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。
二、MQTT的使用
首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台: GitHub – paho.mqtt.embedded-c 。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。
src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。
由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。
当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。
将transport.h的内容精减到以下内容:
int transport_sendPacketBuffer(unsigned char* buf, int buflen); int transport_getdata(unsigned char* buf, int count); int transport_open(char* host, int port); int transport_close();
主要的工作有:
1、为了方便表示,删除了版权信息,有实际使用时请保留。
2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。
3、透传模块中使用不到socket,所以将与socket相关的参数去掉。
这些方法实现的主要功能是:
1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。
2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。
然后用transport.c来实现transport.h中声明的4个函数。
三、在ARM mbed中使用MQTT
首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。
首先介绍一个例子, HelloMQTT – a mercurial repository | mbed 。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。
如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:
int read(unsigned char* buffer, int len, int timeout); int write(unsigned char* buffer, int len, int timeout);
这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。
对此,我写了驱动程序:
MQTTGRPSEthernet.h
#pragma once #if !defined(MQTTGPRSETHERNET_H) #define MQTTGPRSETHERNET_H #define DEFAULT_GPRS_TIMEOUT 6000000 //6s #define SERIAL_BUFFER_SIZE 256 #include "mbed.h" class MQTTGPRSEthernet { public: MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200); ~MQTTGPRSEthernet(); bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false); bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT); int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT); int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT); int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT); bool disconnect(); private: bool initNet(); Serial eth; bool command(const char* cmd, const char* ack = ""); bool connected = false; bool initialized = false; char* localIP; const char *_apn; const char *_passWord; const char *_userName; }; #endif
MQTTGRPSEthernet.cpp
#include "MQTTGPRSEthernet.h" #include "StringHelper.h" #ifdef DEBUG Serial pc(PB_10, PB_11, 115200); #define LOG(args...) pc.printf(args) #define LOGC(args...) pc.putc(args) #else #define LOG(args...) (args) #define LOGC(args...) (args) #endif // DEBUG //还需要增加一个表示已经初始化成功的变量 //后续可能要增加表示6条连接的东东,如果host,port,连接状态等 MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate) : eth(tx, rx, baudrate){} bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect) { _apn = apn; _userName = userName; _passWord = passWord; command("ATE0"); wait_ms(800); command("AT+CIPMUX=0"); wait_ms(800); //检查 GPRS 附着状态 while (!command("AT+CGATT?\r\n", "+CGATT: 1")) { LOG("GPRS NOT ATTACHED!\r\n"); wait_ms(1200); eth.printf("+++"); wait_ms(1200); command("AT"); command("AT+CIPSHUT\r\n"); } //Select multiple connection //单链路模式 //command("AT+CIPMUX=0"); //wait_ms(800); //透传模式 command("AT+CIPMODE=1"); wait_ms(800); command("AT+CIPCCFG=5,2,1024,1,0,1460,50"); wait_ms(800); // Set APN command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord)); wait_ms(800); LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord)); uint32_t start = us_ticker_read(); do { // Brings up wireless connection //建立无线链路(GPRS 或者 CSD) bool flag = command("AT+CIICR", "OK"); // Get local IP address eth.printf("AT+CIFSR\r\n"); char ip_addr_buf[32]; if (read_line(ip_addr_buf) <= 0) { //LOG("failed to join network\r\n"); initialized = false; } else if (StringHelper::CheckIP(ip_addr_buf)) { LOG("IP ADDRESS:"); LOG(ip_addr_buf); LOG("\r\n"); localIP = ip_addr_buf; initialized = true; break; } else { initialized = false; } if (flag == false && initialized == false && isReconnect == false) { //表明此时建立无线链路返回error,得不到IP地址 //此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接 //防止在数据状态 wait_ms(1200); eth.printf("+++"); wait_ms(1200); command("AT"); command("AT+CIPSHUT\r\n"); initialized = false; return initNet(_apn, _userName, _passWord, timeout, true); } } while (us_ticker_read() - start < timeout); //command("AT+CRPRXGET=0"); return initialized; } bool MQTTGPRSEthernet::initNet() { return initNet(_apn, _userName, _passWord); } bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout) { uint32_t start = us_ticker_read(); do { if (initialized == false) initNet(); else break; } while (us_ticker_read() - start < timeout); start = us_ticker_read(); do { //AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500” char strPort[10]; itoa(port, strPort, 10); char response[64] = { 0, }; int connectStart = us_ticker_read(); LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort)); eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort)); do { read_line(response); if (strstr(response, "CONNECT") != NULL) { connected = true; return connected; } } while (us_ticker_read() - connectStart < timeout); } while (us_ticker_read() - start < timeout); connected = false; return connected; } //向SIM800C发送命令 //cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串. //ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true //此函数只能对应立即进行读取的情况。 //返回值:0,发送成功(得到了期待的应答结果) // 1,发送失败 bool MQTTGPRSEthernet::command(const char* cmd, const char* ack) { char response[64] = { 0, }; if (StringHelper::EndWith(cmd, "\r\n")) { LOG(cmd); eth.printf(cmd); } else if (StringHelper::EndWith(cmd, "\r")) { LOG(StringHelper::Add(cmd, "\n")); eth.printf(StringHelper::Add(cmd, "\n")); } else { LOG(StringHelper::Add(cmd, "\r\n")); eth.printf(StringHelper::Add(cmd, "\r\n")); } read_line(response); if (strstr(response, ack) != NULL) { return true; } return false; } int MQTTGPRSEthernet::read_line(char* buffer, int timeout) { int bytes = 0; uint32_t start = us_ticker_read(); while (true) { if (eth.readable()) { char ch = eth.getc(); if ((ch == '\n' || ch == '\r') && bytes == 0) { //此时说明以\r\n开头,在无回显的情况下经常会出现 //此时忽略空行\r\n continue; } if (ch == '\n') { if (bytes > 0 && buffer[bytes - 1] == '\r') { //表明读取到\r\n,此行结束 bytes--; } if (bytes > 0) { buffer[bytes] = '\0'; return bytes; } } else { buffer[bytes] = ch; bytes++; } } else { if ((uint32_t)(us_ticker_read() - start) > timeout) { return bytes; } } } //此时表示读到最后一个字节还没有读到\n //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0 return bytes; } int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout) { int bytes = 0; uint32_t start = us_ticker_read(); while (bytes < len) { if (eth.readable()) { char ch = eth.getc(); buffer[bytes] = ch; bytes++; } else { if ((uint32_t)(us_ticker_read() - start) > timeout) { return bytes; } } } //此时表示读到最后一个字节还没有读到\n //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0 return bytes; } int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout) { uint32_t start = us_ticker_read(); while (!connected) { if ((us_ticker_read() - start) > timeout) { return -1; } initNet(); } for (size_t i = 0; i < len; i++) { LOGC(buffer[i]); eth.putc(buffer[i]); } return len; } bool MQTTGPRSEthernet::disconnect() { wait_ms(1200); eth.printf("+++"); wait_ms(1200); eth.printf("AT\r\n"); eth.printf("AT+CIPSHUT\r\n"); connected = false; return true; } MQTTGPRSEthernet::~MQTTGPRSEthernet() { if (connected) { disconnect(); } }
主程序
#include "mbed.h" #define APN "uninet" #define USERNAME NULL #define PASSWORD NULL #define INIT_TIMES 5 //初始化次数 #ifdef DEBUG Serial pc2(PB_10, PB_11, 115200); #define LOG(args...) pc2.printf(args) #else #define LOG(args...) (args) #endif // DEBUG #include "MQTTClient.h" #include "MQTTmbed.h" #include "MQTTGPRSEthernet.h" #include "StringHelper.h" int arrivedcount = 0; void messageArrived(MQTT::MessageData& md) { MQTT::Message &message = md.message; LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id); LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload); ++arrivedcount; } int main(int argc, char* argv[]) { MQTTGPRSEthernet ethernet(PB_6, PB_7); for (int i = 0; i < INIT_TIMES; i++) { if (ethernet.initNet(APN, USERNAME, PASSWORD)) break; else LOG("SIM CARD INIT FAILED!\r\n"); } MQTT::Client<MQTTGPRSEthernet, Countdown> client = MQTT::Client<MQTTGPRSEthernet, Countdown>(ethernet); char* hostname = "iot.eclipse.org"; int port = 1883; for (int i = 0; i < INIT_TIMES; i++) { if (ethernet.connect(hostname, port)) break; else LOG("CAN NOT CONNECT TO SERVER!\r\n"); } char* topic = "tson-topic-18669888635"; MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.MQTTVersion = 3; data.clientID.cstring = "tson-client-18669888635"; int rc; for (int i = 0; i < INIT_TIMES; i++) { if ((rc = client.connect(data)) != 0) //无法连接到MQTT服务器 LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n"); else break; } if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0) //无法订阅 LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n"); MQTT::Message message; // QoS 0 char buf[100]; sprintf(buf, "Hello World! QoS 0 message from tson\r\n"); message.qos = MQTT::QOS0; message.retained = false; message.dup = false; message.payload = (void*)buf; message.payloadlen = strlen(buf) + 1; rc = client.publish(topic, message); while (arrivedcount == 0) client.yield(100000); // QoS 1 sprintf(buf, "Hello World! QoS 1 message from tson\n"); message.qos = MQTT::QOS1; message.payloadlen = strlen(buf) + 1; rc = client.publish(topic, message); while (arrivedcount == 1) client.yield(100000); // QoS 2 sprintf(buf, "Hello World! QoS 2 message from tson\n"); message.qos = MQTT::QOS2; message.payloadlen = strlen(buf) + 1; rc = client.publish(topic, message); while (arrivedcount == 2) client.yield(100000); if ((rc = client.unsubscribe(topic)) != 0) printf("rc from unsubscribe was %d\n", rc); if ((rc = client.disconnect()) != 0) printf("rc from disconnect was %d\n", rc); ethernet.disconnect(); return 0; }
client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。
四、总结
其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。
—————————–可爱的分割线——————————————————
有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper
原创文章,转载请注明:转载自 TsonTec:测量解决方案提供者
本文链接地址: 在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- “安全大脑”透视“芯片门” 如何应对未来芯片黑产
- 倪光南:未来国产开源芯片将成为主流芯片
- 6小时完成芯片布局,谷歌用强化学习助力芯片设计
- 人工智能是芯片,如何将芯片行业去除商品化
- 【PPT下载】5大维度对比主流芯片架构,类脑芯片未来可期
- 阿里CTO张建锋:成立自主芯片公司,全球最领先神经网络芯片明年面世
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Art of UNIX Programming
Eric S. Raymond / Addison-Wesley / 2003-10-3 / USD 54.99
Writing better software: 30 years of UNIX development wisdom In this book, five years in the making, the author encapsulates three decades of unwritten, hard-won software engineering wisdom. Raymond b......一起来看看 《The Art of UNIX Programming》 这本书的介绍吧!