「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

栏目: 数据库 · 发布时间: 5年前

内容简介:Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。

Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。

「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

问题:

kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。

Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。

当遇到流量的高峰和低谷的时候,分片的大小不是固定的,当然你可以一直选择一个高峰时分片数,但这个在每天流量低谷的时候,分片就显得比较冗余,也浪费钱。

所以需要根据实时监控流量调整分片数量显得非常必要了,AWS只是提供了能修改分片的API,如何调整,就需要用户自己来调整策略了。

如果流量超过分片的限制,则会直接抛出以下异常: botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000008 in stream

接下来聊一聊如何根据流量弹性调整分片策略。

指标:

先来认识几个指标,Kinesis会向CloudWatch每一分钟发送分片和流的监控指标:

指标 定义
GetRecords.Bytes 在指定时段内从 Kinesis 流检索的字节数。
GetRecords.Records 在指定时段内测量的从分片中检索的记录数。
IncomingBytes 在指定时段内成功放置到 Kinesis 流的字节数。该指标包含来自 PutRecord 和 PutRecords 的字节数。
IncomingRecords 在指定时段内成功放置到 Kinesis 流的记录数。

主要是这四个指标,其中IncomingBytes可以获取Stream和ShardId两个维度的数据,即整个DataStream流的接收的字节数,以及每一个分片所接收的字节数。分片的指标需要额外单独在Kinesis界面开放,默认是不开放的。

同时每一个指标的统计数据维度有:Minimum、Maximum、Average、Sum。

可以简单理解为 IncomingRecords 和 GetRecords.Records 就是选择的间隔时间内流入和流出的记录数。而 GetRecords.Bytes 和 IncomingBytes 分别表示流出和流入的字节数。

下面代码实现如何获取相关指标:

def get_metric(self,
                   metric_name,
                   minutes_period=5,
                   statistics_model="Sum",
                   shard_id=None):
        start_time = datetime.datetime.utcnow() - datetime.timedelta(
            minutes=minutes_period)
        end_time = datetime.datetime.utcnow()
        dimensions = []
        dimensions.append(
            dict(Name="StreamName", Value=api_config.KINESIS_STREAM_NAME))
        if shard_id:
            dimensions.append(dict(Name="ShardId", Value=shard_id))
        result = self.cloudwatch_client.get_metric_statistics(
            Namespace='AWS/Kinesis',
            MetricName=metric_name,
            Dimensions=dimensions,
            StartTime=start_time,
            EndTime=end_time,
            Period=60,
            Statistics=[
                statistics_model
            ])
        max_value = 0
        for item in result.get("Datapoints", []):
            if item[statistics_model] > max_value:
                max_value = item[statistics_model]
        return max_value
复制代码

上面函数的目标就是获取指定指标 最近5分钟内 每一分钟的最大值。比如想获取 IncomingRecords 指标最近5分钟内间隔时间为1分钟的最大记录数。 get_metric("IncomingRecords") 即可。 result 会返回5条记录,拿出最大值即可。

如何获取当前分片数:

流量监控指标获取到之后,接下来如何获取当前分片数了,Kinesis 的 python 包这一点非常尴尬,居然没有可以直接获取当前打开的分片数,为什么是需要打开的分片数了,因为关闭的分片数信息在 Kinesis 包的获取分片的API里也会返回,比如最近一段时间内,如果有操作分片数量,则AWS会关掉或者打开一些分片,关掉的分片不会再接收数据,但里面有的数据依然可以被消费者消费,从而保证数据在修改分片的操作上不会丢失,这样就会导致获取分片列表的时候,关闭的分片依然会返回,返回的数据中也没有状态标注哪个分片是打开或者关闭。

上面聊到的 IncomingBytes 关于分片维度的指标就起作用了,可以通过循环枚举所有分片找出可以接收数据的分片,则说明均是打开的分片。

代码部分如下:

def get_shards_count(self, shards_data=None):
        if not shards_data:
            result = self.kinesis_client.list_shards(
                StreamName=api_config.KINESIS_STREAM_NAME)
            shards_data = result.get("Shards", [])
        shards_count = 0
        for item in shards_data:
            shard_id = item["ShardId"]
            shard_incoming_bytes = self.get_metric(
                "IncomingBytes", shard_id=shard_id)
            if shard_incoming_bytes > 0:
                shards_count += 1
        return shards_count
复制代码

相关API介绍:

AWS提供3个API,可以重新分片:

update_shard_count
split_shard
merge_shards

update_shard_count 最简单,直接调取更新分片数量即可,但此API有一些限制: 查看官网资料:

This operation has the following default limits. By default, you cannot do the following:

1.Scale more than twice per rolling 24-hour period per stream
2.Scale up to more than double your current shard count for a stream
3.Scale down below half your current shard count for a stream
复制代码

主要是这三点,第一点最蛋疼,24小时内只能操作2次API,但这个可以提写工单向AWS客服申请多次。第二点和第三点是说伸缩的分片的时候,不能超过当前分片的2倍或不能低于当前分片的一半。

split_shard 和 merge_shards 属于 Kinesis 的高级主题,这一篇文章暂时不涉及详细讲解,简单提一下场景,主要用于单个分片的流量超过阈值,可以对此分片进行一分为二,能够分当然也能够合,里面也有一些限制,以后再聊这个话题。

