云中树莓派(2):将传感器数据上传到AWS IoT 并利用Kibana进行展示
云中树莓派(3):通过 AWS IoT 控制树莓派上的Led
1. Led 连接与测试
在某宝上买了几样配件,包括T型GPIO扩展板、40P排线、亚克力外壳、400孔面包板、若干杜邦线。现在我的树莓派长得这个样子了:
不由得感谢神奇的某宝,这些东西每一样都不超过三四块钱。
1.1 接线
以下几个简单步骤就完成了接线:
- 将排线一头插在树莓派的40个pin脚上,将另一头插在扩展板上。要注意方向,多试几次。还要注意在树莓派关机时候再插入。
- 把扩展板插在面包板上。
- 把Led 的长脚(正极)插在面包板第6行的任何一个孔内(对应GPIO18),将其短脚(负极或接地)插在第7行的任何一个孔内(对应GND)。
简单说下面包板。刚拿到手时还有点不知所措,稍微研究一下后就简单了。面包板为长方形,长边的两边是为了接电源的,每个长边都是联通的;中间区域内,每行内是联通的。
1.2 简单测试
下面的 python 能让led 灯每两秒钟亮一次:
import RPi.GPIO as GPIO import time PIN_NO=18 GPIO.setmode(GPIO.BCM) GPIO.setup(PIN_NO, GPIO.OUT) loopCount = 0 for x in xrange(500): print("Loop " + str(loopCount)) GPIO.output(PIN_NO, GPIO.HIGH) time.sleep(2) GPIO.output(PIN_NO, GPIO.LOW) time.sleep(2) loopCount += 1 GPIO.cleanup()
也就是通过控制GPIO18的电压为高还是低来控制Led 灯是亮还是灭。
2. AWS IoT Device Shadow
AWS IoT 中一个功能叫做 Device Shadow,翻译为『设备影子』。它本质上为用于存储和检索设备的当前状态信息的 JSON 文档。Device Shadow 服务可以为您连接到 AWS IoT 的每台设备保留一个影子。您可以使用该影子通过 MQTT 或 HTTP 获取和设置设备的状态,无论该设备是否连接到 Internet。每台设备的影子都由相应事物的名称唯一标识。Device Shadow 服务充当中介,支持设备和应用程序检索和更新设备的影子。
AWS IoT 针对设备的影子提供了三项操作:
- UPDATE:如果设备的影子不存在,则创建一个该影子;如果存在,则使用请求中提供的数据更新设备的影子的内容。存储数据时使用时间戳信息,以指明最新更新时间。向所有订阅者发送消息,告知 desired 状态与 reported 状态之间的差异 (增量)。接收到消息的事物或应用程序可以根据 desired 状态和 reported 状态之间的差异执行操作。例如,设备可将其状态更新为预期状态,或者应用程序可以更新其 UI,以反映设备状态的更改。
- GET:检索设备的影子中存储的最新状态 (例如,在设备启动期间,检索配置和最新操作状态)。此操作将返回整个 JSON 文档,其中包括元数据。
- DELETE:删除设备的影子,包括其所有内容。这将从数据存储中删除 JSON 文档。您无法还原已删除的设备的影子,但可以创建具有相同名称的新影子。
- 连接着的 Led 灯发送封装在MQTT消息中的 reported 状态 『off』 到 AWS IoT
- AWS IoT 通过 led 灯使用的证书来确定它所属的虚拟事物
- AWS IoT 将 reported 状态保存在设备影子 JSON 文件中
- 一条 AWS IoT rule 正监控着 led 的 off 状态,它发送一个消息到某个 SNS topic
- 某 application 收到 SNS 消息。用户将 led 期望状态(desired state)设置为 on
- AWS IoT 收到该消息,它更新设备影子中的 desired state,同时发送包含期望状态的 message 到某些topic。此时,reported 和 desired 状态是 『Out of Sync』的
- led 收到 delta 消息,开始根据其中的 desired status 来设置其实际状态
- led 将其状态设置为 on,向 MQTT topic 发送新的 reported 状态
- AWS IoT 更新设备影子,现在该设备的 reported 和 desired 状态一致了
也就是说,要通过 AWS IoT 来操作设备,需要通过设备影子进行。下图是控制Led 灯泡的示意图。外部应用和设备之间的交互通过设备影子进行。
Device Shadow 使用系统预留的 MQTT 主题来做应用程序和设备之间的通信:
MQTT 主题 | 用途 |
$aws/things/myLightBulb/shadow/update/accepted |
当设备的影子更新成功时,Device Shadow 服务将向此主题发送消息 |
$aws/things/myLightBulb/shadow/update/rejected |
当设备的影子更新遭拒时,Device Shadow 服务将向此主题发送消息 |
$aws/things/myLightBulb/shadow/update/delta |
当检测到设备的影子的“reported”部分与“desired”部分之间存在差异时,Device Shadow 服务将向此主题发送消息。 |
$aws/things/myLightBulb/shadow/get/accepted |
当获取设备的影子的请求获批时,Device Shadow 服务将向此主题发送消息。 |
$aws/things/myLightBulb/shadow/get/rejected |
当获取设备的影子的请求遭拒时,Device Shadow 服务将向此主题发送消息。 |
$aws/things/myLightBulb/shadow/delete/accepted |
当设备的影子被删除时,Device Shadow 服务将向此主题发送消息。 |
$aws/things/myLightBulb/shadow/delete/rejected |
当删除设备的影子的请求遭拒时,Device Shadow 服务将向此主题发送消息。 |
$aws/things/myLightBulb/shadow/update/documents |
每次设备的影子更新成功执行时,Device Shadow 服务都会向此主题发布状态文档。 |
下图显示了控制流程:
3. Python 代码
代码一共就两个文件。文件 ledController.py 充当Led 灯的控制器,它定期向Led 发出『开』或『关』的指令,并定期获取其状态;文件 ledSwitch.py 充当 Led 等的操纵器,它通过调用树莓派的 GPIO 接口来设置和获取 led 灯的状态,以及将其状态上报给 IoT 服务。
AWS IoT 提供了 Device Shadow python SDK,因此不需要直接操作各种 MQTT 主题,而可以使用 get,update 和 delete 这种API。其地址在 https://github.com/aws/aws-iot-device-sdk-python/tree/master/AWSIoTPythonSDK/core/shadow 。
3.1 ledController.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient import logging import time import json import threading # Led shadow JSON Schema # # # Name: Led # { # "state: { # "desired": { # "light": <on|off> # } # } #} deviceShadowHandler = None def getDeviceStatus(): while True: print("Getting device status...\n") deviceShadowHandler.shadowGet(customShadowCallback_get, 50) time.sleep(60) def customShadowCallback_get(payload, responseStatus, token): if responseStatus == "timeout": print("Get request with token " + token + " time out!") if responseStatus == "accepted": print("========== Printing Device Current Status =========") print(payload) payloadDict = json.loads(payload) #{"state":{"desired":{"light":0},"reported":{"light":100} try: desired = payloadDict["state"]["desired"]["light"] desiredTime = payloadDict["metadata"]["desired"]["light"]["timestamp"] except Exception: print("Failed to get desired state and timestamp.") else: print("Desired status: " + str(desired) + " @ " + time.ctime(int(desiredTime))) try: reported = payloadDict["state"]["reported"]["light"] #"metadata":{"desired":{"light":{"timestamp":1533893848}},"reported":{"light":{"timestamp":1533893853}}} reportedTime = payloadDict["metadata"]["reported"]["light"]["timestamp"] except Exception: print("Failed to get reported time or timestamp") else: print("Reported status: " + str(reported) + " @ " + time.ctime(int(reportedTime))) print("=======================================\n\n") if responseStatus == "rejected": print("Get request with token " + token + " rejected!") def customShadowCallback_upate(payload, responseStatus, token): # payload is a JSON string which will be parsed by jason lib if responseStatus == "timeout": print("Update request with " + token + " time out!") if responseStatus == "accepted": playloadDict = json.loads(payload) print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") print("Update request with token: " + token + " accepted!") print("light: " + str(playloadDict["state"]["desired"]["light"])) print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n") if responseStatus == "rejected": print("Update request " + token + " rejected!") def customShadowCallback_delete(payload, responseStatus, token): if responseStatus == "timeout": print("Delete request " + token + " time out!") if responseStatus == "accepted": print("Delete request with token " + token + " accepted!") if responseStatus == "rejected": print("Delete request with token " + token + " rejected!") # Cofigure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.ERROR) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) # AWS IoT Core endpoint. Need change some values to yours. awsiotHost = "**********.iot.*********.amazonaws.com" awsiotPort = 8883; # AWS IoT Root Certificate. Needn't change. rootCAPath = "/home/pi/awsiot/VeriSign-Class3-Public-Primary-Certification-Authority-G5.pem" # Device private key. Need change to yours. privateKeyPath = "/home/pi/awsiot/aec2731afd-private.pem.key" # Device certificate. Need change to yours. certificatePath = "/home/pi/awsiot/aec2731afd-certificate.pem.crt" myAWSIoTMQTTShadowClient = None; myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient("RaspberryLedController") myAWSIoTMQTTShadowClient.configureEndpoint(awsiotHost, awsiotPort) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) myAWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(60) # 10sec myAWSIoTMQTTShadowClient.configureMQTTOperationTimeout(50) #5sec #connect to AWS IoT myAWSIoTMQTTShadowClient.connect() #create a devcie Shadow with persistent subscription thingName = "homepi" deviceShadowHandler = myAWSIoTMQTTShadowClient.createShadowHandlerWithName(thingName, True) #Delete shadow JSON doc deviceShadowHandler.shadowDelete(customShadowCallback_delete, 50) #start a thread to get device status every 5 seconds statusLoopThread = threading.Thread(target=getDeviceStatus) statusLoopThread.start() #update shadow in a loop loopCount = 0 while True: desiredState = "off" if (loopCount % 2 == 0): desiredState = "on" print("To change Led desired status to \"" + desiredState + "\" ...\n") jsonPayload = '{"state":{"desired":{"light":"' + desiredState + '"}}}' print("payload is: " + jsonPayload + "\n") deviceShadowHandler.shadowUpdate(jsonPayload, customShadowCallback_upate, 60) loopCount += 1 time.sleep(60)
3.2 ledSwitch.py
import RPi.GPIO as GPIO from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient import logging import time import json # Led shadow JSON Schema # # # Name: Led # { # "state: { # "desired": { # "light": <on|off> # } # } #} LED_PIN_NUM = 18 # GPIO Number of Led long pin. Change to yours. deviceShadowHandler = None #initialize GOPI GPIO.setmode(GPIO.BCM) GPIO.setup(LED_PIN_NUM, GPIO.OUT) def customShadowCallback_Delta(payload, responseStatus, token): # payload is a JSON string which will be parsed by jason lib print(responseStatus) print(payload) payloadDict = json.loads(payload) print("++++++++ Get DELTA data +++++++++++") desiredStatus = str(payloadDict["state"]["light"]) print("desired status: " + desiredStatus) print("version: " + str(payloadDict["version"])) #get device current status currentStatus = getDeviceStatus() print("Device current status is " + currentStatus) #udpate device current status if (currentStatus != desiredStatus): # update device status as desired updateDeviceStatus(desiredStatus) # send current status to IoT service sendCurrentState2AWSIoT() print("+++++++++++++++++++++++++++\n") def updateDeviceStatus(status): print("=============================") print("Set device status to " + status) if (status == "on"): turnLedOn(LED_PIN_NUM) else: turnLedOff(LED_PIN_NUM) print("=============================\n") def getDeviceStatus(): return getLedStatus(LED_PIN_NUM) def turnLedOn(gpionum): GPIO.output(gpionum, GPIO.HIGH) def turnLedOff(gpionum): GPIO.output(gpionum, GPIO.LOW) def getLedStatus(gpionum): outputFlag = GPIO.input(gpionum) print("outputFlag is " + str(outputFlag)) if outputFlag: return "on" else: return "off" def sendCurrentState2AWSIoT(): #check current status of device currentStatus = getDeviceStatus() print("Device current status is " + currentStatus) print("Sending reported status to MQTT...") jsonPayload = '{"state":{"reported":{"light":"' + currentStatus + '"}}}' print("Payload is: " + jsonPayload + "\n") deviceShadowHandler.shadowUpdate(jsonPayload, customShadowCallback_upate, 50) def customShadowCallback_upate(payload, responseStatus, token): # payload is a JSON string which will be parsed by jason lib if responseStatus == "timeout": print("Update request with " + token + " time out!") if responseStatus == "accepted": playloadDict = json.loads(payload) print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~") print(payload) print("Update request with token: " + token + " accepted!") print("light: " + str(playloadDict["state"]["reported"]["light"])) print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n") if responseStatus == "rejected": print("Update request " + token + " rejected!") def customShadowCallback_Get(payload, responseStatus, token): print("responseStatus: " + responseStatus) print("payload: " + payload) payloadDict = json.loads(payload) # {"state":{"desired":{"light":37},"delta":{"light":37}},"metadata":{"desired":{"light":{"timestamp":1533888405}}},"version":54 stateStr = "" try: stateStr = stateStr + "Desired: " + str(payloadDict["state"]["desired"]["light"]) + ", " except Exception: print("No desired state") try: stateStr = stateStr + "Delta: " + str(payloadDict["state"]["delta"]["light"]) + ", " except Exception: print("No delta state") try: stateStr = stateStr + "Reported: " + str(payloadDict["state"]["reported"]["light"]) + ", " except Exception: print("No reported state") print(stateStr + ", Version: " + str(payloadDict["version"])) def printDeviceStatus(): print("=========================") status = getDeviceStatus() print(" Current status: " + str(status)) print("=========================\n\n") # Cofigure logging logger = logging.getLogger("AWSIoTPythonSDK.core") logger.setLevel(logging.DEBUG) streamHandler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') streamHandler.setFormatter(formatter) logger.addHandler(streamHandler) awsiotHost = "***********.iot.********.amazonaws.com" awsiotPort = 8883; rootCAPath = "/home/pi/awsiot/VeriSign-Class3-Public-Primary-Certification-Authority-G5.pem" privateKeyPath = "/home/pi/awsiot/aec2731afd-private.pem.key" certificatePath = "/home/pi/awsiot/aec2731afd-certificate.pem.crt" myAWSIoTMQTTShadowClient = None; myAWSIoTMQTTShadowClient = AWSIoTMQTTShadowClient("RaspberryLedSwitch") myAWSIoTMQTTShadowClient.configureEndpoint(awsiotHost, awsiotPort) myAWSIoTMQTTShadowClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) myAWSIoTMQTTShadowClient.configureAutoReconnectBackoffTime(1, 32, 20) myAWSIoTMQTTShadowClient.configureConnectDisconnectTimeout(60) # 10sec myAWSIoTMQTTShadowClient.configureMQTTOperationTimeout(30) #5sec #connect to AWS IoT myAWSIoTMQTTShadowClient.connect() #create a devcie Shadow with persistent subscription thingName = "homepi" deviceShadowHandler = myAWSIoTMQTTShadowClient.createShadowHandlerWithName(thingName, True) #listen on deleta deviceShadowHandler.shadowRegisterDeltaCallback(customShadowCallback_Delta) #print the intital status printDeviceStatus() #send initial status to IoT service sendCurrentState2AWSIoT() #get the shadow after started deviceShadowHandler.shadowGet(customShadowCallback_Get, 60) #update shadow in a loop loopCount = 0 while True: time.sleep(1)
3.3 主要过程
(1)ledSwitch.py 在运行后,获取led 的初始状态,并将其发给 AWS IoT 服务:
========================= outputFlag is 0 Current status: off ========================= outputFlag is 0 Device current status is off Sending reported status to MQTT... Payload is: {"state":{"reported":{"light":"off"}}}
(2)ledController.py 开始运行后,它首先获取led 的当前状态,为 『off』
(3)Controller 将其 desired 状态设置为 on
To change Led desired status to "on" ... payload is: {"state":{"desired":{"light":"on"}}}
(4)Switch 收到 DELTA 消息,调用 GPIO 接口设置其状态,并向 IOT 服务报告其状态
{"version":513,"timestamp":1533983956,"state":{"light":"on"},"metadata":{"light":{"timestamp":1533983956}},"clientToken":"93dfc84c-c9f9-49fb-b844-d55203991208"} ++++++++ Get DELTA data +++++++++++ desired status: on version: 513 outputFlag is 0 Device current status is off ============================= Set device status to on ============================= outputFlag is 1 Device current status is on Sending reported status to MQTT... Payload is: {"state":{"reported":{"light":"on"}}}
(5)Controller 获取最新状态
{"state":{"desired":{"light":"on"},"reported":{"light":"on"}},"metadata":{"desired":{"light":{"timestamp":1533983956}},"reported":{"light":{"timestamp":1533983957}}},"version":514,"timestamp":1533983959,"clientToken":"f24bcbbb-4b24-4354-b1df-349afdf23422"} Desired status: on @ Sat Aug 11 18:39:16 2018 Reported status: on @ Sat Aug 11 18:39:17 2018
(6)循环往复
参考链接:
- https://www.linkedin.com/pulse/understanding-internet-things-aws-iot-kay-lerch/
- https://iotbytes.wordpress.com/device-shadows-part-1-basics/
- https://github.com/pradeesi/AWS-IoT-Shadows
- https://github.com/aws/aws-iot-device-sdk-python/tree/master/samples/basicShadow
- https://raspi.tv/2013/rpi-gpio-basics-5-setting-up-and-using-outputs-with-rpi-gpio
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 树莓派通过 n2n 实现内网穿透
- Windows系统监听键盘通过UDP协议控制树莓派小车
- [树莓派]树莓派的入门教程
- 「玩转树莓派」树莓派 3B+ 安装 OpenCv
- 「玩转树莓派」树莓派 3B+ 查询本机IP
- 「玩转树莓派」树莓派 3B+ 配置静态IP
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python金融大数据分析
[德] 伊夫·希尔皮斯科 / 姚军 / 人民邮电出版社 / 2015-12 / CNY 99.00
唯一一本详细讲解使用Python分析处理金融大数据的专业图书;金融应用开发领域从业人员必读。 Python凭借其简单、易读、可扩展性以及拥有巨大而活跃的科学计算社区,在需要分析、处理大量数据的金融行业得到了广泛而迅速的应用,并且成为该行业开发核心应用的首选编程语言。《Python金融大数据分析》提供了使用Python进行数据分析,以及开发相关应用程序的技巧和工具。 《Python金融大......一起来看看 《Python金融大数据分析》 这本书的介绍吧!