Uber永久定位系统实时数据分析过程实践!

栏目: 数据库 · 发布时间: 7年前

内容简介:根据Gartner所言,到2020年,每个智慧城市将使用约13.9亿辆联网汽车,这些汽车配备物联网传感器和其他设备。城市中的车辆定位和行为模式分析将有助于优化流量,更好的规划决策和进行更智能的广告投放。例如,对GPS汽车数据分析可以允许城市基于实时交通信息来优化交通流量。电信公司正在使用移动电话定位数据,识别和预测城市人口的位置活动趋势和生存区域。本文,我们将讨论在数据处理管道中使用Spark Structured Streaming对Uber事件数据进行聚类分析,以检测和可视化用户位置实践。(注:本文所

根据Gartner所言,到2020年,每个智慧城市将使用约13.9亿辆联网汽车,这些汽车配备物联网传感器和其他设备。城市中的车辆定位和行为模式分析将有助于优化流量,更好的规划决策和进行更智能的广告投放。例如,对GPS汽车数据分析可以允许城市基于实时交通信息来优化交通流量。电信公司正在使用移动电话定位数据,识别和预测城市人口的位置活动趋势和生存区域。

本文,我们将讨论在数据处理管道中使用Spark Structured Streaming对Uber事件数据进行聚类分析,以检测和可视化用户位置实践。(注:本文所用数据并非Uber内部实际用户数据,文末附具体代码或者示例获取渠道)

首先,我们回顾几个结构化流媒体涉及的概念,然后探讨端到端用例:

使用MapR-ES发布/订阅事件流

MapR-ES是一个分布式发布/订阅事件流系统,让生产者和消费者能够通过Apache Kafka API以并行和容错方式实时交换事件。

流表示从生产者到消费者的连续事件序列,其中事件被定义为键值对。

Uber永久定位系统实时数据分析过程实践!

topic是一个逻辑事件流,将事件按类别区分,并将生产者与消费者分离。topic按吞吐量和可伸缩性进行分区,MapR-ES可以扩展到非常高的吞吐量级别,使用普通硬件可以轻松实现每秒传输数百万条消息。

Uber永久定位系统实时数据分析过程实践!

你可以将分区视为事件日志:将新事件附加到末尾,并为其分配一个称为偏移的顺序ID号。

Uber永久定位系统实时数据分析过程实践! 与队列一样,事件按接收顺序传递。
Uber永久定位系统实时数据分析过程实践!

但是,与队列不同,消息在读取时不会被删除,它们保留在其他消费者可用分区。消息一旦发布,就不可变且永久保留。

Uber永久定位系统实时数据分析过程实践!

读取消息时不删除消息保证了大规模读取时的高性能,满足不同消费者针对不同目的(例如具有多语言持久性的多个视图)处理相同消息的需求。

Uber永久定位系统实时数据分析过程实践! Spark数据集,DataFrame,SQL

Spark数据集是分布在集群多个节点上类对象的分布式集合,可以使用map,flatMap,filter或Spark SQL来操纵数据集。DataFrame是Row对象的数据集,表示包含行和列的数据表。

Uber永久定位系统实时数据分析过程实践! Spark结构化流

结构化流是一种基于Spark SQL引擎的可扩展、可容错的流处理引擎。通过Structured Streaming,你可以将发布到Kafka的数据视为无界DataFrame,并使用与批处理相同的DataFrame,Dataset和SQL API处理此数据。

Uber永久定位系统实时数据分析过程实践!

随着流数据的不断传播,Spark SQL引擎会逐步持续处理并更新最终结果。

Uber永久定位系统实时数据分析过程实践!

事件的流处理对实时ETL、过滤、转换、创建计数器、聚合、关联值、丰富其他数据源或机器学习、持久化文件或数据库以及发布到管道的不同topic非常有用。

Uber永久定位系统实时数据分析过程实践! Spark结构化流示例代码

下面是Uber事件数据聚类分析用例的数据处理管道,用于检测位置。

Uber永久定位系统实时数据分析过程实践! 使用Kafka API将行车位置数据发布到MapR-ES topic

订阅该topic的Spark Streaming应用程序:

  • 输入Uber行车数据流;
  • 使用已部署的机器学习模型、集群ID和位置丰富行程数据;

在MapR-DB JSON中存储转换和丰富数据。

Uber永久定位系统实时数据分析过程实践!

用例数据示例

示例数据集是Uber旅行数据,传入数据是CSV格式,下面显示了一个示例,topic依次为:

日期/时间,纬度,经度,位置(base),反向时间戳

2014-08-06T05:29:00.000-07:00,40.7276,-74.0033,B02682,9223370505593280605

我们使用集群ID和位置丰富此数据,然后将其转换为以下JSON对象:

{  
"_id":0_922337050559328,  
"dt":"2014-08-01 08:51:00",  
"lat":40.6858,  
"lon":-73.9923,  
"base":"B02682",  
"cid":0,  
"clat":40.67462874550765,  
"clon":-73.98667466026531  
} 
Uber永久定位系统实时数据分析过程实践!

加载K-Means模型

Spark KMeansModel类用于加载k-means模型,该模型安装在历史uber行程数据上,然后保存到MapR-XD集群。接下来,创建集群中心ID和位置数据集,以便稍后与Uber旅行位置连接。

Uber永久定位系统实时数据分析过程实践!

集群中心下方显示在Zeppelin notebook中的Google地图上:

Uber永久定位系统实时数据分析过程实践!

从Kafka的topic中读取数据

为了从Kafka读取,我们必须首先指定流格式,topic和偏移选项。有关配置参数的详细信息,请参阅MapR Streams文档。

Uber永久定位系统实时数据分析过程实践!

这将返回具有以下架构的DataFrame:

Uber永久定位系统实时数据分析过程实践!

下一步是将二进制值列解析并转换为Uber对象的数据集。

将消息值解析为Uber对象的数据集

Scala Uber案例类定义与CSV记录对应的架构,parseUber函数将逗号分隔值字符串解析为Uber对象。

Uber永久定位系统实时数据分析过程实践!

在下面的代码中,我们使用parseUber函数注册一个用户自定义函数(UDF)来反序列化消息值字符串。我们在带有df1列值的String Cast的select表达式中使用UDF,该值返回Uber对象的DataFrame。

Uber永久定位系统实时数据分析过程实践! 使用集群中心ID和位置丰富的Uber对象数据集

VectorAssembler用于转换并返回一个新的DataFrame,其中包含向量列中的纬度和经度要素列。

Uber永久定位系统实时数据分析过程实践!
Uber永久定位系统实时数据分析过程实践!

k-means模型用于通过模型转换方法从特征中获取聚类,该方法返回具有聚类ID(标记为预测)的DataFrame。生成的数据集与先前创建的集群中心数据集(ccdf)连接,以创建UberC对象的数据集,其中包含与集群中心ID和位置相结合的行程信息。

Uber永久定位系统实时数据分析过程实践!
Uber永久定位系统实时数据分析过程实践!

最后的数据集转换是将唯一ID添加到对象以存储在MapR-DB JSON中。createUberwId函数创建一个唯一的ID,包含集群ID和反向时间戳。由于MapR-DB按id对行进行分区和排序,因此行将按簇的ID新旧时间进行排序。 此函数与map一起使用以创建UberwId对象的数据集。

Uber永久定位系统实时数据分析过程实践!
写入内存接收器

接下来,为了进行调试,我们可以开始接收数据并将数据作为内存表存储在内存中,然后进行查询。

Uber永久定位系统实时数据分析过程实践!

以下是来自 %sqlselect * from uber limit 10 的示例输出:

Uber永久定位系统实时数据分析过程实践!

现在我们可以查询流数据,询问哪段时间和集群内的搭乘次数最多?(输出显示在Zeppelin notebook中)

  1. %sql 

SELECT hour(uber.dt) as hr,cid, count(cid) as ct FROM uber group By hour(uber.dt), cid

Uber永久定位系统实时数据分析过程实践!

Spark Streaming写入MapR-DB

Uber永久定位系统实时数据分析过程实践!

用于Apache Spark的MapR-DB连接器使用户可以将MapR-DB用作Spark结构化流或Spark Streaming的接收器。

Uber永久定位系统实时数据分析过程实践!

当你处理大量流数据时,其中一个挑战是存储位置。对于此应用程序,可以选择MapR-DB JSON(一种高性能NoSQL数据库),因为它具有JSON的可伸缩性和灵活易用性。

JSON模式的灵活性

MapR-DB支持JSON文档作为本机数据存储。MapR-DB使用JSON文档轻松存储,查询和构建应用程序。Spark连接器可以轻松地在JSON数据和MapR-DB之间构建实时或批处理管道,并在管道中利用Spark。

Uber永久定位系统实时数据分析过程实践!

使用MapR-DB,表按集群的键范围自动分区,提供可扩展行和快速读写能力。在此用例中,行键_id由集群ID和反向时间戳组成,因此表将自动分区,并按最新的集群ID进行排序。

Uber永久定位系统实时数据分析过程实践!

Spark MapR-DB Connector利用Spark DataSource API。连接器体系结构在每个Spark Executor中都有一个连接对象,允许使用MapR-DB(分区)进行分布式并行写入,读取或扫描。

Uber永久定位系统实时数据分析过程实践!

写入MapR-DB接收器

要将Spark Stream写入MapR-DB,请使用tablePath,idFieldPath,createTable,bulkMode和sampleSize参数指定格式。以下示例将cdf DataFrame写到MapR-DB并启动流。

Uber永久定位系统实时数据分析过程实践!
Uber永久定位系统实时数据分析过程实践!

使用Spark SQL查询MapR-DB JSON

Spark MapR-DB Connector允许用户使用Spark数据集在MapR-DB之上执行复杂的 SQL 查询和更新,同时应用投影和过滤器下推,自定义分区和数据位置等关键技术。

Uber永久定位系统实时数据分析过程实践!

将数据从MapR-DB加载到Spark数据集中

要将MapR-DB JSON表中的数据加载到Apache Spark数据集,我们可在SparkSession对象上调用loadFromMapRDB方法,提供tableName,schema和case类。这将返回UberwId对象的数据集:

Uber永久定位系统实时数据分析过程实践!
Uber永久定位系统实时数据分析过程实践!

使用Spark SQL探索和查询Uber数据

现在,我们可以查询连续流入MapR-DB的数据,使用Spark DataFrames特定于域的语言或使用Spark SQL来询问。

显示第一行(注意行如何按_id分区和排序,_id由集群ID和反向时间戳组成,反向时间戳首先 排序 最近的行)。

df.show 
Uber永久定位系统实时数据分析过程实践!

每个集群发生多少次搭乘?

df.groupBy("cid").count().orderBy(desc( "count")).show 
Uber永久定位系统实时数据分析过程实践!

或者使用Spark SQL:

%sql SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC 
Uber永久定位系统实时数据分析过程实践!

使用Zeppelin notebook中的Angular和Google Maps脚本,我们可以在地图上显示集群中心标记和最新的5000个旅行的位置,如下可看出最受欢迎的位置,比如位于曼哈顿的0、3、9。

Uber永久定位系统实时数据分析过程实践!

集群0最高搭乘次数出现在哪个小时?

df.filter($"\_id" <= "1")  
.select(hour($"dt").alias("hour"), $"cid")  
.groupBy("hour","cid").agg(count("cid")  
.alias("count"))show 
Uber永久定位系统实时数据分析过程实践!

一天中的哪个小时和哪个集群的搭乘次数最多?

%sql SELECT hour(uber.dt), cid, count(cid) FROM uber GROUP BY hour(uber.dt), cid 
Uber永久定位系统实时数据分析过程实践!

按日期时间显示uber行程的集群计数

%sql select cid, dt, count(cid) as count from uber group by dt, cid order by dt, cid limit 100 
Uber永久定位系统实时数据分析过程实践!

总结

本文涉及的知识点有Spark结构化流应用程序中的Spark Machine Learning模型、Spark结构化流与MapR-ES使用Kafka API摄取消息、SparkStructured Streaming持久化保存到MapR-DB,以持续快速地进行SQL分析等。此外,上述讨论过的用例体系结构所有组件都可与MapR数据平台在同一集群上运行。

Uber永久定位系统实时数据分析过程实践!

代码:

你可以从此处下载代码和数据以运行这些示例:https://github.com/caroljmcdonald/mapr-spark-structuredstreaming-uber

机器学习notebook的Zeppelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberML.json

Spark结构化流notebook的Zeppelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberStructuredStreaming.json

SparkSQL notebook的Zenpelin查看器:https://www.zepl.com/viewer/github/caroljmcdonald/mapr-spark-structuredstreaming-uber/blob/master/notebooks/SparkUberSQLMapR-DB.json

此代码包含在MapR 6.0.1沙箱上运行的说明,这是一个独立的VM以及教程和演示应用程序,可让用户快速使用MapR和Spark。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learn Python the Hard Way

Learn Python the Hard Way

Zed Shaw / Example Product Manufacturer / 2011

This is a very beginner book for people who want to learn to code. If you can already code then the book will probably drive you insane. It's intended for people who have no coding chops to build up t......一起来看看 《Learn Python the Hard Way》 这本书的介绍吧!

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具