python使用MQTT给硬件传输图片

栏目: Python · 发布时间: 5年前

内容简介:最近因需要用python写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:根据上面的协议,可以得到如下的流程图:

最近因需要用 python 写一个微服务来用MQTT给硬件传输图片,其中python用的是flask框架,大概流程如下:

python使用MQTT给硬件传输图片

协议为:

  • 需要将图片数据封装成多个消息进行传输,每个消息传输的数据字节数为1400Byte。
  • 消息(MQTT Payload) 格式:Web服务器-------->BASE:
    python使用MQTT给硬件传输图片
  • 反馈:BASE---------> Web服务器:
    python使用MQTT给硬件传输图片
  • 如果Web服务器发送完一个“数据传输消息”后,5S内没有收到MQTT“反馈消息”或者收到的反馈中显示“数据包不完整”,则重发该“数据传输消息”。

程序流程图

根据上面的协议,可以得到如下的流程图:

python使用MQTT给硬件传输图片

代码如下:

# encoding:utf-8
from flask import Flask, jsonify
from flask_restful import Api, Resource, reqparse
from PIL import Image
from io import BytesIO
import requests
import os, logging, time
import paho.mqtt.client as mqtt
import struct
from flask_cors import *

# 日志配置信息
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s  (runing by %(funcName)s',
)


class Mqtt(object):
    def __init__(self, img_data, size):
        self.MQTTHOST = '*******'
        self.MQTTPORT = "******"

        # 订阅和发送的主题
        self.topic_from_base = 'mqttTestSub'
        self.topic_to_base = 'mqttTestPub'

        self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
        self.client = mqtt.Client(self.client_id)

        # 完成链接后的回掉函数
        self.client.on_connect = self.on_connect
        # 图片大小
        self.size = size

        # 用于跳出死循环,结束任务
        self.finished = None

        # 包的编号
        self.index = 0

        # 将收到的图片数据按大小分成列表
        self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]

        # 记录发布后的数据,用于监控时延
        self.pub_time = 0


        self.header_to_base = 0xffffeeee
        self.header_from_base = 0xeeeeffff

        # 功能标识
        self.function_begin = 0x01
        self.function_doing = 0x02
        self.function_finished = 0x03

        # 包的完整和非完整状态
        self.whole_package = 0x01
        self.bad_package = 0x00

        # 头信息的格式,小端模式
        self.format_to_base = "<Lbhh"
        self.format_from_base = "<Lbhb"

        # 如果重发包时,用于检查是否重发第一个包
        self.first = True

        # 如果重发包时,用于检查是否重发最后一个包
        self.last = False

        self.begin_data = 'image.jpg;' + str(self.size)

    # 链接mqtt服务器函数
    def on_mqtt_connect(self):
        self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
        self.client.loop_start()

    # 链接完成后的回调函数
    def on_connect(self, client, userdata, flags, rc):
        logging.info("+++ Connected with result code {} +++".format(str(rc)))
        self.client.subscribe(self.topic_from_base)

    # 订阅函数
    def subscribe(self):
        self.client.subscribe(self.topic_from_base, 1)
        # 消息到来处理函数
        self.client.on_message = self.on_message

    # 接收到信息后的回调函数

    def on_message(self, client, userdata, msg):
        # 如果接受第一个包则不需要重发第一个
        self.first = False

        # 将接受到的包进行解压,得到一个元组
        base_tuple = struct.unpack(self.format_from_base, msg.payload)
        logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
        logging.info("+++ package_number is {}, package_status_from_base is {}  +++"
                     .format(base_tuple[2], base_tuple[3]))

        # 检查接受到信息的头部是否正确
        if base_tuple[0] == self.header_from_base:
            logging.info("+++ function_from_base is {}  +++".format(base_tuple[1]))

            # 是否完成传输,如果完成则退出
            if base_tuple[1] == self.function_finished:
                logging.info("+++  finish work +++")
                self.finished = 1
                self.client.disconnect()
            else:
                # 是否是最后一个包
                if self.index == len(self.image_data_list) - 1:
                    self.publish('finished', self.function_finished)
                    self.last = True
                    logging.info("+++ finished_data_to_base is finished+++")
                else:

                    # 如果接收到的包不是 0x03则进行传送数据
                    if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
                        logging.info("+++ package_number is {}, package_status_from_base is {}  +++"
                                     .format(base_tuple[2],base_tuple[3]))

                        # 如果数据的反馈中,包的状态是1则继续发下一个包
                        if base_tuple[3] == self.whole_package:
                            self.publish(self.index, self.function_doing)
                            logging.info("+++ data_to_base is finished+++")
                            self.index += 1

                        # 如果数据的反馈中,包的状态是0则重发数据包
                        elif base_tuple[3] == self.bad_package:
                            re_package_number = base_tuple[2]
                            self.publish(re_package_number-1, self.function_doing)
                            logging.info("+++ re_data_to_base is finished+++")
                        else:
                            logging.info("+++ package_status_from_base is not 0 or 1 +++")
                            self.client.disconnect()
                    else:
                        logging.info("+++  function_identifier is illegal +++")
                        self.client.disconnect()
        else:
            logging.info("+++ header_from_base is illegal +++")
            self.client.disconnect()

    # 数据发送函数
    def publish(self, index, fuc):
        # 看是否是最后一个包
        if index == 'finished':
            length = 0
            package_number = 0
            data = b''
        else:
            length = len(self.image_data_list[index])
            package_number = index
            data = self.image_data_list[index]

        # 打包数据头信息
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            fuc,
            package_number,
            length
        )
        to_base_data = buffer + data

        # mqtt发送
        self.client.publish(
            self.topic_to_base,
            to_base_data
        )
        self.pub_time = time.time()

    # 发送第一个包函数
    def publish_begin(self):
        buffer = struct.pack(
            self.format_to_base,
            self.header_to_base,
            self.function_begin,
            0,
            len(self.begin_data.encode('utf-8')),
        )
        begin_data = buffer + self.begin_data.encode('utf-8')
        self.client.publish(self.topic_to_base, begin_data)

    # 控制函数
    def control(self):
        self.on_mqtt_connect()
        self.publish_begin()
        begin_time = time.time()
        self.pub_time = time.time()
        self.subscribe()
        while True:
            time.sleep(1)
            # 超过5秒重传
            date = time.time() - self.pub_time
            if date > 5:
                # 是否重传第一个包
                if self.first == True:
                    self.publish_begin()
                    logging.info('+++ this is timeout first_data +++')

                # 是否重传最后一个包
                elif self.last == True:
                    self.publish('finished', self.function_finished)
                    logging.info('+++ this is timeout last_data +++')
                else:
                    self.publish(self.index-1, self.function_doing)
                    logging.info('+++ this is timeout middle_data +++')
            if self.finished == 1:
                logging.info('+++ all works is finished+++')
                break

        print(str(time.time()-begin_time) + 'begin_time - end_time')

