内容简介:展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:
展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。
场景 - 支持物联网的车队管理
一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:
- 每条交付路线的车辆类型的总体分布。
- 最近(例如,在过去30秒内)每个货物交付路线的这些车辆类型的子集。
- 道路封闭附近的车辆清单,以便他们可以预测交货延误。
应用架构
除了 Confluent Kafka 作为流媒体平台,该应用程序还具有以下组件:
- 数据存储: YugaByte DB 用于存储来自Kafka流的原始事件以及来自KSQL数据处理器的聚合。
- 数据生成器:用于模拟写入Kafka流的车辆事件的程序。
- 数据处理器: 从Data Producer读取 KSQL ,计算聚合并将结果存储在Data Store中。
- Data Dashboard: Spring Boot 应用程序,使用Web套接字,jQuery和Bootstrap显示数据处理器的输出。
下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。
我们现在将详细介绍这些组件中的每一个。
数据存储
该层存储所有用户数据。YugaByte DB用作数据库, YugaByte云查询语言(YCQL) 用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。
CREATE TABLE TrafficKeySpace.Origin_Table ( vehicleId text, routeId text, vehicleType text, longitude text, latitude text, timeStamp timestamp, speed <b>double</b>, fuelLevel <b>double</b>, PRIMARY KEY ((vehicleId), timeStamp) ) WITH <b>default</b>_time_to_live = 3600;
请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。
有三个表用于保存用于面向用户的显示的数据:
- Total_Traffic 交通信息
- Window_Traffic 最后30秒的流量和
- poi_traffic 对于兴趣点附近的交通(道路封闭)。
数据处理器不断更新这些表,仪表板从中读取。
以下是这些表:
CREATE TABLE TrafficKeySpace.Total_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Window_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Poi_Traffic( vehicleid text, vehicletype text, distance bigint, timeStamp timestamp, PRIMARY KEY (vehicleid) );
数据生产者
这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。
单个数据点是JSON有效负载,如下所示:
{ <font>"vehicleId"</font><font>:</font><font>"0bf45cac-d1b8-4364-a906-980e1c2bdbcb"</font><font>, </font><font>"vehicleType"</font><font>:</font><font>"Taxi"</font><font>, </font><font>"routeId"</font><font>:</font><font>"Route-37"</font><font>, </font><font>"longitude"</font><font>:</font><font>"-95.255615"</font><font>, </font><font>"latitude"</font><font>:</font><font>"33.49808"</font><font>, </font><font>"timestamp"</font><font>:</font><font>"2017-10-16 12:31:03"</font><font>, </font><font>"speed"</font><font>:49.0, </font><font>"fuelLevel"</font><font>:38.0 } </font>
消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。
数据处理器
KSQL 是Apache Kafka的流式 SQL 引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用 Java 或 Python 等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。
使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。
CREATE STREAM traffic_stream ( vehicleId varchar, vehicleType varchar, routeId varchar, timeStamp varchar, latitude varchar, longitude varchar) WITH ( KAFKA_TOPIC='iot-data-event', VALUE_FORMAT='json', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss');
现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后, Kafka Connect YugaByte DB Sink Connector 读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。
CREATE TABLE total_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='total_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream GROUP BY routeId, vehicleType; CREATE TABLE window_traffic WITH ( TIMESTAMP='timeStamp', KAFKA_TOPIC='window_traffic', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', PARTITIONS=1) AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) GROUP BY routeId, vehicleType; CREATE STREAM poi_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='poi_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT vehicleId, vehicleType, <b>cast</b>(GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') AS bigint) AS distance, timeStamp FROM traffic_stream WHERE GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') < 30;
所有 Kafka Connect YugaByte DB Sink Connector 用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:
- 到目前为止所有车辆和货物的车辆类型和装运路线细分。
- 仅针对有效货件计算上述细分。这是通过计算最近30秒的车辆类型和装运路线的细分来完成的。
- 检测距离给定兴趣点(POI)20英里范围内的车辆,这表示道路封闭。
数据仪表板
这是一个 Spring Boot 应用程序,它从YugaByte DB查询数据,并使用 Web Sockets 和 jQuery 将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用 bootstrap.js 显示包含图表和表格的仪表板。
我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。
@Repository <b>public</b> <b>interface</b> TotalTrafficDataRepository <b>extends</b> CassandraRepository<TotalTrafficData>{ @Query(<font>"SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING"</font><font>) Iterable<TotalTrafficData> findTrafficDataByDate(String date); </font>
为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个 DatabaseConfig 类。
请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。
总结
此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在 yb-iot-fleet-management GitHub仓库中找到构建和运行应用程序的说明以及源代码。
以上所述就是小编给大家介绍的《使用Confluent Kafka,KSQL,Spring Boot和分布式SQL开发物联网应用程序》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- ZooKeeper:分布式应用程序的分布式协调服务
- ZooKeeper:入门:用 ZooKeeper 协调分布式应用程序
- ZooKeeper 3.4.13 发布,分布式应用程序协调服务
- ZooKeeper 3.5.4-beta 发布,分布式应用程序协调服务
- Android里应用程序,应用程序窗口和视图对象之间的关系
- 使用 Bluemix、Watson Discovery 和 Cloudant 构建移动应用程序来分析其他应用程序
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
SEM修炼手册:百度竞价、信息流广告、数据分析与专题页策划实战详解
陈丰洲 / 电子工业出版社 / 2018-10 / 59.80元
SEM人员在职场打拼的过程中,会遇到一个又一个坑,《SEM修炼手册:百度竞价、信息流广告、数据分析与专题页策划实战详解》尝试站在一定的高度,将从业者从专员走向管理岗位过程中可能碰到的问题进行整理,不仅谈竞价推广,也谈基于SEM的营销体系。 《SEM修炼手册:百度竞价、信息流广告、数据分析与专题页策划实战详解》包括11章内容,由浅入深地分享SEM的进阶过程。第1章是SEM概述,让读者对SEM有......一起来看看 《SEM修炼手册:百度竞价、信息流广告、数据分析与专题页策划实战详解》 这本书的介绍吧!