如何使 Hive 对 DynamoDB 进行并发读取?

栏目: 数据库 · 发布时间: 6年前

内容简介:DynamoDB 创建了类似如下的表:这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].

问题背景

DynamoDB 创建了类似如下的表:

{
 "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key.
 "date": "2018-12-12",
 "when": "2018-12-12 17:22:15",
 "context": "something"
 }

这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。

我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].

hive> INSERT OVERWRITE TABLE  s3_table

Dynamodb表里存放了很多天的数据,执行以上操作会消耗很长时间,如何能加快速度?比如让 Hive 进行并发读取?

测试使用的 AWS EMR 版本为 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.

$ sudo rpm -qa | grep emr-ddb
 emr-ddb-hadoop-4.7.0-1.amzn1.noarch
 emr-ddb-hive-4.7.0-1.amzn1.noarch
 emr-ddb-4.7.0-1.amzn1.noarch

DynamoDB 的 Query 和 Scan 操作

DynamoDB 是个 NoSQL 数据库。在 Hive 操作中,如果 WHERE 后面的条件是 Primary Key,比如:

hive > SELECT * FROM ddb_table WHERE accessid="c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9";

这时,Hive 会进行 Query 操作,能直接拿到结果。可以理解为 O(1) 的操作。

而如果 WHERE 后面的条件只是个普通的 Attribute, 那 Hive 需要调用 Scan 操作,扫描整个表的条目,过滤出符合条件的条件。可以理解为 O(n) 的操作。比如:

hive> INSERT OVERWRITE TABLE  s3_table

Hive 需要读取整个表,对比每个条目的 date 是否为"2018-12-12". 因此,这个操作需要读取大量数据,消耗大量 DynamoDB 的 RCU.

Scan 操作是可以分 segment 的[2]。在这种情况下,如果 Hive 能在不同节点同时读取不同的 segment,这样可以加快读取速度。

Query 操作能直接读取 Key 对应的 Value, 没有并行的需要。

Hive 对 DynamoDB 的并发读取

按照上面说法,如果能让 Hive 启动多个 container 扫描 dynamodb table 的不同 segment, 可以起到并发加速的效果。

Hive 查询 DynamoDB 的动作实际上由 emr-dynamodb-connector 来实现。它会根据 DynamoDB 当前的 RCU 设定,来决定分多少个 segment 同时读取。一般来说,每 200 RCU 会启动一个 container。所以,如果我们把 DynamoDB 的 RCU 设置为 6000 (在 DynamoDB 侧设定), Hive 会同时启动 30 个 container 来读取数据。

hive> INSERT OVERWRITE TABLE s3_table
     > SELECT * FROM ddb_table;
 ----------------------------------------------------------------------------------------------
         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
 ----------------------------------------------------------------------------------------------
 Map 1 .......... container     SUCCEEDED     30         30        0        0       0       0
 ----------------------------------------------------------------------------------------------

具体逻辑可参考 emr-dynamodb-connector 相关代码[3].

使用合适的 DynamoDB Key 来用 Query 代替 Scan

尽管并发可以提高速度,但是当 DynamoDB table 增长后,scan 的耗时将会越来越长,而且加大 RCU 会产生不少的费用。

如果可以优化 DynamoDB Table 的设计,将 Scan 的操作转换为 Query,速度能极大地提升,且能节省费用。

考虑到 Hive 需要用 date 进行查询,建立 Global Secondary Index 或许是个方法[4]。简单地说,就是建立除了 Primary Key 之外的索引,使得查询这个 GSI 的时候,也能做到 O(1) 的效果。比如,在上述例子中,可以将 date 设置为 GSI. 这样,使用 date 作为查询条件,也可以调用 query 操作,不需要 scan. 可惜,现在的 emr-dynamodb-connector 还不支持 GSI,Hive 还会傻傻地去扫描整个 Table.[5]

为了绕过这个问题,我们可以把 Date 设置成 Primary Key, 把 when 设置成 sort key (因为一个 key 只能有一个 value, 所以要用一个组合 key 来避免覆盖),示例数据:

{
   "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //
  

如此一来,使用 Hive 以 date 作为条件进行查询的时候,就会调用 query,加快速度并节省费用。

hive> INSERT OVERWRITE TABLE  ddb_table_1
     > SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18";
 ----------------------------------------------------------------------------------------------
         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
 ----------------------------------------------------------------------------------------------
 Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
 ----------------------------------------------------------------------------------------------
 VERTICES: 01/01  [==========================>>] 100%  ELAPSED TIME: 10.50 s
 ----------------------------------------------------------------------------------------------

几万条数据,只用了10秒就搞定。

修改 Table 后,应用程序需要相应修改来通过 GSI 提取数据。使用 GSI 需要指定 index-name。例如:

response = table.query(
     IndexName='accessid-index',  ##<---
     KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9") 
)

具体可参考[6].

后续

前面提到,如果要使 Hive 并发进行 Scan,需要手动调大 RCU。长期调大 RCU 可能会产生大量费用。

目前,emr-dynamodb-connector 正在添加对 DynamoDB on-demand capacity 的支持[7]。如果这个功能最后能实现,用户可以把 DynamoDB table 设置成 On-Demand 模式,按实际的读写数量来收费。Hive 在进行 scan 的时候,也会自动地并发。

参考文档

[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html

[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan

[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98

[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html

[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87

[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query

[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86


以上所述就是小编给大家介绍的《如何使 Hive 对 DynamoDB 进行并发读取?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Text Processing in Python

Text Processing in Python

David Mertz / Addison-Wesley Professional / 2003-6-12 / USD 54.99

Text Processing in Python describes techniques for manipulation of text using the Python programming language. At the broadest level, text processing is simply taking textual information and doing som......一起来看看 《Text Processing in Python》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码