实战 | 数据湖中的流式数据摄取之DeltaSteamer

栏目: IT技术 · 发布时间: 4年前

内容简介:2、可以跳过集成测试模块3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2、可以跳过集成测试模块

<modules>
    <module>hudi-common</module>
    <module>hudi-cli</module>
    <module>hudi-client</module>
    <module>hudi-hadoop-mr</module>
    <module>hudi-hive-sync</module>
    <module>hudi-spark</module>
    <module>hudi-timeline-service</module>
    <module>hudi-utilities</module>
    <module>packaging/hudi-hadoop-mr-bundle</module>
    <module>packaging/hudi-hive-sync-bundle</module>
    <module>packaging/hudi-spark-bundle</module>
    <module>packaging/hudi-presto-bundle</module>
    <module>packaging/hudi-utilities-bundle</module>
    <module>packaging/hudi-timeline-server-bundle</module>
    <module>docker/hoodie/hadoop</module>
    <!--<module>hudi-integ-test</module>-->  <!--将此注释即可-->
  </modules>

3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

二、使用hudi自带的DeltaStreamer工具写数据到hudi,开启--enable-hive-sync 即可同步数据到hive表

DeltaStreamer启动命令

spark-submit --master yarn   \      
  --driver-memory 1G \
  --num-executors 2 \
  --executor-memory 1G \
  --executor-cores 4 \
  --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
  --props hdfs://../kafka.properties \ '启动delateStreamer所需要的配置文件'
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \  '这里我选择从kafka消费json数据格式'
  --target-base-path hdfs://../business \ 'hudi数据存储地址'
  --op UPSERT \
  --target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
  --enable-hive-sync \          '开启同步至hive'
  --table-type MERGE_ON_READ \
  --source-ordering-field create_time \
  --source-limit 5000000

kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

三、查询

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能

  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。

  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

MOR模式

如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

使用saprk查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
spark sql暂不支持

使用hive查询

beeline -u jdbc:hive2://.. -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
 #读优化查询
 select * from dwd.test_ro;
 #快照查询
 select * from dwd.test_rt;
 #增量查询
 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 set hoodie.test.consume.mode=INCREMENTAL;
 set hoodie.test.consume.max.commits=3;
 set hoodie.test.consume.start.timestamp=20200427114546;
 select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
 
 #注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,cloudera用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

COW模式

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

使用spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false

#快照视图
spark.sql("select count(*) from dwd.test").show()
//增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")
spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()  

使用hive查询

beeline -u jdbc:hive2://... -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
  #快照查询
  select count(*) from dwd.test;
  #增量查询
  set hoodie.test.consume.mode=INCREMENTAL;
  set hoodie.test.consume.max.commits=3;
  set hoodie.test.consume.start.timestamp=20200427114546;
  select count(*) from  dwd.test where `_hoodie_commit_time` > '20200427114546';

注意:我们需要把和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar放在hive辅助路径和hive/lib包下面。 cloud era用户需要在yarn配置界面,配置辅助路径地址(不支持hdfs目录)

实战 | 数据湖中的流式数据摄取之DeltaSteamer

实战 | 数据湖中的流式数据摄取之DeltaSteamer


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

有限与无限的游戏

有限与无限的游戏

[美]詹姆斯·卡斯 / 马小悟、余倩 / 电子工业出版社 / 2013-10 / 35.00元

在这本书中,詹姆斯·卡斯向我们展示了世界上两种类型的「游戏」:「有限的游戏」和「无限的游戏」。 有限的游戏,其目的在于赢得胜利;无限的游戏,却旨在让游戏永远进行下去。有限的游戏在边界内玩,无限的游戏玩的就是边界。有限的游戏具有一个确定的开始和结束,拥有特定的赢家,规则的存在就是为了保证游戏会结束。无限的游戏既没有确定的开始和结束,也没有赢家,它的目的在于将更多的人带入到游戏本身中来,从而延续......一起来看看 《有限与无限的游戏》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器