内容简介:在之前筆者的這篇文章中:在們學習了如何使用AWS 的相關工具來建立一個用戶行為的LOG 分析系統。
在之前筆者的這篇文章中:
一個基於 AWS Elasticsearch 的用戶行為 log 系統建立
在們學習了如何使用AWS 的相關 工具 來建立一個用戶行為的LOG 分析系統。
但是這篇文章中所提到的架構有個問題。
這個版本有什麼問題呢?
那就是在某些情況下它會一直噴以下錯誤 :
ServiceUnavailableException: Slow down.
那為什麼會一直噴 Slow down 呢 ?
會發生這個的原因在於,我們有採到 aws firehose 的限制,如下:
Amazon Kinesis Data Firehose 有以下限制。
如果将 Direct PUT 配置为数据源,每个 Kinesis Data Firehose 传输流 都会受以下限制的约束: * 对于 美国东部(弗吉尼亚北部)、美国西部(俄勒冈) 和 欧洲(爱尔兰):5,000 条记录/秒;2,000 个事务/秒;5 MB/秒。 * 对于 欧洲 (巴黎)、亚太地区(孟买)、美国东部(俄亥俄州)、欧洲(法兰克福)、南美洲(圣保罗)、亚太区域(首尔)、欧洲 (伦敦)、亚太区域(东京)、美国西部(加利福尼亚北部)、亚太区域(新加坡)、亚太区域(悉尼) 和 加拿大 (中部):1000 条记录/秒;1000 个事务/秒;1 MB/秒。 ! 注意 当 Kinesis Data Streams 配置为数据源时,此限制不适用,Kinesis Data Firehose 可无限扩展和缩小。
加強版
原本的版本如下圖。
然後我們會將它修改成如下圖,就是在資料源與 firehose 之間多增加了 data stream。
使用 AWS data stream 有以下幾個好處 :
- 可以自由的調整傳輸限制。(這樣就可以解決上述的問題)
- 未來如果有其它單位想要接受這個資料源,那只要請對方接上這個 data stream,它就可以受到資料了。
AWS Kinesis Data Stream 申請
事實上就只有兩個東西要填寫 Stream Name
與 Shard Number
。
其中這裡簡單的說一下 Shard 概念。
Stream Shard (碎片)
在 AWS kinesis data stream 中有個 shard 的概念,它就是指 stream 的子集合。
每條 stream 都是由 1 至 n 個 shard 所組合成,這樣有幾個好處 :
- 在傳輸資料給 stream 時,可以將傳輸量平均的分散給不同 shard,這樣可以避免觸碰到每個 shard 的傳輸限制。
- 你可以指定那一些類型的資料傳輸到 A Shard,那些類型的資料傳輸到 B Shard,這樣有助於你放便管理資料流。
Shard 的限制
上面有提到每個 stream 都有傳輸限制,這裡我們就來看一下它的限制有那些。
以下從 Aws 官網 擷取 :
- 單一碎片每秒可擷取多達 1 MiB 的資料 (包括分割區索引鍵) 或每秒寫入 1,000 筆記錄。同樣地,如果您將串流擴展到 5,000 個碎片,串流每秒即可擷取多達 5 GiB 或每秒 500 萬筆記錄。若您需要更多的擷取容量,可以使用 AWS Management Console 或 UpdateShardCount API 輕鬆擴展串流中的碎片數目。
- GetRecords 每次呼叫可從單一碎片擷取最多 10 MiB 的資料,每次呼叫最多 10,000 筆記錄。每呼叫一次 GetRecords 即計為一筆讀取交易。
- 每個碎片每秒可支援最多 5 筆讀取交易。每筆讀取交易可提供多達 10,000 筆記錄,每筆交易的上限為 10 MiB。
- 每個碎片透過 GetRecords 每秒可支援最多 2 MiB 的總資料讀取速率。如果呼叫 GetRecords 傳回 10 MiB,在接下來的 5 秒內發出的後續呼叫將擲回例外狀況。
如何將資料傳輸到指定的 Shard
下面為一段 nodejs 寫入資料到 stream 的範例碼,其中注意到 PartitionKey
這個東東,它就是可以幫助你指定到想要的 Shard。
'use strict'; const AWS = require('aws-sdk'); const streamName = process.env['AWS_KINESIS_STREAM']; const uuidv1 = require('uuid/v1'); const kinesis = new AWS.Kinesis({region: process.env['AWS_KINESIS_REGION']}); module.exports = { putRecord: (packet) => { return new Promise((resolve, reject) => { // 多加換行符號是因為這樣才能在 aws athena 進行解析 const recordParams = { Data: JSON.stringify(packet) + '\n', StreamName: streamName, PartitionKey: uuidv1() }; kinesis.putRecord(recordParams, (err) => { if (err) { reject(err); } resolve(); }); }); } };
PartitionKey
基本上就是用來讓 AWS kinesis 來決定你要去那一個 Shard。
假設你的文件 A 傳輸時 PartitionKey 設為 GroupA 這個文字,那它就會跑到某個 Shard A 去,如果這時再傳輸個文件 B 並且 PartitionKey 也設定為 GroupA,那這一份文件也會傳輸到 Shard A。
所以當你想將同一類型的文件,都傳輸到同一個 Shard 時,記得將 PartitionKey 設為相同。
但如果是想將它平均分散到每一個 Shard 呢 ?
事實上有兩個方法,首先第一種方法就是每一丟資料時,先去抓這個 stream 看它有幾個 shards,然後再根據它的數量,來隨機產生個數字,例如有 4 個 shards 那你每次丟資料時,就從 1 ~ 4 隨機產生一個數字,然後再將它設到 PartitionKey 中,那這樣基本上就會平均分配。
而另一種方法就是每一次的 PartitionKey 都使用 uid 來設定,這樣也可以將他平均的進行分配。
不過我是比較建議用第二種,因為第一種每一次都要去 AWS 那抓取 stream 裡的 shards 大小,這樣太耗時間了。
參考資料
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。