在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

栏目: 后端 · 发布时间: 8年前

内容简介:在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

最近真是的好一个劲的折腾,算是完全搞明白了如何在STM32上实现MQTT协议了。

目录

  • 一、本教程中说明的内容
  • 三、在ARM mbed中使用MQTT

一、本教程中说明的内容

先说说本文化的适用范围吧:

一、使用的芯片是STM32F103C8T6,但是并没有任何与平台相关的代码,应该在所有STM32芯片中都是可以用的。

二、本文使用的是SIM800C模块,驱动是用C++实现的,基于ARM mbed平台写的。但是从原理上来讲,C和C++差别不大,本文的代码经过修改也可以直接用于其他平台的使用。

三、本文数据传输使用的是“透传模式”,对于所有的透传模块,本文都有很大的参考意义。

二、MQTT的使用

首先,推荐一个MQTT的库:Paho,这个库支持非常多的平台,当然也包括了嵌入式平台: GitHub – paho.mqtt.embedded-c 。将该库中的MQTTPacket文件夹下载下来,MQTTPacket文件夹下面主要有三个文件夹,我们使用的文件主要集中在src文件夹和samples文件夹中。

在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

src文件夹中存放着MQTT核心功能的代码,而samples中存放着三个例子:pub00sub1、pub0sub1_nc、qos0pub和网络驱动(transport.c和transport.h)。

由于三个驱动都有一个main函数,所以无法同时存在,本文中只使用了pub0sub,所以将此文件夹内容精减到只有pub0sub1.c、transport.c、transport.h三个文件。

在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议

当然,在实现使用时可能会改变目录结构,使目录结构更加清楚,可以根据自己的喜好来进行更改,并不影响使用。

将transport.h的内容精减到以下内容:

int transport_sendPacketBuffer(unsigned char* buf, int buflen);
int transport_getdata(unsigned char* buf, int count);
int transport_open(char* host, int port);
int transport_close();

主要的工作有:

1、为了方便表示,删除了版权信息,有实际使用时请保留。

2、没有使用pub0sub1_nc这个例子,所以将transport_getdatanb方法去除。

3、透传模块中使用不到socket,所以将与socket相关的参数去掉。

这些方法实现的主要功能是:

1、transport_open的作用是初始化模块连网的信息、transport_close作用是关闭链接。

2、transport_sendPacketBuffer用于发送数据、transport_getdata用于接收数据。

然后用transport.c来实现transport.h中声明的4个函数。

三、在ARM mbed中使用MQTT

首先说句题外话,自我感觉mbed是一个非常不错的平台,很大程度上提高了代码的可重用性。但也有一个问题,就是其支持是以开发板为单位的,所以并不是对每一种芯片的支持都很好。

首先介绍一个例子, HelloMQTT – a mercurial repository | mbed 。但这个例子其中有很多不完善的地方,而且该例子使用的网络驱动也不是GPRS模块。

如果要用不同的连网方式,那么就写一个驱动,驱动中至少要包含以下两个方法:

int read(unsigned char* buffer, int len, int timeout);
int write(unsigned char* buffer, int len, int timeout);

这两个方法会在MQTTClient中自动调用,timeout表示毫秒。返回值为读或写的字节数。

对此,我写了驱动程序:

MQTTGRPSEthernet.h

#pragma once
#if !defined(MQTTGPRSETHERNET_H)
#define MQTTGPRSETHERNET_H
 
#define DEFAULT_GPRS_TIMEOUT 6000000  //6s
#define SERIAL_BUFFER_SIZE 256
 
#include "mbed.h"
 
class MQTTGPRSEthernet
{
public:
 MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate = 115200);
 ~MQTTGPRSEthernet();
 bool initNet(const char* apn, const char* userName = "", const char* passWord = "", int timeout = DEFAULT_GPRS_TIMEOUT, bool isReconnect = false);
 bool connect(char* hostname, int port, int timeout = DEFAULT_GPRS_TIMEOUT);
 int read_line(char* buffer, int timeout = DEFAULT_GPRS_TIMEOUT);
 int read(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
 int write(unsigned char* buffer, int len, int timeout = DEFAULT_GPRS_TIMEOUT);
 bool disconnect();
private:
 bool initNet();
 Serial eth;
 bool command(const char* cmd, const char* ack = "");
 bool connected = false;
 bool initialized = false;
 char* localIP;
 const char *_apn;
 const char *_passWord;
 const char *_userName;
};
#endif