app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)

# 接受参数
parser = reqparse.RequestParser()
parser.add_argument('url', help='mqttImage url', location='args', type=str)


class GetImage(Resource):
    # 得到参数并从图床下载到本地
    def get(self):
        args = parser.parse_args()
        url = args.get('url')
        response = requests.get(url)
        # 获取图片
        image = Image.open(BytesIO(response.content))
        # 存取图片
        add = os.path.join(os.path.abspath(''), 'image.jpg')
        image.save(add)
        # 得到图片大小
        size = os.path.getsize(add)
        f = open(add, 'rb')
        imageData = f.read()
        f.close()

        # 进行mqtt传输
        mqtt = Mqtt(imageData, size)
        mqtt.control()

        # 删除文件
        os.remove(add)
        logging.info('*** the result of control is {} ***'.format(1))
        return jsonify({
            "imageData": 1
        })


api.add_resource(GetImage, '/image')

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0')

复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

GWT in Action

GWT in Action

Robert Hanson、Adam Tacy / Manning Publications / 2007-06-05 / USD 49.99

This book will show Java developers how to use the Google Web Toolkit (GWT) to rapidly create rich web-based applications using their existing skills. It will cover the full development cycle, from ......一起来看看 《GWT in Action》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

html转js在线工具
html转js在线工具

html转js在线工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具