如何使 Hive 对 DynamoDB 进行并发读取?

栏目: 数据库 · 发布时间: 5年前

内容简介:DynamoDB 创建了类似如下的表:这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].

问题背景

DynamoDB 创建了类似如下的表:

{
 "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key.
 "date": "2018-12-12",
 "when": "2018-12-12 17:22:15",
 "context": "something"
 }

这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。

我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].

hive> INSERT OVERWRITE TABLE  s3_table

Dynamodb表里存放了很多天的数据,执行以上操作会消耗很长时间,如何能加快速度?比如让 Hive 进行并发读取?

测试使用的 AWS EMR 版本为 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.

$ sudo rpm -qa | grep emr-ddb
 emr-ddb-hadoop-4.7.0-1.amzn1.noarch
 emr-ddb-hive-4.7.0-1.amzn1.noarch
 emr-ddb-4.7.0-1.amzn1.noarch

DynamoDB 的 Query 和 Scan 操作

DynamoDB 是个 NoSQL 数据库。在 Hive 操作中,如果 WHERE 后面的条件是 Primary Key,比如:

hive > SELECT * FROM ddb_table WHERE accessid="c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9";

这时,Hive 会进行 Query 操作,能直接拿到结果。可以理解为 O(1) 的操作。

而如果 WHERE 后面的条件只是个普通的 Attribute, 那 Hive 需要调用 Scan 操作,扫描整个表的条目,过滤出符合条件的条件。可以理解为 O(n) 的操作。比如:

hive> INSERT OVERWRITE TABLE  s3_table

Hive 需要读取整个表,对比每个条目的 date 是否为"2018-12-12". 因此,这个操作需要读取大量数据,消耗大量 DynamoDB 的 RCU.

Scan 操作是可以分 segment 的[2]。在这种情况下,如果 Hive 能在不同节点同时读取不同的 segment,这样可以加快读取速度。

Query 操作能直接读取 Key 对应的 Value, 没有并行的需要。

Hive 对 DynamoDB 的并发读取

按照上面说法,如果能让 Hive 启动多个 container 扫描 dynamodb table 的不同 segment, 可以起到并发加速的效果。

Hive 查询 DynamoDB 的动作实际上由 emr-dynamodb-connector 来实现。它会根据 DynamoDB 当前的 RCU 设定,来决定分多少个 segment 同时读取。一般来说,每 200 RCU 会启动一个 container。所以,如果我们把 DynamoDB 的 RCU 设置为 6000 (在 DynamoDB 侧设定), Hive 会同时启动 30 个 container 来读取数据。

hive> INSERT OVERWRITE TABLE s3_table
     > SELECT * FROM ddb_table;
 ----------------------------------------------------------------------------------------------
         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
 ----------------------------------------------------------------------------------------------
 Map 1 .......... container     SUCCEEDED     30         30        0        0       0       0
 ----------------------------------------------------------------------------------------------

具体逻辑可参考 emr-dynamodb-connector 相关代码[3].

使用合适的 DynamoDB Key 来用 Query 代替 Scan

尽管并发可以提高速度,但是当 DynamoDB table 增长后,scan 的耗时将会越来越长,而且加大 RCU 会产生不少的费用。

如果可以优化 DynamoDB Table 的设计,将 Scan 的操作转换为 Query,速度能极大地提升,且能节省费用。

考虑到 Hive 需要用 date 进行查询,建立 Global Secondary Index 或许是个方法[4]。简单地说,就是建立除了 Primary Key 之外的索引,使得查询这个 GSI 的时候,也能做到 O(1) 的效果。比如,在上述例子中,可以将 date 设置为 GSI. 这样,使用 date 作为查询条件,也可以调用 query 操作,不需要 scan. 可惜,现在的 emr-dynamodb-connector 还不支持 GSI,Hive 还会傻傻地去扫描整个 Table.[5]

为了绕过这个问题,我们可以把 Date 设置成 Primary Key, 把 when 设置成 sort key (因为一个 key 只能有一个 value, 所以要用一个组合 key 来避免覆盖),示例数据:

{
   "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //
  

如此一来,使用 Hive 以 date 作为条件进行查询的时候,就会调用 query,加快速度并节省费用。

hive> INSERT OVERWRITE TABLE  ddb_table_1
     > SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18";
 ----------------------------------------------------------------------------------------------
         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
 ----------------------------------------------------------------------------------------------
 Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
 ----------------------------------------------------------------------------------------------
 VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 10.50 s
 ----------------------------------------------------------------------------------------------

几万条数据,只用了10秒就搞定。

修改 Table 后,应用程序需要相应修改来通过 GSI 提取数据。使用 GSI 需要指定 index-name。例如:

response = table.query(
     IndexName='accessid-index',  ##<---
     KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9") 
)

具体可参考[6].

后续

前面提到,如果要使 Hive 并发进行 Scan,需要手动调大 RCU。长期调大 RCU 可能会产生大量费用。

目前,emr-dynamodb-connector 正在添加对 DynamoDB on-demand capacity 的支持[7]。如果这个功能最后能实现,用户可以把 DynamoDB table 设置成 On-Demand 模式,按实际的读写数量来收费。Hive 在进行 scan 的时候,也会自动地并发。

参考文档

[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html

[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan

[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98

[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html

[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87

[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query

[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86


以上所述就是小编给大家介绍的《如何使 Hive 对 DynamoDB 进行并发读取?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Tangled Web

The Tangled Web

Michal Zalewski / No Starch Press / 2011-11-26 / USD 49.95

"Thorough and comprehensive coverage from one of the foremost experts in browser security." -Tavis Ormandy, Google Inc. Modern web applications are built on a tangle of technologies that have been de......一起来看看 《The Tangled Web》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

SHA 加密
SHA 加密

SHA 加密工具