内容简介:Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。
Kinesis是AWS上实时流数据处理平台,可以实时收集、处理和分析流中的数据。
问题:
kinesis的相关原理 可以对标 kafka,但是比kafka相对简单,下面聊一聊分片带来的问题。
Kinesis的每一个分片接收数据的大小和记录数都有相关限制:单个分片可以提取多达每秒 1 MiB 的数据 (包括分区键) 或每秒写入 1000 个记录。
当遇到流量的高峰和低谷的时候,分片的大小不是固定的,当然你可以一直选择一个高峰时分片数,但这个在每天流量低谷的时候,分片就显得比较冗余,也浪费钱。
所以需要根据实时监控流量调整分片数量显得非常必要了,AWS只是提供了能修改分片的API,如何调整,就需要用户自己来调整策略了。
如果流量超过分片的限制,则会直接抛出以下异常: botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the PutRecord operation: Rate exceeded for shard shardId-000000000008 in stream
接下来聊一聊如何根据流量弹性调整分片策略。
指标:
先来认识几个指标,Kinesis会向CloudWatch每一分钟发送分片和流的监控指标:
指标 | 定义 |
---|---|
GetRecords.Bytes | 在指定时段内从 Kinesis 流检索的字节数。 |
GetRecords.Records | 在指定时段内测量的从分片中检索的记录数。 |
IncomingBytes | 在指定时段内成功放置到 Kinesis 流的字节数。该指标包含来自 PutRecord 和 PutRecords 的字节数。 |
IncomingRecords | 在指定时段内成功放置到 Kinesis 流的记录数。 |
主要是这四个指标,其中IncomingBytes可以获取Stream和ShardId两个维度的数据,即整个DataStream流的接收的字节数,以及每一个分片所接收的字节数。分片的指标需要额外单独在Kinesis界面开放,默认是不开放的。
同时每一个指标的统计数据维度有:Minimum、Maximum、Average、Sum。
可以简单理解为 IncomingRecords 和 GetRecords.Records 就是选择的间隔时间内流入和流出的记录数。而 GetRecords.Bytes 和 IncomingBytes 分别表示流出和流入的字节数。
下面代码实现如何获取相关指标:
def get_metric(self, metric_name, minutes_period=5, statistics_model="Sum", shard_id=None): start_time = datetime.datetime.utcnow() - datetime.timedelta( minutes=minutes_period) end_time = datetime.datetime.utcnow() dimensions = [] dimensions.append( dict(Name="StreamName", Value=api_config.KINESIS_STREAM_NAME)) if shard_id: dimensions.append(dict(Name="ShardId", Value=shard_id)) result = self.cloudwatch_client.get_metric_statistics( Namespace='AWS/Kinesis', MetricName=metric_name, Dimensions=dimensions, StartTime=start_time, EndTime=end_time, Period=60, Statistics=[ statistics_model ]) max_value = 0 for item in result.get("Datapoints", []): if item[statistics_model] > max_value: max_value = item[statistics_model] return max_value 复制代码
上面函数的目标就是获取指定指标 最近5分钟内 每一分钟的最大值。比如想获取 IncomingRecords 指标最近5分钟内间隔时间为1分钟的最大记录数。 get_metric("IncomingRecords")
即可。 result 会返回5条记录,拿出最大值即可。
如何获取当前分片数:
流量监控指标获取到之后,接下来如何获取当前分片数了,Kinesis 的 python 包这一点非常尴尬,居然没有可以直接获取当前打开的分片数,为什么是需要打开的分片数了,因为关闭的分片数信息在 Kinesis 包的获取分片的API里也会返回,比如最近一段时间内,如果有操作分片数量,则AWS会关掉或者打开一些分片,关掉的分片不会再接收数据,但里面有的数据依然可以被消费者消费,从而保证数据在修改分片的操作上不会丢失,这样就会导致获取分片列表的时候,关闭的分片依然会返回,返回的数据中也没有状态标注哪个分片是打开或者关闭。
上面聊到的 IncomingBytes 关于分片维度的指标就起作用了,可以通过循环枚举所有分片找出可以接收数据的分片,则说明均是打开的分片。
代码部分如下:
def get_shards_count(self, shards_data=None): if not shards_data: result = self.kinesis_client.list_shards( StreamName=api_config.KINESIS_STREAM_NAME) shards_data = result.get("Shards", []) shards_count = 0 for item in shards_data: shard_id = item["ShardId"] shard_incoming_bytes = self.get_metric( "IncomingBytes", shard_id=shard_id) if shard_incoming_bytes > 0: shards_count += 1 return shards_count 复制代码
相关API介绍:
AWS提供3个API,可以重新分片:
update_shard_count split_shard merge_shards
update_shard_count
最简单,直接调取更新分片数量即可,但此API有一些限制: 查看官网资料:
This operation has the following default limits. By default, you cannot do the following: 1.Scale more than twice per rolling 24-hour period per stream 2.Scale up to more than double your current shard count for a stream 3.Scale down below half your current shard count for a stream 复制代码
主要是这三点,第一点最蛋疼,24小时内只能操作2次API,但这个可以提写工单向AWS客服申请多次。第二点和第三点是说伸缩的分片的时候,不能超过当前分片的2倍或不能低于当前分片的一半。
split_shard 和 merge_shards 属于 Kinesis 的高级主题,这一篇文章暂时不涉及详细讲解,简单提一下场景,主要用于单个分片的流量超过阈值,可以对此分片进行一分为二,能够分当然也能够合,里面也有一些限制,以后再聊这个话题。
弹性伸缩策略:
重点来了,先来了解一下分片的计算方式:
-
写入流的数据记录的平均大小(以 KB 为单位,向上取整为 1 KB),数据大小 (average_data_size_in_KiB)。
-
每秒写入流和从流读取的数据记录数 (records_per_second)。
-
并发且独立使用流中数据的 Kinesis Data Streams 应用程序的数量,即使用者数量 (number_of_consumers)。
-
以 KB 为单位的传入写入带宽 (incoming_write_bandwidth_in_KiB),等于 average_data_size_in_KiB 乘以 records_per_second。
-
以 KB 为单位的传出读取带宽 (outgoing_read_bandwidth_in_KiB),等于 incoming_write_bandwidth_in_KiB 乘以 number_of_consumers。
可使用以下公式中的输入值来计算流所需的分片的数量 (number_of_shards):
number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048) 复制代码
一一来获取上面提到的指标参数:
-
average_data_size_in_KiB
就是写入流数据记录的平均大小,即通过 IncomingBytes ,维度是 Average 即可获取。 -
records_per_second
通过 GetRecords.Records 和 IncomingRecords 一分钟记录之和 除以 60s 即可。 -
number_of_consumers
就是消费者,这个可以通过函数获取,这个我们默认为1.
根据公式 number_of_shards = max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048)
, 因为我们消费者数目是1 ,那么其实 outgoing_read_bandwidth_in_KiB / 2048 其实就是 ncoming_write_bandwidth_in_KiB / 2048。 显然:
number_of_shards = incoming_write_bandwidth_in_KiB/1024
。
这是第一种获取当前需要的分片数的方案。
第二种方案,也可以简单粗暴的根据 当前流中的 总字节数 除以 每一个分片所能容纳的1M即可。
公式:
(SUM(IncomingBytes) + SUM(GetRecords.Bytes)) / 1024 / 1024 / Period
其中Period 这里就是 60s。
代码实现部分:
def scaling(self): average_incoming_bytes = self.get_metric( "IncomingBytes", statistics_model="Average") outgoing_records = self.get_metric("GetRecords.Records") incoming_records = self.get_metric("IncomingRecords") records_per_secord = int( (outgoing_records + incoming_records) / self.METRIC_PERIOD) number_of_shards_1 = round( average_incoming_bytes * records_per_secord / 1024 / 1000, 4) sum_incoming_bytes = self.get_metric("IncomingBytes") sum_outgoing_bytes = self.get_metric("GetRecords.Bytes") sum_bytes = sum_incoming_bytes + sum_outgoing_bytes number_of_shards_2 = round( sum_bytes / 1024 / 1024 / 60, 4) return max(number_of_shards_1, number_of_shards_2) 复制代码
number_of_shards_1 和 number_of_shards_2 就是方案一和方案二的实现。
拿到了当前所需要的分片数,接下来就是根据实际分片数来调整了,代码如下:
def execute(self): cur_shards_count = self.get_shards_count() number_of_shards = self.scaling() need_shards = int(number_of_shards) + 1 shard_ratio = round(number_of_shards * 100 % 100, 2) if shard_ratio > int(api_config.TRIGGER_SCALING_UP_SHARD_RATIO): need_shards += 1 elif shard_ratio < int(api_config.TRIGGER_SCALING_DOWN_SHARD_RATIO): need_scaling_down = True if need_scaling_down and cur_shards_count > need_shards: need_shards = cur_shards_count - 1 复制代码
need_shards就是我们需要更新的分片数,调取API update_shard_count
即可。
上面代码描述叫简单,提供2个环境变量,TRIGGER_SCALING_UP_SHARD_RATIO 触发增加分片的比率, TRIGGER_SCALING_DOWN_SHARD_RATIO 降低分片的比率。比如 cur_shards_count = 3, number_of_shards = 2.7, 那么 shard_ratio = 70% 如果设置 TRIGGER_SCALING_UP_SHARD_RATIO = 60%,那么 need_shards = 4,则需要扩展分片为4个,缩小的方案也是如此。
上面描述简单,实际过程中需要控制伸缩的频率,而且AWS也有频率限制,如何在合适的时机伸缩,需要根据业务的实际过程进行分析。同时还要上报更新的日志,存储上一次更新的记录数据等等。
Kinesis 根据流量弹性伸缩分片就分享到这里,欢迎大家在评论区一起讨论更优方案。
更多精彩文章请关注公众号: 『天澄技术杂谈』
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 微服务实例自动弹性伸缩实践
- Jenkins + DockerSwarm 实现弹性伸缩持续集成
- 技术天地:FreeWheel 实时数据系统弹性伸缩实践
- 美团弹性伸缩系统的技术演进与落地实践
- Docker(三):利用Kubernetes实现容器的弹性伸缩
- [译] 使用自定义指标进行 K8S 自动弹性伸缩
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
实战Java高并发程序设计
葛一鸣、郭超 / 电子工业出版社 / 2015-10-1 / CNY 69.00
在过去单核CPU时代,单任务在一个时间点只能执行单一程序,随着多核CPU的发展,并行程序开发就显得尤为重要。 《实战Java高并发程序设计》主要介绍基于Java的并行程序设计基础、思路、方法和实战。第一,立足于并发程序基础,详细介绍Java中进行并行程序设计的基本方法。第二,进一步详细介绍JDK中对并行程序的强大支持,帮助读者快速、稳健地进行并行程序开发。第三,详细讨论有关“锁”的优化和提高......一起来看看 《实战Java高并发程序设计》 这本书的介绍吧!