内容简介:DynamoDB 创建了类似如下的表:这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].
问题背景
DynamoDB 创建了类似如下的表:
{ "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key. "date": "2018-12-12", "when": "2018-12-12 17:22:15", "context": "something" }
这个表用来记录访问记录,以 accessid (随机字符串) 作为 Primary key. Date 记录当条记录的生成日期。
我们希望通过 Hive 每天将前一天的数据从 DynamoDB 备份到 S3 上,具体方法可参考文档[1].
hive> INSERT OVERWRITE TABLE s3_table
Dynamodb表里存放了很多天的数据,执行以上操作会消耗很长时间,如何能加快速度?比如让 Hive 进行并发读取?
测试使用的 AWS EMR 版本为 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.
$ sudo rpm -qa | grep emr-ddb emr-ddb-hadoop-4.7.0-1.amzn1.noarch emr-ddb-hive-4.7.0-1.amzn1.noarch emr-ddb-4.7.0-1.amzn1.noarch
DynamoDB 的 Query 和 Scan 操作
DynamoDB 是个 NoSQL 数据库。在 Hive 操作中,如果 WHERE 后面的条件是 Primary Key,比如:
hive > SELECT * FROM ddb_table WHERE accessid="c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9";
这时,Hive 会进行 Query 操作,能直接拿到结果。可以理解为 O(1) 的操作。
而如果 WHERE 后面的条件只是个普通的 Attribute, 那 Hive 需要调用 Scan 操作,扫描整个表的条目,过滤出符合条件的条件。可以理解为 O(n) 的操作。比如:
hive> INSERT OVERWRITE TABLE s3_table
Hive 需要读取整个表,对比每个条目的 date 是否为"2018-12-12". 因此,这个操作需要读取大量数据,消耗大量 DynamoDB 的 RCU.
Scan 操作是可以分 segment 的[2]。在这种情况下,如果 Hive 能在不同节点同时读取不同的 segment,这样可以加快读取速度。
Query 操作能直接读取 Key 对应的 Value, 没有并行的需要。
Hive 对 DynamoDB 的并发读取
按照上面说法,如果能让 Hive 启动多个 container 扫描 dynamodb table 的不同 segment, 可以起到并发加速的效果。
Hive 查询 DynamoDB 的动作实际上由 emr-dynamodb-connector 来实现。它会根据 DynamoDB 当前的 RCU 设定,来决定分多少个 segment 同时读取。一般来说,每 200 RCU 会启动一个 container。所以,如果我们把 DynamoDB 的 RCU 设置为 6000 (在 DynamoDB 侧设定), Hive 会同时启动 30 个 container 来读取数据。
hive> INSERT OVERWRITE TABLE s3_table > SELECT * FROM ddb_table; ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 1 .......... container SUCCEEDED 30 30 0 0 0 0 ----------------------------------------------------------------------------------------------
具体逻辑可参考 emr-dynamodb-connector 相关代码[3].
使用合适的 DynamoDB Key 来用 Query 代替 Scan
尽管并发可以提高速度,但是当 DynamoDB table 增长后,scan 的耗时将会越来越长,而且加大 RCU 会产生不少的费用。
如果可以优化 DynamoDB Table 的设计,将 Scan 的操作转换为 Query,速度能极大地提升,且能节省费用。
考虑到 Hive 需要用 date 进行查询,建立 Global Secondary Index 或许是个方法[4]。简单地说,就是建立除了 Primary Key 之外的索引,使得查询这个 GSI 的时候,也能做到 O(1) 的效果。比如,在上述例子中,可以将 date 设置为 GSI. 这样,使用 date 作为查询条件,也可以调用 query 操作,不需要 scan. 可惜,现在的 emr-dynamodb-connector 还不支持 GSI,Hive 还会傻傻地去扫描整个 Table.[5]
为了绕过这个问题,我们可以把 Date 设置成 Primary Key, 把 when 设置成 sort key (因为一个 key 只能有一个 value, 所以要用一个组合 key 来避免覆盖),示例数据:
{ "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //如此一来,使用 Hive 以 date 作为条件进行查询的时候,就会调用 query,加快速度并节省费用。
hive> INSERT OVERWRITE TABLE ddb_table_1 > SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18"; ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 1 .......... container SUCCEEDED 1 1 0 0 0 0 ---------------------------------------------------------------------------------------------- VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 10.50 s ----------------------------------------------------------------------------------------------几万条数据,只用了10秒就搞定。
修改 Table 后,应用程序需要相应修改来通过 GSI 提取数据。使用 GSI 需要指定 index-name。例如:
response = table.query( IndexName='accessid-index', ##<--- KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9") )具体可参考[6].
后续
前面提到,如果要使 Hive 并发进行 Scan,需要手动调大 RCU。长期调大 RCU 可能会产生大量费用。
目前,emr-dynamodb-connector 正在添加对 DynamoDB on-demand capacity 的支持[7]。如果这个功能最后能实现,用户可以把 DynamoDB table 设置成 On-Demand 模式,按实际的读写数量来收费。Hive 在进行 scan 的时候,也会自动地并发。
参考文档
[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98
[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html
[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87
[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86
以上所述就是小编给大家介绍的《如何使 Hive 对 DynamoDB 进行并发读取?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。