MQTTGRPSEthernet.cpp

#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
 
#ifdef DEBUG
Serial pc(PB_10, PB_11, 115200);
#define LOG(args...)    pc.printf(args)
#define LOGC(args...)   pc.putc(args)
#else
#define LOG(args...)    (args)
#define LOGC(args...)   (args)
#endif // DEBUG
 
//还需要增加一个表示已经初始化成功的变量
//后续可能要增加表示6条连接的东东,如果host,port,连接状态等
MQTTGPRSEthernet::MQTTGPRSEthernet(PinName tx, PinName rx, int baudrate)
 : eth(tx, rx, baudrate){}
bool MQTTGPRSEthernet::initNet(const char* apn, const char* userName, const char* passWord, int timeout, bool isReconnect)
{
 _apn = apn;
 _userName = userName;
 _passWord = passWord;
 command("ATE0");
 wait_ms(800);
 command("AT+CIPMUX=0");
 wait_ms(800);
 //检查 GPRS 附着状态
 while (!command("AT+CGATT?\r\n", "+CGATT: 1"))
 {
 LOG("GPRS NOT ATTACHED!\r\n");
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 command("AT");
 command("AT+CIPSHUT\r\n");
 }
 //Select multiple connection
 //单链路模式
 //command("AT+CIPMUX=0");
 //wait_ms(800);
 //透传模式
 command("AT+CIPMODE=1");
 wait_ms(800); 
 command("AT+CIPCCFG=5,2,1024,1,0,1460,50");
 wait_ms(800);
 // Set APN 
 command(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"", apn, userName, passWord));
 wait_ms(800);
 LOG(StringHelper::Format("AT+CSTT=\"%s\",\"%s\",\"%s\"\r\n", apn, userName, passWord));
 uint32_t start = us_ticker_read();
 do
 {
 // Brings up wireless connection
 //建立无线链路(GPRS 或者 CSD)
 bool flag = command("AT+CIICR", "OK");
 // Get local IP address
 eth.printf("AT+CIFSR\r\n");
 char ip_addr_buf[32];
 
 if (read_line(ip_addr_buf) <= 0) {
 //LOG("failed to join network\r\n");
 initialized = false;
 }
 else if (StringHelper::CheckIP(ip_addr_buf))
 {
 LOG("IP ADDRESS:");
 LOG(ip_addr_buf);
 LOG("\r\n");
 localIP = ip_addr_buf;
 initialized = true;
 break; 
 }
 else
 {
 initialized = false;
 }
 if (flag == false && initialized == false && isReconnect == false)
 {
 //表明此时建立无线链路返回error,得不到IP地址
 //此时应使用AT+CIPSHUT关闭PDP上下文后再重新进行连接
 //防止在数据状态
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 command("AT");
 command("AT+CIPSHUT\r\n");
 initialized = false;
 return initNet(_apn, _userName, _passWord, timeout, true);
 }
 
 } while (us_ticker_read() - start < timeout);
 //command("AT+CRPRXGET=0");
 return initialized;
}
bool MQTTGPRSEthernet::initNet()
{
 return initNet(_apn, _userName, _passWord);
}
bool MQTTGPRSEthernet::connect(char* hostname, int port, int timeout)
{
 uint32_t start = us_ticker_read();
 do
 {
 if (initialized == false)
 initNet();
 else
 break;
 } while (us_ticker_read() - start < timeout);
 start = us_ticker_read();
 do
 {
 //AT+CIPSTART=0,”TCP”,”116.228.221.51”,”8500”
 char strPort[10];
 itoa(port, strPort, 10);
 char response[64] = { 0, };
 int connectStart = us_ticker_read();
 LOG(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
 eth.printf(StringHelper::Format("AT+CIPSTART=TCP,%s,%s\r\n", hostname, strPort));
 
 do
 {
 read_line(response);
 if (strstr(response, "CONNECT") != NULL)
 {
 connected = true;
 return connected;
 }
 } while (us_ticker_read() - connectStart < timeout);
 } while (us_ticker_read() - start < timeout);
 connected = false;
 return connected;
}
//向SIM800C发送命令
//cmd:发送的命令字符串(不需要添加回车了),当cmd<0XFF的时候,发送数字(比如发送0X1A),大于的时候发送字符串.
//ack:期待的应答结果,如果为空,则表示不需要等待应答,对于那些并不关心返回结果的直接返回true
//此函数只能对应立即进行读取的情况。
//返回值:0,发送成功(得到了期待的应答结果)
//       1,发送失败
bool MQTTGPRSEthernet::command(const char* cmd, const char* ack)
{
 char response[64] = { 0, };
 if (StringHelper::EndWith(cmd, "\r\n"))
 {
 LOG(cmd);
 eth.printf(cmd);
 }
 else if (StringHelper::EndWith(cmd, "\r"))
 {
 LOG(StringHelper::Add(cmd, "\n"));
 eth.printf(StringHelper::Add(cmd, "\n")); 
 }
 else
 {
 LOG(StringHelper::Add(cmd, "\r\n"));
 eth.printf(StringHelper::Add(cmd, "\r\n"));
 } 
 read_line(response);
 
 if (strstr(response, ack) != NULL) {
 return true;
 } 
 return false;
}
int MQTTGPRSEthernet::read_line(char* buffer, int timeout)
{
 int bytes = 0;
 uint32_t start = us_ticker_read();
 
 while (true) {
 if (eth.readable()) {
 char ch = eth.getc();
 if ((ch == '\n' || ch == '\r') && bytes == 0)
 {
 //此时说明以\r\n开头,在无回显的情况下经常会出现
 //此时忽略空行\r\n
 continue;
 }
 if (ch == '\n') {
 if (bytes > 0 && buffer[bytes - 1] == '\r') {
 //表明读取到\r\n,此行结束
 bytes--;
 }
 if (bytes > 0) {
 buffer[bytes] = '\0';
 return bytes;
 }
 }
 else {
 buffer[bytes] = ch;
 bytes++;
 }
 }
 else {
 if ((uint32_t)(us_ticker_read() - start) > timeout) {
 return bytes;
 }
 }
 }
 //此时表示读到最后一个字节还没有读到\n
 //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
 return bytes;
}
int MQTTGPRSEthernet::read(unsigned char* buffer, int len, int timeout)
{
 int bytes = 0;
 uint32_t start = us_ticker_read();
 
 while (bytes < len) {
 if (eth.readable()) {
 char ch = eth.getc();
 buffer[bytes] = ch;
 bytes++;
 }
 else {
 if ((uint32_t)(us_ticker_read() - start) > timeout) {
 return bytes;
 }
 }
 }
 //此时表示读到最后一个字节还没有读到\n
 //有两种情况,一种是len不够长,另一种是恰好读完,此时数组的末尾没有\0
 return bytes;
}
 
int MQTTGPRSEthernet::write(unsigned char* buffer, int len, int timeout)
{
 uint32_t start = us_ticker_read();
 while (!connected)
 {
 if ((us_ticker_read() - start) > timeout)
 {
 return -1;
 }
 initNet();
 }
 for (size_t i = 0; i < len; i++)
 {
 LOGC(buffer[i]);
 eth.putc(buffer[i]);
 } 
 return len;
}
bool MQTTGPRSEthernet::disconnect()
{
 wait_ms(1200);
 eth.printf("+++");
 wait_ms(1200);
 eth.printf("AT\r\n");
 eth.printf("AT+CIPSHUT\r\n");
 connected = false;
 return true;
}
 
MQTTGPRSEthernet::~MQTTGPRSEthernet()
{
 if (connected)
 {
 disconnect();
 }
}

主程序

#include "mbed.h"
 
#define APN         "uninet"
#define USERNAME    NULL
#define PASSWORD    NULL
#define INIT_TIMES  5            //初始化次数
 
#ifdef DEBUG
Serial pc2(PB_10, PB_11, 115200);
#define LOG(args...)    pc2.printf(args)
#else
#define LOG(args...)    (args)
#endif // DEBUG
 
#include "MQTTClient.h"
#include "MQTTmbed.h"
#include "MQTTGPRSEthernet.h"
#include "StringHelper.h"
 
int arrivedcount = 0;
 
void messageArrived(MQTT::MessageData& md)
{ 
 MQTT::Message &message = md.message;
 LOG("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message.qos, message.retained, message.dup, message.id);
 LOG("Payload %.*s\n", message.payloadlen, (char*)message.payload);
 ++arrivedcount;
}
 
int main(int argc, char* argv[])
{
 MQTTGPRSEthernet ethernet(PB_6, PB_7);
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if (ethernet.initNet(APN, USERNAME, PASSWORD))
 break;
 else 
 LOG("SIM CARD INIT FAILED!\r\n");
 } 
           
 MQTT::Client<MQTTGPRSEthernet, Countdown> client = MQTT::Client<MQTTGPRSEthernet, Countdown>(ethernet);
    
 char* hostname = "iot.eclipse.org";
 int port = 1883;
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if (ethernet.connect(hostname, port)) 
 break; 
 else
 LOG("CAN NOT CONNECT TO SERVER!\r\n");
 }
 
 char* topic = "tson-topic-18669888635";
 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;       
 data.MQTTVersion = 3;
 data.clientID.cstring = "tson-client-18669888635"; 
 int rc;
 for (int i = 0; i < INIT_TIMES; i++)
 {
 if ((rc = client.connect(data)) != 0)
 //无法连接到MQTT服务器
 LOG("CAN NOT CONNECT TO MQTT SERVER!\r\n"); 
 else 
 break; 
 }
 
 if ((rc = client.subscribe(topic, MQTT::QOS0, messageArrived)) != 0)
 //无法订阅
 LOG("CAN NOT SUBSCRIBE THE TOPIC!\r\n");
 
 MQTT::Message message;
 
    // QoS 0
 char buf[100];
 sprintf(buf, "Hello World!  QoS 0 message from tson\r\n");
 message.qos = MQTT::QOS0;
 message.retained = false;
 message.dup = false;
 message.payload = (void*)buf;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 0)
 client.yield(100000);
        
 // QoS 1
 sprintf(buf, "Hello World!  QoS 1 message from tson\n");
 message.qos = MQTT::QOS1;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 1)
 client.yield(100000);
        
 // QoS 2
 sprintf(buf, "Hello World!  QoS 2 message from tson\n");
 message.qos = MQTT::QOS2;
 message.payloadlen = strlen(buf) + 1;
 rc = client.publish(topic, message);
 while (arrivedcount == 2)
 client.yield(100000);
    
 if ((rc = client.unsubscribe(topic)) != 0)
 printf("rc from unsubscribe was %d\n", rc);
    
 if ((rc = client.disconnect()) != 0)
 printf("rc from disconnect was %d\n", rc);
    
 ethernet.disconnect(); 
 return 0;
}

