实战 | 数据湖中的流式数据摄取之DeltaSteamer

栏目: IT技术 · 发布时间: 4年前

内容简介:2、可以跳过集成测试模块3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2、可以跳过集成测试模块

<modules>
    <module>hudi-common</module>
    <module>hudi-cli</module>
    <module>hudi-client</module>
    <module>hudi-hadoop-mr</module>
    <module>hudi-hive-sync</module>
    <module>hudi-spark</module>
    <module>hudi-timeline-service</module>
    <module>hudi-utilities</module>
    <module>packaging/hudi-hadoop-mr-bundle</module>
    <module>packaging/hudi-hive-sync-bundle</module>
    <module>packaging/hudi-spark-bundle</module>
    <module>packaging/hudi-presto-bundle</module>
    <module>packaging/hudi-utilities-bundle</module>
    <module>packaging/hudi-timeline-server-bundle</module>
    <module>docker/hoodie/hadoop</module>
    <!--<module>hudi-integ-test</module>-->  <!--将此注释即可-->
  </modules>

3、编译过后会得到hudi-utilities-bundle_2.11-0.6.0-SNAPSHOT.jar和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar两个我们所需要的jar包

二、使用hudi自带的DeltaStreamer工具写数据到hudi,开启--enable-hive-sync 即可同步数据到hive表

DeltaStreamer启动命令

spark-submit --master yarn   \      
  --driver-memory 1G \
  --num-executors 2 \
  --executor-memory 1G \
  --executor-cores 4 \
  --deploy-mode cluster \
  --conf spark.yarn.executor.memoryOverhead=512 \
  --conf spark.yarn.driver.memoryOverhead=512 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \
  --props hdfs://../kafka.properties \ '启动delateStreamer所需要的配置文件'
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \  '这里我选择从kafka消费json数据格式'
  --target-base-path hdfs://../business \ 'hudi数据存储地址'
  --op UPSERT \
  --target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'
  --enable-hive-sync \          '开启同步至hive'
  --table-type MERGE_ON_READ \
  --source-ordering-field create_time \
  --source-limit 5000000

kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

三、查询

Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照。该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能

  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。

  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

MOR模式

如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

使用saprk查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

#快照视图
spark.sql("select count(*) from dwd.test_rt").show()
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show()
#增量视图
spark sql暂不支持

使用hive查询

beeline -u jdbc:hive2://.. -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
 #读优化查询
 select * from dwd.test_ro;
 #快照查询
 select * from dwd.test_rt;
 #增量查询
 set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 set hoodie.test.consume.mode=INCREMENTAL;
 set hoodie.test.consume.max.commits=3;
 set hoodie.test.consume.start.timestamp=20200427114546;
 select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';
 
 #注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,cloudera用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

COW模式

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图

使用spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false

#快照视图
spark.sql("select count(*) from dwd.test").show()
//增量视图 无需遍历全部数据,即可获取时间大于20200426140637的数据
import org.apache.hudi.DataSourceReadOptions
val hoodieIncViewDF = spark.read.format("org.apache.hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200426140637").load("hdfs://..../t3_trip_t_business15")
spark.sql("select count(*) from dwd.test_rt where _hoodie_commit_time>'20200426140637'").show()  

使用hive查询

beeline -u jdbc:hive2://... -n ..  -p ..  \
  --hiveconf hive.stats.autogather=false \
  
  #快照查询
  select count(*) from dwd.test;
  #增量查询
  set hoodie.test.consume.mode=INCREMENTAL;
  set hoodie.test.consume.max.commits=3;
  set hoodie.test.consume.start.timestamp=20200427114546;
  select count(*) from  dwd.test where `_hoodie_commit_time` > '20200427114546';

注意:我们需要把和hudi-hadoop-mr-bundle-0.6.0-SNAPSHOT.jar放在hive辅助路径和hive/lib包下面。 cloud era用户需要在yarn配置界面,配置辅助路径地址(不支持hdfs目录)

实战 | 数据湖中的流式数据摄取之DeltaSteamer

实战 | 数据湖中的流式数据摄取之DeltaSteamer


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

掘金大数据

掘金大数据

程新洲、朱常波、晁昆 / 机械工业出版社 / 2019-1 / 59.00元

在数据横向融合的时代,充分挖掘数据金矿及盘活数据资产,是企业发展和转型的关键所在。电信运营商以其数据特殊性,必将成为大数据领域的领航者、生力军。各行业的大数据从业者要如何从电信业的大数据中挖掘价值呢? 本书彻底揭开电信运营商数据的神秘面纱,系统介绍了大数据的发展历程,主要的数据挖掘方法,电信运营商在网络运行及业务运营方面的数据资源特征,基于用户、业务、网络、终端及内在联系的电信运营商大数据分......一起来看看 《掘金大数据》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具