「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

栏目: 数据库 · 发布时间: 6年前

内容简介:Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。

Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。

「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

问题:

kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。

Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。

当遇到流量的高峰和低谷的时候,分片的大小不是固定的,当然你可以一直选择一个高峰时分片数,但这个在每天流量低谷的时候,分片就显得比较冗余,也浪费钱。

所以需要根据实时监控流量调整分片数量显得非常必要了,AWS只是提供了能修改分片的API,如何调整,就需要用户自己来调整策略了。

如果流量超过分片的限制,则会直接抛出以下异常: botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000008 in stream

接下来聊一聊如何根据流量弹性调整分片策略。

指标:

先来认识几个指标,Kinesis会向CloudWatch每一分钟发送分片和流的监控指标:

指标 定义
GetRecords.Bytes 在指定时段内从 Kinesis 流检索的字节数。
GetRecords.Records 在指定时段内测量的从分片中检索的记录数。
IncomingBytes 在指定时段内成功放置到 Kinesis 流的字节数。该指标包含来自 PutRecord 和 PutRecords 的字节数。
IncomingRecords 在指定时段内成功放置到 Kinesis 流的记录数。

主要是这四个指标,其中IncomingBytes可以获取Stream和ShardId两个维度的数据,即整个DataStream流的接收的字节数,以及每一个分片所接收的字节数。分片的指标需要额外单独在Kinesis界面开放,默认是不开放的。

同时每一个指标的统计数据维度有:Minimum、Maximum、Average、Sum。

可以简单理解为 IncomingRecords 和 GetRecords.Records 就是选择的间隔时间内流入和流出的记录数。而 GetRecords.Bytes 和 IncomingBytes 分别表示流出和流入的字节数。

下面代码实现如何获取相关指标:

def get_metric(self,
                   metric_name,
                   minutes_period=5,
                   statistics_model="Sum",
                   shard_id=None):
        start_time = datetime.datetime.utcnow() - datetime.timedelta(
            minutes=minutes_period)
        end_time = datetime.datetime.utcnow()
        dimensions = []
        dimensions.append(
            dict(Name="StreamName", Value=api_config.KINESIS_STREAM_NAME))
        if shard_id:
            dimensions.append(dict(Name="ShardId", Value=shard_id))
        result = self.cloudwatch_client.get_metric_statistics(
            Namespace='AWS/Kinesis',
            MetricName=metric_name,
            Dimensions=dimensions,
            StartTime=start_time,
            EndTime=end_time,
            Period=60,
            Statistics=[
                statistics_model
            ])
        max_value = 0
        for item in result.get("Datapoints", []):
            if item[statistics_model] > max_value:
                max_value = item[statistics_model]
        return max_value
复制代码

上面函数的目标就是获取指定指标 最近5分钟内 每一分钟的最大值。比如想获取 IncomingRecords 指标最近5分钟内间隔时间为1分钟的最大记录数。 get_metric("IncomingRecords") 即可。 result 会返回5条记录,拿出最大值即可。

如何获取当前分片数:

流量监控指标获取到之后,接下来如何获取当前分片数了,Kinesis 的 python 包这一点非常尴尬,居然没有可以直接获取当前打开的分片数,为什么是需要打开的分片数了,因为关闭的分片数信息在 Kinesis 包的获取分片的API里也会返回,比如最近一段时间内,如果有操作分片数量,则AWS会关掉或者打开一些分片,关掉的分片不会再接收数据,但里面有的数据依然可以被消费者消费,从而保证数据在修改分片的操作上不会丢失,这样就会导致获取分片列表的时候,关闭的分片依然会返回,返回的数据中也没有状态标注哪个分片是打开或者关闭。

上面聊到的 IncomingBytes 关于分片维度的指标就起作用了,可以通过循环枚举所有分片找出可以接收数据的分片,则说明均是打开的分片。

代码部分如下:

def get_shards_count(self, shards_data=None):
        if not shards_data:
            result = self.kinesis_client.list_shards(
                StreamName=api_config.KINESIS_STREAM_NAME)
            shards_data = result.get("Shards", [])
        shards_count = 0
        for item in shards_data:
            shard_id = item["ShardId"]
            shard_incoming_bytes = self.get_metric(
                "IncomingBytes", shard_id=shard_id)
            if shard_incoming_bytes > 0:
                shards_count += 1
        return shards_count
复制代码

相关API介绍:

AWS提供3个API,可以重新分片:

update_shard_count
split_shard
merge_shards

update_shard_count 最简单,直接调取更新分片数量即可,但此API有一些限制: 查看官网资料:

This operation has the following default limits. By default, you cannot do the following:

1.Scale more than twice per rolling 24-hour period per stream
2.Scale up to more than double your current shard count for a stream
3.Scale down below half your current shard count for a stream
复制代码

主要是这三点,第一点最蛋疼,24小时内只能操作2次API,但这个可以提写工单向AWS客服申请多次。第二点和第三点是说伸缩的分片的时候,不能超过当前分片的2倍或不能低于当前分片的一半。

