内容简介:由于AWS流量镜像的特殊性,现阶段生产网的架构中只接入了HTTP与DNS流量,分别采用了Zeek与Suricata对现有流量进行分析与预警。Suricata负责基于签名的特征检测,Zeek负责定制化事件的脚本检测,也算是“各司其职”。近几日,某个业务接口出现了Pindom告警,经过分析发现部分IP尝试对该接口的参数进行遍历。由于遍历参数对应的值设置的都比较大,且后台并未对该参数进行深度的限制,导致了服务器会不断的进行计算,最终导致接口无响应。通过扩展
由于AWS流量镜像的特殊性,现阶段生产网的架构中只接入了HTTP与DNS流量,分别采用了Zeek与Suricata对现有流量进行分析与预警。Suricata负责基于签名的特征检测,Zeek负责定制化事件的脚本检测,也算是“各司其职”。近几日,某个业务接口出现了Pindom告警,经过分析发现部分IP尝试对该接口的参数进行遍历。由于遍历参数对应的值设置的都比较大,且后台并未对该参数进行深度的限制,导致了服务器会不断的进行计算,最终导致接口无响应。
- 检测参数遍历行为; - 访问是否存在周期性; - unique user_agent 统计; - threat intelligence 研判;
实现
通过扩展 ElastAlert 告警框架的告警模型,来实现以上需求。
规则文件
import sys import json import redis import html import datetime from multiprocessing import Process, JoinableQueue, Lock, Manager from elastalert.ruletypes import RuleType from elastalert.util import elastalert_logger try: import pandas as pd except: print("Please make sure you have pandas installed. pip install pandas") sys.exit(0) try: from tqdm import tqdm except: print("Please make sure you have tqdm module installed. pip install tqdm") sys.exit(0) def conn(host='localhost', port=6379, password=None, db=0): pool = redis.ConnectionPool(host=host, port=port, password=password, db=db) conn = redis.Redis(connection_pool=pool) return conn def put_data(conn, q, data): with conn.pipeline() as pipe: for i in data: pipe.lpush(q, i) pipe.execute() class SpiderRule(RuleType): def __init__(self, rules, args=None): super(SpiderRule, self).__init__(rules, args=None) self.MAX_ARGS_LENGTH = int(self.rules['beacon']['max_args_length']) self.MIN_HITS = int(self.rules['beacon']['min_hits']) self.MAX_UNIQUE_ARGS = int(self.rules['beacon']['max_unique_args']) self.THRESHOLD_PERCENT = int(self.rules['beacon']['threshold_percent']) self.NUM_PROCESSES = int(self.rules['beacon']['threads']) self.UA_PROCESSES = int(self.rules['beacon']['user_agent']) self.TIMESTAMP = '@timestamp' self.FORMAT_TIMESTAMP = self.rules['timestamp'].get('format', None) self.beacon_module = self.rules['beacon']['beacon_module'] self.WINDOW = int(self.rules['beacon']['window']) self.MIN_INTERVAL = int(self.rules['beacon']['min_interval']) buffer_time = str(self.rules['buffer_time']) self.PERIOD = ':'.join(buffer_time.split(':')[:2]) self.fields = self.normalized_field(self.rules['field']) self.src_ip = self.fields['aliases']['src_ip'] self.url = self.fields['aliases']['url'] self.url_path = self.fields['aliases']['url_path'] self.http_host = self.fields['aliases']['http_host'] self.user_agent = self.fields['aliases']['user_agent'] self.json = self.rules['output']['json'].get('enable', None) self.redis = self.rules['output']['redis'].get('enable', None) self.q_job = JoinableQueue() self.l_df = Lock() self.l_list = Lock() def normalized_field(self, d): fields = {'hash': [], 'output': [], 'aliases': {}} for field, info in d.items(): alias = info['alias'] fields['aliases'][alias] = field for i in info.get('type', []): fields[i].append(field) return fields def add_data(self, data): # Detection of spider crawlers self.df = pd.json_normalize(data) results = self.find_spiders() d = results.to_dict(orient="records") # Output to local files if self.json: json_path = self.rules['output']['json']['path'] with open(json_path, 'a') as out_file: for i in d: out_file.write(json.dumps(i) + '\n') # Output to Redis Server if self.redis: try: host = self.rules['output']['redis']['host'] port = self.rules['output']['redis']['port'] password = self.rules['output']['redis']['password'] db = self.rules['output']['redis']['db'] key = self.rules['output']['redis']['key'] ioc = self.rules['output']['redis']['field'] redis_conn = conn(host=host, port=port, password=password, db=db) IoC = results[ioc].unique().tolist() put_data(redis_conn, key, IoC) except: elastalert_logger.error("Output Redis configuration errors.") self.add_match(d) # The results of get_match_str will appear in the alert text def get_match_str(self, match): return json.dumps(match) def add_match(self, results): for result in results: super(SpiderRule, self).add_match(result) def get_args_hash(self, args, session_id): return hash(tuple(args + [session_id])) def get_query_str(self, request): query = request.split('?')[-1] query_str = dict([i.split("=", 1) for i in query.split( "&") if i if len(i.split("=", 1)) == 2]) query_str['args_list'] = list(query_str.keys()) query_str['max_length'] = len(query_str) query_str['url_sample'] = request return query_str def percent_grouping(self, d, total): interval = 0 # Finding the key with the largest value (interval with most events) mx_key = int(max(iter(list(d.keys())), key=(lambda key: d[key]))) mx_percent = 0.0 for i in range(mx_key - self.WINDOW, mx_key + 1): current = 0 # Finding center of current window curr_interval = i + int(self.WINDOW / 2) for j in range(i, i + self.WINDOW): if j in d: current += d[j] percent = float(current) / total * 100 if percent > mx_percent: mx_percent = percent interval = curr_interval return interval, mx_percent def find_beacon(self, session_data): beacon = {} if not self.FORMAT_TIMESTAMP: session_data[self.TIMESTAMP] = pd.to_datetime( session_data[self.TIMESTAMP]) else: session_data[self.TIMESTAMP] = pd.to_datetime( session_data[self.TIMESTAMP], format=self.FORMAT_TIMESTAMP) session_data[self.TIMESTAMP] = ( session_data[self.TIMESTAMP].astype(int) / 1000000000).astype(int) session_data = session_data.sort_values([self.TIMESTAMP]) session_data['delta'] = ( session_data[self.TIMESTAMP] - session_data[self.TIMESTAMP].shift()).fillna(0) session_data = session_data[1:] d = dict(session_data.delta.value_counts()) for key in list(d.keys()): if key < self.MIN_INTERVAL: del d[key] # Finding the total number of events total = sum(d.values()) if d and total > self.MIN_HITS: window, percent = self.percent_grouping(d, total) if percent > self.THRESHOLD_PERCENT and total > self.MIN_HITS: beacon = { 'percent': int(percent), 'interval': int(window), } return beacon def find_spider(self, q_job, spider_list): while not q_job.empty(): session_id = q_job.get() self.l_df.acquire() session_data = self.df[self.df['session_id'] == session_id] self.l_df.release() query_str = session_data[self.url].apply( lambda req: self.get_query_str(req)).tolist() query_data = pd.DataFrame(query_str) # get args_hash query_data['args_hash'] = query_data['args_list'].apply( lambda args: self.get_args_hash(args, session_id)) for i in query_data['args_hash'].unique(): result = { "detail": { 'percent': {}, 'unique': {} }, "tags": [], "src_ip": session_data[self.src_ip].tolist()[0], "url_path": session_data[self.url_path].tolist()[0], "http_host": session_data[self.http_host].tolist()[0], "unique_ua": session_data[self.user_agent].unique().shape[0], "alert": False, } df = query_data[query_data['args_hash'] == i] count_args_length = df['max_length'].iloc[0] if count_args_length > self.MAX_ARGS_LENGTH: continue total_hits = df.shape[0] if total_hits < self.MIN_HITS: continue args_list = df['args_list'].iloc[0] for i in args_list: unique_args = len(df[i].unique()) if unique_args == 1: continue # Calculate the percentage based on the number of changes in the parameters current_percent = int((unique_args / total_hits) * 100) if current_percent < self.THRESHOLD_PERCENT: continue result['detail']['percent'][i] = current_percent result['detail']['unique'][i] = unique_args # Number of parameters with changes count_unique_args = len(result['detail']['unique']) if count_unique_args <= self.MAX_UNIQUE_ARGS: result['alert'] = True if not result['detail']['unique']: continue # Beacon analysis if self.beacon_module: result['beacon'] = self.find_beacon( session_data.reset_index(drop=True)) result['args_list'] = args_list result['total_hits'] = total_hits result['url_sample'] = df['url_sample'].iloc[0] result['period'] = self.PERIOD if result['alert']: result['tags'].append('enumerate') if result['beacon']: result['tags'].append('beacon') if result['unique_ua'] >= self.UA_PROCESSES: result['tags'].append('suspicious-ua') self.l_list.acquire() spider_list.append(result) self.l_list.release() q_job.task_done() def find_spiders(self): if self.df.empty: raise Exception( "Elasticsearch did not retrieve any data. Please ensure your settings are correct inside the config file.") tqdm.pandas(desc="Detection of Spider Crawlers.") # get url_path self.df[self.url_path] = self.df[self.url].str.split('?').str.get(0) # add session_id from hash fields self.df['session_id'] = self.df[self.fields['hash'] ].progress_apply(lambda row: hash(tuple(row)), axis=1) # split url self.df = self.df[self.df[self.url].apply(lambda request: True if len( request.split('?')) == 2 else False)].reset_index(drop=True) # normalized url self.df[self.url] = self.df[self.url].apply( lambda request: html.unescape(request)) # unique session_id unique_session = self.df['session_id'].unique() for session in unique_session: self.q_job.put(session) mgr = Manager() spider_list = mgr.list() processes = [Process(target=self.find_spider, args=( self.q_job, spider_list,)) for thread in range(self.NUM_PROCESSES)] # Run processes for p in processes: p.start() # Exit the completed processes for p in processes: p.join() results = pd.DataFrame(list(spider_list)) # add timestamp now = datetime.datetime.now().isoformat() results['timestamp'] = now if not results.empty: results = results[results['alert'] == True] match_log = "Queried rule %s matches %s crawl events" % ( self.rules['name'], results.shape[0] ) elastalert_logger.info(match_log) return results
配置文件
Web.yaml
name: "Detection of Spider Crawlers" es_host: "es_server" es_port: 9200 type: "elastalert_modules.spider.my_rules.SpiderRule" index: "zeek-other-%Y.%m.%d" use_strftime_index: true filter: - term: host: "canon88.github.io" - term: method.keyword: "GET" include: ["true_client_ip", "host", "uri", "uri_path", "user_agent"] timestamp: format: false timestamp_field: "@timestamp" buffer_time: hours: 12 run_every: minutes: 10 max_query_size: 10000 scroll: true beacon: max_args_length: 10 # 最大检测参数个数 min_hits: 120 # 最小命中事件数 max_unique_args: 2 # 最大动态变化参数 threshold_percent: 70 # 请求阈值百分比 threads: 16 # 多进程 beacon_module: true # 开启周期性检测 min_interval: 1 # 最小周期 window: 2 # 抖动窗口 user_agent: 20 # 唯一UA个数 field: true_client_ip: alias: src_ip type: [hash] host: alias: http_host type: [hash] uri_path: alias: url_path type: [hash] uri: alias: url user_agent: alias: user_agent output: json: enable: yes # 本地输出 path: /var/log/spider/spider_detect.json redis: enable: no # 输出至Redis,联动情报数据进行研判。 host: redis_server port: 6379 db: 0 password: redis_password key: spider:feeds field: src_ip alert: - debug
告警输出
{ "detail": { "percent": { "cookieid": 81 }, "unique": { "cookieid": 133 } }, "tags": [ "enumerate", // 存在参数遍历行为 "suspicious-ua" // user_agent 超过阈值 ], "src_ip": "54.160.169.250", "url_path": "/image/cookieId.html", "http_host": "canon88.github.io", "unique_ua": 47, "alert": true, "beacon": {}, "args_list": [ "cookieid" ], "total_hits": 164, "url_sample": "/image/cookieId.html?cookieid=E99A3E54-5A81-2907-1372-339FFB70A464", "period": "1:00", "timestamp": "2020-06-02T11:07:59.276581" }
告警简述
根据以上告警内容。1小时内 IP: 54.160.169.250 总共访问了该接口164次且cookieid参数更换了133次,占到总请求量的81%。并更换了47个不同的user_agent。
写在最后
find_spider: 用于检测参数遍历的行为,这里加上 find_beacon 是为了增加一个周期性的检测维度。当然很多爬虫都会「自带」时间抖动,以及使用爬虫池,所以效果并不是特别明显。
find_beacon: 更适用于检测C2连接,例如针对DNS域名的请求这种情况,这里有一个检测到的域名周期性请求的告警:
{ "src_ip": "x.x.x.228", "hostname": "entitlement.service.imperva.com", "answers": [ "joxkwsf.x.incapdns.net", "45.60.73.51" ], "percent": "100", "interval": 1800, "occurrences": 23, "timestamp": "2020-06-01T08:03:38.164363", "period": 12, "event_type": "beaconing", "num_hits": 806379, "num_matches": 3, "kibana_url": "https://canon88.github.io/goto/5f089bcc411426b854da71b9062fdc8c" }
DNS周期性请求
参考
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- WorkManager:周期性任务
- 数组常见的遍历循环方法、数组的循环遍历的效率对比
- C++拾趣——STL容器的插入、删除、遍历和查找操作性能对比(Windows VirtualStudio)——遍历和删除
- Js遍历数组总结
- 遍历
- 遍历 DOM 注意点
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
首席产品官2 从白领到金领
车马 / 机械工业出版社 / 79元
《首席产品官》共2册,旨在为产品新人成长为产品行家,产品白领成长为产品金领,最后成长为首席产品官(CPO)提供产品认知、能力体系、成长方法三个维度的全方位指导。 作者在互联网领域从业近20年,是中国早期的互联网产品经理,曾是周鸿祎旗下“3721”的产品经理,担任CPO和CEO多年。作者将自己多年来的产品经验体系化,锤炼出了“产品人的能力杠铃模型”(简称“杠铃模型”),简洁、直观、兼容性好、实......一起来看看 《首席产品官2 从白领到金领》 这本书的介绍吧!