内容简介:在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议
最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。
目录
- 一、本教程中说明的内容
- 三、在ARM mbed中使用MQTT
一、本教程中说明的内容
先说说本文化的适用范围吧:
一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。
二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。
三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。
二、MQTT的使用
首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台: GitHub – paho.mqtt.embedded-c 。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。
src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。
由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。
当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。
将transport.h的内容精减到以下内容:
int transport_sendPacketBuffer(unsigned char* buf, int buflen); int transport_getdata(unsigned char* buf, int count); int transport_open(char* host, int port); int transport_close();
主要的工作有:
1、为了方便表示,删除了版权信息,有实际使用时请保留。
2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。
3、透传模块中使用不到socket,所以将与socket相关的参数去掉。
这些方法实现的主要功能是:
1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。
2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。
然后用transport.c来实现transport.h中声明的4个函数。
三、在ARM mbed中使用MQTT
首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。
首先介绍一个例子, HelloMQTT – a mercurial repository | mbed 。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。
如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:
int read(unsigned char* buffer, int len, int timeout); int write(unsigned char* buffer, int len, int timeout);
这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。
对此,我写了驱动程序:
MQTTGRPSEthernet.h
#pragma once
#if !defined(MQTTGPRSETHERNET_H)
#define MQTTGPRSETHERNET_H
#define DEFAULT_GPRS_TIMEOUT 6000000 //6s
#define SERIAL_BUFFER_SIZE 256
#include "mbed.h"
class MQTTGPRSEthernet
{
public:
MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200);
~MQTTGPRSEthernet();
bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false);
bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT);
int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT);
int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
bool disconnect();
private:
bool initNet();
Serial eth;
bool command(const char* cmd, const char* ack = "");
bool connected = false;
bool initialized = false;
char* localIP;
const char *_apn;
const char *_passWord;
const char *_userName;
};
#endif
MQTTGRPSEthernet.cpp
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
#ifdef DEBUG
Serial pc(PB_10, PB_11, 115200);
#define LOG(args...) pc.printf(args)
#define LOGC(args...) pc.putc(args)
#else
#define LOG(args...) (args)
#define LOGC(args...) (args)
#endif // DEBUG
//还需要增加一个表示已经初始化成功的变量
//后续可能要增加表示6条连接的东东,如果host,port,连接状态等
MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate)
: eth(tx, rx, baudrate){}
bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect)
{
_apn = apn;
_userName = userName;
_passWord = passWord;
command("ATE0");
wait_ms(800);
command("AT+CIPMUX=0");
wait_ms(800);
//检查 GPRS 附着状态
while (!command("AT+CGATT?\r\n", "+CGATT: 1"))
{
LOG("GPRS NOT ATTACHED!\r\n");
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
command("AT");
command("AT+CIPSHUT\r\n");
}
//Select multiple connection
//单链路模式
//command("AT+CIPMUX=0");
//wait_ms(800);
//透传模式
command("AT+CIPMODE=1");
wait_ms(800);
command("AT+CIPCCFG=5,2,1024,1,0,1460,50");
wait_ms(800);
// Set APN
command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord));
wait_ms(800);
LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord));
uint32_t start = us_ticker_read();
do
{
// Brings up wireless connection
//建立无线链路(GPRS 或者 CSD)
bool flag = command("AT+CIICR", "OK");
// Get local IP address
eth.printf("AT+CIFSR\r\n");
char ip_addr_buf[32];
if (read_line(ip_addr_buf) <= 0) {
//LOG("failed to join network\r\n");
initialized = false;
}
else if (StringHelper::CheckIP(ip_addr_buf))
{
LOG("IP ADDRESS:");
LOG(ip_addr_buf);
LOG("\r\n");
localIP = ip_addr_buf;
initialized = true;
break;
}
else
{
initialized = false;
}
if (flag == false && initialized == false && isReconnect == false)
{
//表明此时建立无线链路返回error,得不到IP地址
//此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接
//防止在数据状态
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
command("AT");
command("AT+CIPSHUT\r\n");
initialized = false;
return initNet(_apn, _userName, _passWord, timeout, true);
}
} while (us_ticker_read() - start < timeout);
//command("AT+CRPRXGET=0");
return initialized;
}
bool MQTTGPRSEthernet::initNet()
{
return initNet(_apn, _userName, _passWord);
}
bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout)
{
uint32_t start = us_ticker_read();
do
{
if (initialized == false)
initNet();
else
break;
} while (us_ticker_read() - start < timeout);
start = us_ticker_read();
do
{
//AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500”
char strPort[10];
itoa(port, strPort, 10);
char response[64] = { 0, };
int connectStart = us_ticker_read();
LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
do
{
read_line(response);
if (strstr(response, "CONNECT") != NULL)
{
connected = true;
return connected;
}
} while (us_ticker_read() - connectStart < timeout);
} while (us_ticker_read() - start < timeout);
connected = false;
return connected;
}
//向SIM800C发送命令
//cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串.
//ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true
//此函数只能对应立即进行读取的情况。
//返回值:0,发送成功(得到了期待的应答结果)
// 1,发送失败
bool MQTTGPRSEthernet::command(const char* cmd, const char* ack)
{
char response[64] = { 0, };
if (StringHelper::EndWith(cmd, "\r\n"))
{
LOG(cmd);
eth.printf(cmd);
}
else if (StringHelper::EndWith(cmd, "\r"))
{
LOG(StringHelper::Add(cmd, "\n"));
eth.printf(StringHelper::Add(cmd, "\n"));
}
else
{
LOG(StringHelper::Add(cmd, "\r\n"));
eth.printf(StringHelper::Add(cmd, "\r\n"));
}
read_line(response);
if (strstr(response, ack) != NULL) {
return true;
}
return false;
}
int MQTTGPRSEthernet::read_line(char* buffer, int timeout)
{
int bytes = 0;
uint32_t start = us_ticker_read();
while (true) {
if (eth.readable()) {
char ch = eth.getc();
if ((ch == '\n' || ch == '\r') && bytes == 0)
{
//此时说明以\r\n开头,在无回显的情况下经常会出现
//此时忽略空行\r\n
continue;
}
if (ch == '\n') {
if (bytes > 0 && buffer[bytes - 1] == '\r') {
//表明读取到\r\n,此行结束
bytes--;
}
if (bytes > 0) {
buffer[bytes] = '\0';
return bytes;
}
}
else {
buffer[bytes] = ch;
bytes++;
}
}
else {
if ((uint32_t)(us_ticker_read() - start) > timeout) {
return bytes;
}
}
}
//此时表示读到最后一个字节还没有读到\n
//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
return bytes;
}
int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout)
{
int bytes = 0;
uint32_t start = us_ticker_read();
while (bytes < len) {
if (eth.readable()) {
char ch = eth.getc();
buffer[bytes] = ch;
bytes++;
}
else {
if ((uint32_t)(us_ticker_read() - start) > timeout) {
return bytes;
}
}
}
//此时表示读到最后一个字节还没有读到\n
//有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
return bytes;
}
int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout)
{
uint32_t start = us_ticker_read();
while (!connected)
{
if ((us_ticker_read() - start) > timeout)
{
return -1;
}
initNet();
}
for (size_t i = 0; i < len; i++)
{
LOGC(buffer[i]);
eth.putc(buffer[i]);
}
return len;
}
bool MQTTGPRSEthernet::disconnect()
{
wait_ms(1200);
eth.printf("+++");
wait_ms(1200);
eth.printf("AT\r\n");
eth.printf("AT+CIPSHUT\r\n");
connected = false;
return true;
}
MQTTGPRSEthernet::~MQTTGPRSEthernet()
{
if (connected)
{
disconnect();
}
}
主程序
#include "mbed.h"
#define APN "uninet"
#define USERNAME NULL
#define PASSWORD NULL
#define INIT_TIMES 5 //初始化次数
#ifdef DEBUG
Serial pc2(PB_10, PB_11, 115200);
#define LOG(args...) pc2.printf(args)
#else
#define LOG(args...) (args)
#endif // DEBUG
#include "MQTTClient.h"
#include "MQTTmbed.h"
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
int arrivedcount = 0;
void messageArrived(MQTT::MessageData& md)
{
MQTT::Message &message = md.message;
LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload);
++arrivedcount;
}
int main(int argc, char* argv[])
{
MQTTGPRSEthernet ethernet(PB_6, PB_7);
for (int i = 0; i < INIT_TIMES; i++)
{
if (ethernet.initNet(APN, USERNAME, PASSWORD))
break;
else
LOG("SIM CARD INIT FAILED!\r\n");
}
MQTT::Client<MQTTGPRSEthernet, Countdown> client = MQTT::Client<MQTTGPRSEthernet, Countdown>(ethernet);
char* hostname = "iot.eclipse.org";
int port = 1883;
for (int i = 0; i < INIT_TIMES; i++)
{
if (ethernet.connect(hostname, port))
break;
else
LOG("CAN NOT CONNECT TO SERVER!\r\n");
}
char* topic = "tson-topic-18669888635";
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
data.MQTTVersion = 3;
data.clientID.cstring = "tson-client-18669888635";
int rc;
for (int i = 0; i < INIT_TIMES; i++)
{
if ((rc = client.connect(data)) != 0)
//无法连接到MQTT服务器
LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n");
else
break;
}
if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
//无法订阅
LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n");
MQTT::Message message;
// QoS 0
char buf[100];
sprintf(buf, "Hello World! QoS 0 message from tson\r\n");
message.qos = MQTT::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*)buf;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 0)
client.yield(100000);
// QoS 1
sprintf(buf, "Hello World! QoS 1 message from tson\n");
message.qos = MQTT::QOS1;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 1)
client.yield(100000);
// QoS 2
sprintf(buf, "Hello World! QoS 2 message from tson\n");
message.qos = MQTT::QOS2;
message.payloadlen = strlen(buf) + 1;
rc = client.publish(topic, message);
while (arrivedcount == 2)
client.yield(100000);
if ((rc = client.unsubscribe(topic)) != 0)
printf("rc from unsubscribe was %d\n", rc);
if ((rc = client.disconnect()) != 0)
printf("rc from disconnect was %d\n", rc);
ethernet.disconnect();
return 0;
}
client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。
四、总结
其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。
—————————–可爱的分割线——————————————————
有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper
原创文章,转载请注明:转载自 TsonTec:测量解决方案提供者
本文链接地址: 在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- “安全大脑”透视“芯片门” 如何应对未来芯片黑产
- 倪光南:未来国产开源芯片将成为主流芯片
- 6小时完成芯片布局,谷歌用强化学习助力芯片设计
- 人工智能是芯片,如何将芯片行业去除商品化
- 【PPT下载】5大维度对比主流芯片架构,类脑芯片未来可期
- 阿里CTO张建锋:成立自主芯片公司,全球最领先神经网络芯片明年面世
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
SHA 加密
SHA 加密工具
XML、JSON 在线转换
在线XML、JSON转换工具