split_shard 和 merge_shards 属于 Kinesis 的高级主题,这一篇文章暂时不涉及详细讲解,简单提一下场景,主要用于单个分片的流量超过阈值,可以对此分片进行一分为二,能够分当然也能够合,里面也有一些限制,以后再聊这个话题。

弹性伸缩策略:

重点来了,先来了解一下分片的计算方式:

  • 写入流的数据记录的平均大小(以 KB 为单位,向上取整为 1 KB),数据大小 (average_data_size_in_KiB)。

  • 每秒写入流和从流读取的数据记录数 (records_per_second)。

  • 并发且独立使用流中数据的 Kinesis Data Streams 应用程序的数量,即使用者数量 (number_of_consumers)。

  • 以 KB 为单位的传入写入带宽 (incoming_write_bandwidth_in_KiB),等于 average_data_size_in_KiB 乘以 records_per_second。

  • 以 KB 为单位的传出读取带宽 (outgoing_read_bandwidth_in_KiB),等于 incoming_write_bandwidth_in_KiB 乘以 number_of_consumers。

可使用以下公式中的输入值来计算流所需的分片的数量 (number_of_shards):

number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
复制代码

一一来获取上面提到的指标参数:

  • average_data_size_in_KiB 就是写入流数据记录的平均大小,即通过 IncomingBytes ,维度是 Average 即可获取。

  • records_per_second 通过 GetRecords.Records 和 IncomingRecords 一分钟记录之和 除以 60s 即可。

  • number_of_consumers 就是消费者,这个可以通过函数获取,这个我们默认为1.

根据公式 number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048) , 因为我们消费者数目是1 ,那么其实 outgoing_read_bandwidth_in_KiB / 2048 其实就是 ncoming_write_bandwidth_in_KiB / 2048。 显然:

number_of_shards = incoming_write_bandwidth_in_KiB/1024

这是第一种获取当前需要的分片数的方案。

第二种方案,也可以简单粗暴的根据 当前流中的 总字节数 除以 每一个分片所能容纳的1M即可。

公式:

(SUM(IncomingBytes) + SUM(GetRecords.Bytes)) / 1024 / 1024 / Period 其中Period 这里就是 60s。

代码实现部分:

def scaling(self):
        average_incoming_bytes = self.get_metric(
            "IncomingBytes", statistics_model="Average")
        outgoing_records = self.get_metric("GetRecords.Records")
        incoming_records = self.get_metric("IncomingRecords")
        records_per_secord = int(
            (outgoing_records + incoming_records) / self.METRIC_PERIOD)
        number_of_shards_1 = round(
            average_incoming_bytes * records_per_secord / 1024 / 1000, 4)

        sum_incoming_bytes = self.get_metric("IncomingBytes")
        sum_outgoing_bytes = self.get_metric("GetRecords.Bytes")
        sum_bytes = sum_incoming_bytes + sum_outgoing_bytes
        number_of_shards_2 = round(
            sum_bytes / 1024 / 1024 / 60, 4)

        return max(number_of_shards_1, number_of_shards_2)
复制代码

number_of_shards_1 和 number_of_shards_2 就是方案一和方案二的实现。

拿到了当前所需要的分片数,接下来就是根据实际分片数来调整了,代码如下:

def execute(self):
        cur_shards_count = self.get_shards_count()
        number_of_shards = self.scaling()
        need_shards = int(number_of_shards) + 1
        shard_ratio = round(number_of_shards * 100 % 100, 2)
        
        if shard_ratio > int(api_config.TRIGGER_SCALING_UP_SHARD_RATIO):
            need_shards += 1
        elif shard_ratio < int(api_config.TRIGGER_SCALING_DOWN_SHARD_RATIO):
            need_scaling_down = True

        if need_scaling_down and cur_shards_count > need_shards:
            need_shards = cur_shards_count - 1
复制代码

need_shards就是我们需要更新的分片数,调取API update_shard_count 即可。

上面代码描述叫简单,提供2个环境变量,TRIGGER_SCALING_UP_SHARD_RATIO 触发增加分片的比率, TRIGGER_SCALING_DOWN_SHARD_RATIO 降低分片的比率。比如 cur_shards_count = 3, number_of_shards = 2.7, 那么 shard_ratio = 70% 如果设置 TRIGGER_SCALING_UP_SHARD_RATIO = 60%,那么 need_shards = 4,则需要扩展分片为4个,缩小的方案也是如此。

上面描述简单,实际过程中需要控制伸缩的频率,而且AWS也有频率限制,如何在合适的时机伸缩,需要根据业务的实际过程进行分析。同时还要上报更新的日志,存储上一次更新的记录数据等等。

Kinesis 根据流量弹性伸缩分片就分享到这里,欢迎大家在评论区一起讨论更优方案。

更多精彩文章请关注公众号: 『天澄技术杂谈』

「解决方案」Kinesis 分片如何根据流量进行弹性伸缩

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Clean Architecture

Clean Architecture

Robert C. Martin / Prentice Hall / 2017-9-20 / USD 34.99

Practical Software Architecture Solutions from the Legendary Robert C. Martin (“Uncle Bob”) By applying universal rules of software architecture, you can dramatically improve developer producti......一起来看看 《Clean Architecture》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码