使用Confluent Kafka,KSQL,Spring Boot和分布式SQL开发物联网应用程序

栏目: Java · 发布时间: 6年前

内容简介:展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:

展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。

场景 - 支持物联网的车队管理

一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:

  • 每条交付路线的车辆类型的总体分布。
  • 最近(例如,在过去30秒内)每个货物交付路线的这些车辆类型的子集。
  • 道路封闭附近的车辆清单,以便他们可以预测交货延误。

应用架构

除了 Confluent Kafka 作为流媒体平台,该应用程序还具有以下组件:

  • 数据存储:  YugaByte DB 用于存储来自Kafka流的原始事件以及来自KSQL数据处理器的聚合。
  • 数据生成器:用于模拟写入Kafka流的车辆事件的程序。
  • 数据处理器: 从Data Producer读取 KSQL ,计算聚合并将结果存储在Data Store中。
  • Data Dashboard:  Spring Boot 应用程序,使用Web套接字,jQuery和Bootstrap显示数据处理器的输出。

下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。

我们现在将详细介绍这些组件中的每一个。

数据存储

该层存储所有用户数据。YugaByte DB用作数据库, YugaByte云查询语言(YCQL) 用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。

CREATE TABLE TrafficKeySpace.Origin_Table (
   vehicleId text, 
   routeId text, 
   vehicleType text, 
   longitude text, 
   latitude text, 
   timeStamp timestamp, 
   speed <b>double</b>, 
   fuelLevel <b>double</b>, 
   PRIMARY KEY ((vehicleId), timeStamp)
) WITH <b>default</b>_time_to_live = 3600;

请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。

有三个表用于保存用于面向用户的显示的数据:

  • Total_Traffic 交通信息
  • Window_Traffic 最后30秒的流量和
  • poi_traffic 对于兴趣点附近的交通(道路封闭)。

数据处理器不断更新这些表,仪表板从中读取。

以下是这些表:

CREATE TABLE TrafficKeySpace.Total_Traffic (
   routeId text, 
   vehicleType text, 
   totalCount bigint, 
   timeStamp timestamp, 
   recordDate text, 
   PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.Window_Traffic (
   routeId text, 
   vehicleType text, 
   totalCount bigint, 
   timeStamp timestamp, 
   recordDate text, 
   PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.Poi_Traffic(
   vehicleid text, 
   vehicletype text, 
   distance bigint, 
   timeStamp timestamp, 
   PRIMARY KEY (vehicleid)
);

数据生产者

这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。

单个数据点是JSON有效负载,如下所示:

{
  <font>"vehicleId"</font><font>:</font><font>"0bf45cac-d1b8-4364-a906-980e1c2bdbcb"</font><font>,
  </font><font>"vehicleType"</font><font>:</font><font>"Taxi"</font><font>,
  </font><font>"routeId"</font><font>:</font><font>"Route-37"</font><font>,
  </font><font>"longitude"</font><font>:</font><font>"-95.255615"</font><font>,
  </font><font>"latitude"</font><font>:</font><font>"33.49808"</font><font>,
  </font><font>"timestamp"</font><font>:</font><font>"2017-10-16 12:31:03"</font><font>,
  </font><font>"speed"</font><font>:49.0,
  </font><font>"fuelLevel"</font><font>:38.0
}
</font>

消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。

数据处理器

KSQL 是Apache Kafka的流式 SQL 引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用 JavaPython 等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。

使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。

CREATE STREAM traffic_stream (
           vehicleId varchar,
           vehicleType varchar,
           routeId varchar,
           timeStamp varchar,
           latitude varchar,
           longitude varchar)
    WITH (
           KAFKA_TOPIC='iot-data-event',
           VALUE_FORMAT='json',
           TIMESTAMP='timeStamp',
           TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss');

现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后, Kafka Connect YugaByte DB Sink Connector 读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。

CREATE TABLE total_traffic
     WITH ( PARTITIONS=1,
            KAFKA_TOPIC='total_traffic',
            TIMESTAMP='timeStamp',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
            TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     GROUP BY routeId, vehicleType;

CREATE TABLE window_traffic
     WITH ( TIMESTAMP='timeStamp',
            KAFKA_TOPIC='window_traffic',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss',
            PARTITIONS=1) AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
          TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
     GROUP BY routeId, vehicleType;

CREATE STREAM poi_traffic
      WITH ( PARTITIONS=1,
             KAFKA_TOPIC='poi_traffic',
             TIMESTAMP='timeStamp',
             TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
      SELECT vehicleId,
             vehicleType,
             <b>cast</b>(GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') AS bigint) AS distance,
             timeStamp
      FROM traffic_stream
      WHERE GEO_DISTANCE(<b>cast</b>(latitude AS <b>double</b>),<b>cast</b>(longitude AS <b>double</b>),33.877495,-95.50238,'KM') < 30;

所有 Kafka Connect YugaByte DB Sink Connector 用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:

  • 到目前为止所有车辆和货物的车辆类型和装运路线细分。
  • 仅针对有效货件计算上述细分。这是通过计算最近30秒的车辆类型和装运路线的细分来完成的。
  • 检测距离给定兴趣点(POI)20英里范围内的车辆,这表示道路封闭。

数据仪表板

这是一个 Spring Boot 应用程序,它从YugaByte DB查询数据,并使用 Web SocketsjQuery 将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用 bootstrap.js 显示包含图表和表格的仪表板。

我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。

@Repository
<b>public</b> <b>interface</b> TotalTrafficDataRepository <b>extends</b> CassandraRepository<TotalTrafficData>{

     @Query(<font>"SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING"</font><font>)
     Iterable<TotalTrafficData> findTrafficDataByDate(String date);
</font>

为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个 DatabaseConfig 类。

请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。

总结

此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在 yb-iot-fleet-management GitHub仓库中找到构建和运行应用程序的说明以及源代码。​​​​​​​


以上所述就是小编给大家介绍的《使用Confluent Kafka,KSQL,Spring Boot和分布式SQL开发物联网应用程序》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Remote

Remote

Jason Fried、David Heinemeier Hansson / Crown Business / 2013-10-29 / CAD 26.95

The “work from home” phenomenon is thoroughly explored in this illuminating new book from bestselling 37signals founders Fried and Hansson, who point to the surging trend of employees working from hom......一起来看看 《Remote》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具