内容简介:by LittleHann & cdxy from team 阿里云安全-能力建设团队.Presentation:方案特点:
by LittleHann & cdxy from team 阿里云安全-能力建设团队.
Presentation: PDF
id | task | description | score | rank |
---|---|---|---|---|
1 | DNS Malicious Traffic Identification | 100.0 | top 1 | |
2 | DGA Domain Identification and Family Clustering | 98.83 | top 1 |
Q1 DNS恶意流量检测
- 解题思路:结合专家经验在多个维度做统计特征,滤出超越统计基线3sigma的异常行为,人工检验异常数据确认攻击,然后编写规则滤出该类攻击全部数据包。
方案特点:
- 使用云环境大数据分析组件,高效完成题目。
- 使用异常检测方法,所使用的特征空间能够对数据集做完全线性二分类,达到100% precision和recall。
1.1 数据结构化处理
tshark -r q1_final.pcap -T ek > output_ek.json
import json path = 'output_ek.json' output = 'output_ek_index.json' with open(path) as f: with open(output, 'w') as w: index = 1 sep = '$$$$$' while True: line = f.readline() if not line: break if 'timestamp' in line: out_line = str(index) + sep + line.strip() + '\n' w.write(out_line) if index % 10000 == 0: print index index += 1 print 'total:', index
- 数据上传到阿里云大数据分析服务 MaxCompute 做包解析和结构化分析。
- 解析后的数据阿里云机器学习平台 PAI 做算法分析和可视化。
1.2 解题策略
通过对数据的初步人工浏览和简单可视化分析发现:
- 数据经过脱敏,因此部分字符分布、语义、信息熵等特征会受到影响。
- 时间区间很短,因此并不适合用"对历史行为建模以检测未知"的思路来做。
- 数据完整,不存在缺失值填充的问题。
- 题中说明存在五种攻击方式,且提交的是DNS query的packet id,表明出题人自信已经100%吃透了这1kw数据包。因此本次五种攻击模式不会太复杂,每种攻击流量都是"干净"的(可以用规则搞定答案全集,不存在模棱两可、特征模糊、人工难辨的情况),猜测攻击包有可能是出题人自己造的。
- eth层、frame层、UDP层、TCP层的特征高度统一,出题人没有留下漏洞,因此重点分析DNS层即可。
据此,我的解题策略为:
- 原始日志->特征工程->异常检测->人工验证(得到部分答案)->pattern提取->规则匹配->全部答案。
1.3 特征工程
接下来开始思考本题的特征维度。根据我的安全经验,将DNS攻击分为三种建模:
- 密集请求型:例如随机子域名DDoS、反射型DDoS。其特征为QPS高、时序特征强,一般能够可视化观察到波峰。
- 漏洞攻击型:例如针对DNS server的已知漏洞攻击。其特征为数量少、受DNS type影响,适合分类统计。如果批量PoC的话,则特征同1。
- 数据传输型:例如DNS Tunnel、Malware DGA、PoC中的DNS回显、SSRF重绑定等。其特征在于域名文本特征明显、适用于规则匹配。
将DNS日志的Request和Response join到一起,然后做统计特征和文本特征:
- DNS请求时序分布
- QPS min/max/avg
- QPS均值
- QPS波动性
- 连接成功率
- DNS响应率
- TCP报文占比
- 请求响应比
- 单域名平均访问次数
- 单目标高频访问
- 多级子域名变化率
- DNS type时序分布
- DNS type源IP分布
- 长随机域名
- DNS Tunnel特征
- 部分DNS RCE
- 心跳包
代码示例:SRC_IP维度的部分统计特征
select src_ip, max(index_rsp_cnt) as max_index_rsp_cnt, avg(index_rsp_cnt) as avg_index_rsp_cnt, stddev(index_rsp_cnt) as stddev_index_rsp_cnt, count(distinct index) as all_req_cnt, count(distinct text_dns_qry_name) as domain_req_cnt, count(distinct timestamp_sec) as alive_seconds, abs(max(timestamp_sec)-min(timestamp_sec)) as alive_period, cast(count(distinct timestamp_sec) as double)/cast(abs(max(timestamp_sec)-min(timestamp_sec)+1) as double) as alive_density, count(distinct text_dns_qry_name) as uniq_domain_cnt, count(distinct text_dns_qry_type) as uniq_qr_type_cnt, case when count(index_rqs_success) > 0 then sum(index_rqs_success)/cast(count(index_rqs_success) as double) else 0 end as rqs_success_rate, case when count(index_resp_success) > 0 then sum(index_resp_success)/cast(count(index_resp_success) as double) else 0 end as resp_success_rate, max(dns_dns_count_queries) as max_dns_count_queries, avg(dns_dns_count_queries) as avg_dns_count_queries, stddev(dns_dns_count_queries) as stddev_dns_count_queries from ( select *, unix_timestamp(_time) timestamp_sec, trim(json_extractor(layer_dns,'text_dns_qry_type')) as text_dns_qry_type, trim(json_extractor(layer_dns,'dns_dns_count_queries')) as dns_dns_count_queries, count(distinct r_index) over (partition by index) as index_rsp_cnt, case when r_index is not null then '1' else '0' end as index_rqs_success, case when trim(json_extractor(r_layer_dns,'dns_flags_dns_flags_rcode')) = '3' then '0' else '1' end as index_resp_success from ${t1} where layer_dns <> '' -- 去除TCP握手包 ) _ group by src_ip
1.4 异常检测
- 将以上统计特征通过全量数据建立基线,然后在每个特征维度滤出超越3sigma的异常值。
以下是针对时频异常的基线(stddev)和过滤示例代码:
-- 分母拉长到全量时间线 select /*+mapjoin(a)*/ _time, src_ip from ( select _time from ${t1} ) a join ( select src_ip from ${t2} ) b on 1=1 -- stddev计算 select *, sqrt(pow_sum/16273.0) as stddev_qps from ( select *, sum(pow_qps) over (PARTITION by src_ip) as pow_sum from ( select a.*, b.avg_qps, pow(abs(a.normalized_qry_cnt-b.avg_qps),2) as pow_qps from ( select * from ${t1} ) a join ( select * from ${t2} ) b on a.src_ip = b.src_ip ) _ ) __ -- 3sigma过滤 select * from ${t1} where qry_qps_stddev > 0.08776535453791778*3 order by qry_qps_stddev desc limit 999
- 做题过程中,每一轮在确认了每种攻击流量之后,将其从全量流量中去除并重新计算baseline和异常。
1.5 人工验证及过滤
将以上异常检测滤出的IP按照异常维度数量排序,依次人工确认是否为攻击行为,然后通过规则滤出存在攻击的数据包。
xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",5" | wc -l 72 xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",4" | wc -l 33200 xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",3" | wc -l 5055 xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",2" | wc -l 5292 xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",1" | wc -l 34184
这里前四种攻击(子域名爆破、域传送、非法域更新、反射DDoS)人工识别之后提交答案获得80分,最后一种攻击没有找到,此时还剩3次check机会,于是 排序 出三种最明显的数据提交进行fuzz,碰撞出最后一种答案。
1.6 总结
从结果来看,本题最高效的特征如下:
- DNS type。
- src_ip维度的统计分析特征(QPS、域名数量、请求响应数),因为出题人将src_ip的行为做的非常干净,找到了IP就找到了攻击。
分析方法只用了3sigma异常基线一种,人工排序观察Top的异常结果,确认攻击后写规则捞出全部同类攻击。
本题由于没有给标注数据,更考验选手的安全知识,异常流量摆在面前要能看出是攻击才行。可以把单个攻击源的行为多样化,加入一些正常行为,同时把某种攻击拆成多源、多次,以加大解题难度。
Q2 DGA域名检测与家族聚类
- 解题思路:首先通过专家经验做强关联社区发现洗出一部分DGA域名,以此为正样本训练二分类模型识别DGA域名,然后对结果分别进行社区发现、社区聚合、标签传播扩展与降噪,最终得到结果。
方案特点:
- 使用改进的强社区发现方法和专家知识,将无监督问题转化为有监督问题。
- 从种子样本到二分类、社区发现、降噪,最终结果回流到样本集,不断递归收敛最终获取准确结果。
- 通过DGA公开家族特征、whois特征、http响应特征等扩展数据增强结果。
解题策略
-
解题前没有label数据,属于零先验知识数据挖掘,因此不能直接设计单个model进行multiclass多分类一步得到所有DGA家族分类。
-
针对这种场景,我们设计了一个 种子样本生成-二分类-社区发现-降噪与传播-更新种子样本 的循环工作流,在每次迭代过程中的"降噪与传播"部分引入专家经验,在每次迭代中提高精度和召回,最终逼近正确答案。
- 上述过程可以基于阿里云的MaxCompute大数据组件进行pipeline编程,自动串接成一个循环工作流,得到dga家族聚类结果。
DGA域名发现(二分类)
-
本题中,并没有全网视角的DNS-IP网状信息,因此不适合用deep-walk、graph embedding等方式进行关联聚类。出题人提供的是一个pcap压缩包,内部数据是一个gateway/网络流量旁路镜像设备在一段时间内的抓包。
-
整个时序上的特征比较弱,不适合进行时序和深度网络建模,因此,特征工程向量化的思路还是进行expert feature engineering
-
DGA中的检测对象是text_dns_qry_name,因此我们以text_dns_qry_name为group by聚类主体进行特征向量化,后续的二分类建模(binary classification)和无监督聚类(unsupervised cluster)在此基础上进行。
特征维度
- 稀有TLD
- DGA家族互斥假设
- WHOIS特征
- 域名访问频次
- 域名长度统计
- 源IP计数
- 信息熵
- 域名可读性
- Markov概率
- N-Gram平均排名
对域名可读性的解释:
http://fryjntzfvti.biz
特征选择
为了提高模型训练效率,需要进行feature selection。我们在截图过程中尝试了PCA、基尼指数/熵增益评估、XBboost特征重要性评估等方法。
从结果上看,前40%的特征贡献了95%以上的分类熵增益,特征裁剪后,可以提高20%计算效率,但本题数据集很小,提高时间有限。
下列为部分排序结果如下:
colname feature_importance dns_count_labels_avg 7.098254584511219e-7 dns_resp_name_len 0.48685101592518043 dns_flags_rcode_min 0.09380669054958125 dns_qry_name_len 0.08286261820522951 dns_qry_name_consecutive_consonant 0.05165039925351584 dns_qry_name_count_digits 0.046638077601526357 dns_resp_type_min 0.03279234892125366 dns_qry_name_markov_p 0.03226292786968624 dns_flags_rcode_avg 0.03052527051736504 dns_flags_recavail_min 0.028058550159497347 dns_qry_name_entropy 0.02484754211205459 dns_flags_recavail_avg 0.01629423237968943 dns_count_auth_rr_min 0.015600932601233563 dns_qry_name_digits_rate 0.009893560714853046 src_ip_cn 0.008147846032435051 text_dns_qry_name_ngram_freq_rank 0.007359642440535176 dns_qry_name_vowels_rate 0.004839688721567736 dns_qry_name_count_repeat_letter 0.004262471145899436 dns_resp_type_max 0.0038532152488795776 dns_resp_type_avg 0.0023972406346633437 dns_qry_name_count_vowels 0.001883909214152896 ngram_op_rate_avg 0.0017827386086246898 dns_flags_rcode_max 0.0016479222857123728 ...
排名top头部头部的特征也符合我们的专家经验。也可以理解为基于专家经验抽取的特征,从PAC可学习理论的角度来说,已经相当于代入了大量的先验知识,先验知识相当于一种约束,缩小了待搜索的参数空间。这导致模型拟合能力的提升,但是牺牲了泛化能力。
在比赛的前中期我们通过该方法拿到90%左右的recall预估,但是再往上提高就十分困难,即发生了over-fitting,我们在比赛的中后期开始调整为肉鸡IP在短时间窗口的行为视角,在每轮的迭代中加入肉鸡ip在timewindow中的cluster聚类结果,最终将部分家族的dga recall优化到100%。
XGBoost模型训练
模型训练参数:
-DmaxLeafCount="32" -DsampleRatio="0.7" -DfeatureRatio="0.7" -DtreeCount="800" -DminLeafSampleCount="500" -Dshrinkage="0.02"
二分类结果
初步打标出1.4w DGA域名,距离目标12443左右的数据只有2k+的差距,为下一步做社区发现提供了比较好的条件。
域名社区发现
-
域名和肉鸡组成的2元关系,可以抽象为一个社交网络,社交网络中的节点就是domain域名,社交节点之间的联结就是是否以及拥有多少共同的肉鸡。
-
需要注意的是,同一个dga家族的域名会被同一批src ip访问,可能因为malware运行时间的关系,每个域名的count(src_ip)不一定完全相同,但是不会差很多,因此适合用pylouvain进行社区发现。在进行pylouvain之前,要做的一件很重要的事情是:降噪!尽量将非dga域名访问记录剔除只留下不同家族dga域名的访问记录,然后跑pylouvain才能得到较好的效果。如果不做降噪就直接进行社区发现会遇到很严重的over-community cluster的问题。
-
本题最理想情况下,降噪后的节点已经满足“社区内部内聚、社区之间极度稀疏”,这样pylouvain只要运行一次就可以直接得到结果。但是降噪不能做到100%纯净,因此还需要后续的社区dga节点提纯步骤。
图节点与边的定义
节点(node):domain 关系边(edge):count(distinct src_ip) group by domain_in, domain_out -- 基于"共同src_ip"这一共现逻辑关系,建立节点matrix set odps.sql.mapjoin.memory.max=8096; drop table if exists xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain_matrix; create table xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain_matrix as select /*+mapjoin(b)*/ a.src_ip as ip, a.domain as a_domain, b.domain as b_domain from ( select src_ip,domain from xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain ) a join ( select src_ip,domain from xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain ) b on a.src_ip=b.src_ip ; -- 去除自连接,去除双向连接的重复 drop table if exists xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain_matrix_delete_selfloop; create table xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain_matrix_delete_selfloop as select a_domain as node1_id, b_domain as node2_id, count(distinct ip) as weight -- 重复qry的IP数量代表权重 from xiaohan_datacon_q2_dns_data_suspicious_dga_for_pylouvain_matrix where a_domain > b_domain -- 去除自连接,去除双向连接的重复 group by a_domain, b_domain ;
社区发现结果
经过一轮社区发现后会得到54个dga family,部分社区如下图:
- 显然本题中不可能有这么多家族。这是现实中一个非常正常的情况,由于botnet活跃的时间断不一致,有可能感染了同一种DGA的bot出现在不同时间,因此其访问的DGA没有重合。接下来要对多个社区做聚合和提纯。
社区合并
对上文中每个louvain产出的社区做特征工程,然后聚类以达到"合并同一家族社区"的效果。
社区合并特征:
- TLD one-hot分布 [com, org, net, biz, info]
- DNS SLD字符集 [a-zA-Z0-9]
- DNS SLD子域名长度变化区间 [16,32]
结果优化
这一步通过引入其他维度数据进一步提高精度和召回。
域名维度特征:
- Only SLD+TLD && length(SLD) > 5
- Number of rdata < 5
- HTTP alive
- NXDOMAIN rate
- WHOIS existed
- WHOIS sinkhole
- Alexa rank < 1M
- IP malicious request rate
- DGA Generate Algorithm
社区维度特征:
- NXDOMAIN≥ 20%
- Number of Domain names ≥ 4
- Query in a short times
最终结果:
线上得分 98.8300
主要问题和待提高的地方
-
结合malware reverse engine进行辅助和分析确认。我们队在比赛过程中针对这道题的现实意义进行了讨论,dga检测与识别,毫无疑问是要进行实时防御,或者说是准实时防御,即dns sinkhole,这就是一个典型的“双百场景”,即“recall 100% + precision 100%”。
-
dga C&C本质上是黑客的一种隐蔽通信手段,如果不能100% recall识别,漏报一个等于防御失败。反过来,dns域名是一个互联网核心基础设施,如果在骨干网设备上产生拦截误报,影响是非常巨大的。这和Xorddos马的自我繁殖防御类似。从这个角度来说,这道题我们没有拿到100%,等于防御失败了。
-
在工程化中,这道题最有效的方法是是对dga malware进行监控和逆向分析,通过精确的dga generate function,提前预知未来可能产生的dga域名,从而进行提前防御,当然也要关注dga生成算法与常规域名的碰撞问题。
-
社区节点间边权重计算方式需要优化:在实际场景中,一台机器可能中多个木马,而且在中马的同时可能正在进行其他的高频业务访问行为,因此只基于简单的共享同一个肉鸡ip的社区边定义很容易引入像msn.com这种误报,对边权重的计算需要引入更多肉鸡-域名行为时序上的特征。
计算效率
- IO:从pcap中读取dns数据过程较慢,将数据导入MaxCompute之后,效率大大提高。
- FE:对domain列进行groupby,然后对其他列进行操作,这里groupby一次需要10min,跑2-gram需要20min。
- Xgboost:训练模型,15min完成,预测10s内完成。
- pylouvain:200w edge,1.6w vertex,1min内完成收敛。
- LPA:1min完成收敛。
- nxNetwork可视化:30s完成数据加载,2min完成图形渲染。
比赛结果
Ref
- https://github.com/rrenaud/Gibberish-Detector
- https://pc.nanog.org/static/published/meetings/NANOG71/1444/20171004_Gong_A_Dga_Odyssey__v1.pdf
- https://cloud.tencent.com/developer/article/1142855
- https://github.com/360netlab/DGA/tree/master/code
- https://www.botconf.eu/wp-content/uploads/2015/12/OK-P06-Plohmann-DGArchive.pdf
- https://www.usenix.org/sites/default/files/conference/protected-files/security16_slides_plohmann.pdf
- https://github.com/rmariko/security-ids/blob/0696255b7f2600429a3129bdc1b271d3c4db20ae/ids.py
- https://github.com/LittleHann/DGA-1/blob/master/dga_algorithms/Conficker.cpp
- https://github.com/andrewaeva/DGA/blob/master/dga_algorithms/Matsnu.py
- https://github.com/LittleHann/dga-collection/tree/master/dgacollection
- https://github.com/360netlab/DGA/tree/master/code
- https://github.com/baderj/domain_generation_algorithms/tree/e2ed68a9813b2265652a79291a74b4c23fc13bf0
- https://www.cdxy.me/?p=805
- https://github.com/shyoshyo/Datacon-9102-DNS
- http://momomoxiaoxi.com/数据分析/2019/04/24/datacondns1/
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Python基础教程(第2版)
Magnus Lie Hetland / 司维、曾军崴、谭颖华 / 人民邮电出版社 / 2010-7 / 69.00元
本书是经典教程的全新改版,作者根据Python 3.0版本的种种变化,全面改写了书中内容,做到既能“瞻前”也能“顾后”。本书层次鲜明、结构严谨、内容翔实,特别是在最后几章,作者将前面讲述的内容应用到了10个引人入胜的项目中,并以模板的形式介绍了项目的开发过程。本书既适合初学者夯实基础,又能帮助Python程序员提升技能,即使是 Python方面的技术专家,也能从书里找到令你耳目一新的东西。一起来看看 《Python基础教程(第2版)》 这本书的介绍吧!
XML、JSON 在线转换
在线XML、JSON转换工具
HEX HSV 转换工具
HEX HSV 互换工具