弹性伸缩策略:

重点来了,先来了解一下分片的计算方式:

  • 写入流的数据记录的平均大小(以 KB 为单位,向上取整为 1 KB),数据大小 (average_data_size_in_KiB)。

  • 每秒写入流和从流读取的数据记录数 (records_per_second)。

  • 并发且独立使用流中数据的 Kinesis Data Streams 应用程序的数量,即使用者数量 (number_of_consumers)。

  • 以 KB 为单位的传入写入带宽 (incoming_write_bandwidth_in_KiB),等于 average_data_size_in_KiB 乘以 records_per_second。

  • 以 KB 为单位的传出读取带宽 (outgoing_read_bandwidth_in_KiB),等于 incoming_write_bandwidth_in_KiB 乘以 number_of_consumers。

可使用以下公式中的输入值来计算流所需的分片的数量 (number_of_shards):

number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
复制代码

一一来获取上面提到的指标参数:

  • average_data_size_in_KiB 就是写入流数据记录的平均大小,即通过 IncomingBytes ,维度是 Average 即可获取。

  • records_per_second 通过 GetRecords.Records 和 IncomingRecords 一分钟记录之和 除以 60s 即可。

  • number_of_consumers 就是消费者,这个可以通过函数获取,这个我们默认为1.

根据公式 number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048) , 因为我们消费者数目是1 ,那么其实 outgoing_read_bandwidth_in_KiB / 2048 其实就是 ncoming_write_bandwidth_in_KiB / 2048。 显然:

number_of_shards = incoming_write_bandwidth_in_KiB/1024

这是第一种获取当前需要的分片数的方案。

第二种方案,也可以简单粗暴的根据 当前流中的 总字节数 除以 每一个分片所能容纳的1M即可。

公式:

(SUM(IncomingBytes) + SUM(GetRecords.Bytes)) / 1024 / 1024 / Period 其中Period 这里就是 60s。

代码实现部分:

def scaling(self):
        average_incoming_bytes = self.get_metric(
            "IncomingBytes", statistics_model="Average")
        outgoing_records = self.get_metric("GetRecords.Records")
        incoming_records = self.get_metric("IncomingRecords")
        records_per_secord = int(
            (outgoing_records + incoming_records) / self.METRIC_PERIOD)
        number_of_shards_1 = round(
            average_incoming_bytes * records_per_secord / 1024 / 1000, 4)

        sum_incoming_bytes = self.get_metric("IncomingBytes")
        sum_outgoing_bytes = self.get_metric("GetRecords.Bytes")
        sum_bytes = sum_incoming_bytes + sum_outgoing_bytes
        number_of_shards_2 = round(
            sum_bytes / 1024 / 1024 / 60, 4)

        return max(number_of_shards_1, number_of_shards_2)
复制代码

number_of_shards_1 和 number_of_shards_2 就是方案一和方案二的实现。

拿到了当前所需要的分片数,接下来就是根据实际分片数来调整了,代码如下:

def execute(self):
        cur_shards_count = self.get_shards_count()
        number_of_shards = self.scaling()
        need_shards = int(number_of_shards) + 1
        shard_ratio = round(number_of_shards * 100 % 100, 2)
        
        if shard_ratio > int(api_config.TRIGGER_SCALING_UP_SHARD_RATIO):
            need_shards += 1
        elif shard_ratio < int(api_config.TRIGGER_SCALING_DOWN_SHARD_RATIO):
            need_scaling_down = True

        if need_scaling_down and cur_shards_count > need_shards:
            need_shards = cur_shards_count - 1
复制代码

need_shards就是我们需要更新的分片数,调取API update_shard_count 即可。

上面代码描述叫简单,提供2个环境变量,TRIGGER_SCALING_UP_SHARD_RATIO 触发增加分片的比率, TRIGGER_SCALING_DOWN_SHARD_RATIO 降低分片的比率。比如 cur_shards_count = 3, number_of_shards = 2.7, 那么 shard_ratio = 70% 如果设置 TRIGGER_SCALING_UP_SHARD_RATIO = 60%,那么 need_shards = 4,则需要扩展分片为4个,缩小的方案也是如此。

上面描述简单,实际过程中需要控制伸缩的频率,而且AWS也有频率限制,如何在合适的时机伸缩,需要根据业务的实际过程进行分析。同时还要上报更新的日志,存储上一次更新的记录数据等等。

Kinesis 根据流量弹性伸缩分片就分享到这里,欢迎大家在评论区一起讨论更优方案。

更多精彩文章请关注公众号: 『天澄技术杂谈』

「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

实战Java高并发程序设计

实战Java高并发程序设计

葛一鸣、郭超 / 电子工业出版社 / 2015-10-1 / CNY 69.00

在过去单核CPU时代,单任务在一个时间点只能执行单一程序,随着多核CPU的发展,并行程序开发就显得尤为重要。 《实战Java高并发程序设计》主要介绍基于Java的并行程序设计基础、思路、方法和实战。第一,立足于并发程序基础,详细介绍Java中进行并行程序设计的基本方法。第二,进一步详细介绍JDK中对并行程序的强大支持,帮助读者快速、稳健地进行并行程序开发。第三,详细讨论有关“锁”的优化和提高......一起来看看 《实战Java高并发程序设计》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具