client的yield函数中用调用messageArrived函数,之前的示例给出的是100,可能是由于间太短的缘故,总是调用不了回调函数,所以我将其改的非常大,便于调试。实际使用时可以使用1000。

四、总结

其实paho embeded-c用起来还是挺方便的,但是代码的重要性不高,所以移植起来往往会让人无从下手。但深放研究就会发现其实使用起来是非常简单的。

—————————–可爱的分割线——————————————————

有路过的吃瓜群众说想要StringHelper这个类,当时这个类用处不大,里面多数功能都可以用std::string中的功能来实现。不过自己刚从C转过来自己并不太了解,所以自己又写了一个,功能并不是很完善。也分享给大家:StringHelper

原创文章,转载请注明:转载自 TsonTec:测量解决方案提供者

本文链接地址: 在GPRS模块(SIM800C)和STM32芯片上实现MQTT协议


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

大演算

大演算

佩德羅.多明戈斯 / 張正苓,胡玉城 / 三采 / 2016-8-1 / 620

揭開大數據、人工智慧、機器學習的祕密, 打造人類文明史上最強大的科技——終極演算法! 有一個終極演算法,可以解開宇宙所有的祕密, 現在大家都在競爭,誰能最先解開它! .機器學習是什麼?大演算又是什麼? .大演算如何運作與發展,機器可以預測什麼? .我們可以信任機器學過的東西嗎? .商業、政治為什麼要擁抱機器學習? .不只商業與政治,醫學與科學界也亟需......一起来看看 《大演算》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具