内容简介:由于一些需要,和抱着学习的目的,研读了下巡风这款相当优秀的扫描器代码。主要分析了下两个扫描的模块,对web端没有跟进看,当然重点也在扫描的部分。
由于一些需要,和抱着学习的目的,研读了下巡风这款相当优秀的扫描器代码。
https://github.com/ysrc/xunfeng
主要分析了下两个扫描的模块,对web端没有跟进看,当然重点也在扫描的部分。
分析的语句都以注释的形式标注在代码中了,由于能力有限,分析中的不足和错误欢迎指出。
整体架构逻辑
文件结构
│ Config.py # 配置文件 │ README.md # 说明文档 │ Run.bat # Windows启动服务 │ Run.py # webserver │ Run.sh # Linux启动服务,重新启动前需把进程先结束掉 │ ├─aider │ Aider.py # 辅助验证脚本 │ ├─db # 初始数据库结构 │ ├─masscan # 内置编译好的Masscan程序(CentOS win64适用),需要chmod+x给执行权限(root),若无法使用请自行编译安装。 ├─nascan │ │ NAScan.py # 网络资产信息抓取引擎 │ │ │ ├─lib │ │ common.py 其他方法 │ │ icmp.py # ICMP发送类 │ │ log.py # 日志输出 │ │ mongo.py # 数据库连接 │ │ scan.py # 扫描与识别 │ │ start.py # 线程控制 │ │ │ └─plugin │ masscan.py # 调用Masscan脚本 │ ├─views │ │ View.py # web请求处理 │ │ │ ├─lib │ │ Conn.py # 数据库公共类 │ │ CreateExcel.py # 表格处理 │ │ Login.py # 权限验证 │ │ QueryLogic.py # 查询语句解析 │ │ │ ├─static #静态资源目录 │ │ │ └─templates #模板文件目录 │ └─vulscan │ VulScan.py # 漏洞检测引擎 │ └─vuldb # 漏洞库目录
Run.sh
整个程序的开始就是从 Run.sh
开始的,可以先来看下起了哪些服务
#!/bin/bash CURRENT_PATH=`dirname $0` cd $CURRENT_PATH XUNFENG_LOG=/var/log/xunfeng XUNFENG_DB=/var/lib/mongodb [ ! -d $XUNFENG_LOG ] && mkdir -p ${XUNFENG_LOG} [ ! -d $XUNFENG_DB ] && mkdir -p ${XUNFENG_DB} nohup mongod --port 65521 --dbpath=${XUNFENG_DB} --auth > ${XUNFENG_LOG}/db.log & nohup python ./Run.py > ${XUNFENG_LOG}/web.log & nohup python ./aider/Aider.py > ${XUNFENG_LOG}/aider.log & nohup python ./nascan/NAScan.py > ${XUNFENG_LOG}/scan.log & nohup python ./vulscan/VulScan.py > ${XUNFENG_LOG}/vul.log &
可以看到主要起了如下四个服务
Run.py
from views.View import app if __name__ == '__main__': #app.debug = True app.run(threaded=True, port=80,host='0.0.0.0')
webserver可以看出这个是flask起的web端,里面主要是做一些数据的展示和修改的。由于不是扫描器的重点,这里就不具体分析了,可以自己看下代码。
Aider.py
辅助验证脚本,一个50行左右的单文件,使用socket完成了一个简单的DNS log平台。
NAScan.py
网络资产信息抓取引擎主要是调用 nascan
这个模块来进行网络资产(存活主机、开发端口、服务)的扫描。
VulScan.py
漏洞检测引擎主要是调用 vulscan/vuldb
中的poc进行漏洞检测。
nascan
模块结构
─nascan │ NAScan.py # 网络资产信息抓取引擎 │ ├─lib │ common.py 其他方法 │ icmp.py # ICMP发送类 │ log.py # 日志输出 │ mongo.py # 数据库连接 │ scan.py # 扫描与识别 │ start.py # 线程控制 └─plugin masscan.py # 调用Masscan脚本
从 NAScan.py
文件入口
# coding:utf-8 # author:wolf@YSRC import thread from lib.common import * from lib.start import * if __name__ == "__main__": try: CONFIG_INI = get_config() # 读取配置 log.write('info', None, 0, u'获取配置成功') # 日志记录 STATISTICS = get_statistics() # 读取统计信息 MASSCAN_AC = [0] # 标识符 masscan是否在使用 NACHANGE = [0] # 标识符 扫描列表是否被改变 thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE)) # 心跳线程 thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程 socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时 ac_data = [] while True: now_time = time.localtime() now_hour = now_time.tm_hour now_day = now_time.tm_mday now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day) cy_day, ac_hour = CONFIG_INI['Cycle'].split('|') log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle'])) if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段 ac_data.append(now_date) NACHANGE[0] = 0 log.write('info', None, 0, u'开始扫描') s = start(CONFIG_INI) s.masscan_ac = MASSCAN_AC s.statistics = STATISTICS s.run() # 开始扫描 time.sleep(60) except Exception, e: print e
准备工作
一开始是获取配置信息
def get_config(): config = {} # 从 mongodb 中读取`nascan`的配置,可以从navicat中看到Config集合中有`vulscan`和`nascan`的扫描配置 config_info = mongo.na_db.Config.find_one({"type": "nascan"}) for name in config_info['config']: # 对于cms识别、组件容器、动态语言、服务 的配置存储是使用`|`进行分割存储的 # 所以在取出之前要进行简单的格式化然后放到配置中 if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']: config[name] = format_config(name, config_info['config'][name]['value']) else: config[name] = config_info['config'][name]['value'] return config
然后是进行日志记录
# coding:utf-8 import threading import time import sys reload(sys) sys.setdefaultencoding('utf8') mutex = threading.Lock() # 线程互斥锁 def write(scan_type, host, port, info): mutex.acquire() # 上锁,避免多个进程输出,导致格式混乱 port = int(port) try: # 由于Run.sh中使用了nohup,所以`print`的输出会被输出到log文件中 time_str = time.strftime('%X', time.localtime(time.time())) if scan_type == 'portscan': print "[%s] %s:%d open" % (time_str, host, port) elif scan_type == 'server': print "[%s] %s:%d is %s" % (time_str, host, port, str(info)) elif scan_type == 'web': print "[%s] %s:%d is web" % (time_str, host, port) print "[%s] %s:%d web info %s" % (time_str, host, port, info) elif scan_type == 'active': print "[%s] %s active" % (time_str, host) elif scan_type == 'info': print "[%s] %s" % (time_str, info) except Exception, e: print 'logerror',e pass mutex.release()
之后进行读取统计信息
def get_statistics(): date_ = datetime.datetime.now().strftime('%Y-%m-%d') # 获取当日的统计信息 now_stati = mongo.na_db.Statistics.find_one({"date": date_}) if not now_stati: # 没有当日的信息则返回一个初始统计信息 now_stati = {date_: {"add": 0, "update": 0, "delete": 0}} return now_stati else: # 有则返回 return {date_: now_stati['info']}
两个监测线程
之后启动了两个现场,分别对应不同的功能
thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE)) # 心跳线程 thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程
monitor
monitor
心跳线程,主要用于判断扫描配置是否发生了变化
def monitor(CONFIG_INI, STATISTICS, NACHANGE): while True: try: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') # 记录心跳 mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}}) if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0} # 更新统计信息 mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True) new_config = get_config() # 获取最新配置 # 比较配置扫描列表的base64是否相同,不同则置NACHANGE[0]为1 if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1 CONFIG_INI.clear() CONFIG_INI.update(new_config) # 更新新配置 except Exception, e: print e time.sleep(30) # 每30秒检测一次
回到 NAScan.py
中可以看到
# 判断是否达到了一个扫描的周期,或者心跳线程是否检测到扫描列表更新 # 因为上面可以看到base64不同时会将NACHANGE[0]置于1 # 至于为什么要传入NACHANGE[0]这样一个列表,而不是一个flag的int值(因为列表是引用啊! if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]:
cruise
然后是 cruise
失效记录删除线程
def cruise(STATISTICS,MASSCAN_AC): while True: now_str = datetime.datetime.now() week = int(now_str.weekday()) hour = int(now_str.hour) if week >= 1 and week <= 5 and hour >= 9 and hour <= 18: # 非工作时间不删除 try: # 获取扫描信息记录 data = mongo.NA_INFO.find().sort("time", 1) for history_info in data: while True: # 如果masscan正在扫描即不进行清理 # 在后期可以看到在用masscan进行扫描的时候会置1 if MASSCAN_AC[0]: time.sleep(10) else: break ip = history_info['ip'] port = history_info['port'] try: # 检测端口是否存活 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((ip, int(port))) sock.close() except Exception, e: time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') # 不存活则删除改记录 mongo.NA_INFO.remove({"ip": ip, "port": port}) log.write('info', None, 0, '%s:%s delete' % (ip, port)) # 日志记录 STATISTICS[date_]['delete'] += 1 del history_info["_id"] history_info['del_time'] = time_ history_info['type'] = 'delete' # 添加一条操作历史 mongo.NA_HISTORY.insert(history_info) except: pass time.sleep(3600) # 60分钟检测一次
start.py
回到 NAScan.py
, 前期的一些工作已经做完了,后面就可以进入 while True
的扫描循环了
now_time = time.localtime() now_hour = now_time.tm_hour now_day = now_time.tm_mday now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day) # 获取资产探测周期 cy_day, ac_hour = CONFIG_INI['Cycle'].split('|') log.write('info', None, 0, u'扫描规则: ' + str(CONFIG_INI['Cycle'])) if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段 ac_data.append(now_date) # 判断是否扫描过的列表 NACHANGE[0] = 0 # 置0, log.write('info', None, 0, u'开始扫描') s = start(CONFIG_INI) s.masscan_ac = MASSCAN_AC s.statistics = STATISTICS s.run() # 开始扫描 time.sleep(60)
s = start(CONFIG_INI)
初始化了一个 start
类
class start: def __init__(self, config): # 传入CONFIG_INI 配置,然后设置类的属性 self.config_ini = config self.queue = Queue.Queue() self.thread = int(self.config_ini['Thread']) self.scan_list = self.config_ini['Scan_list'].split('\n') self.mode = int(self.config_ini['Masscan'].split('|')[0]) self.icmp = int(self.config_ini['Port_list'].split('|')[0]) self.white_list = self.config_ini.get('White_list', '').split('\n')
然后回来额外设置了 masscan_ac
和 statistics
两个引用标识符(因为要与其他线程共享对它的修改,相当于全局变量
然后启动 s.run()
开始扫描
def run(self): # 在start.py中定义的全局变量,端口列表 global AC_PORT_LIST all_ip_list = [] for ip in self.scan_list: # 处理CIDR格式的ip, eg:192.168.0.1/24 # 就不具体跟进看了,大约40行左右,涉及一些位运算格式转换啥的 if "/" in ip: ip = cidr.CIDR(ip) if not ip:continue # 处理 192.168.0.1-192.168.0.255 这类范围ip ip_list = self.get_ip_list(ip) # 对于白名单ip进行移除 for white_ip in self.white_list: if white_ip in ip_list: ip_list.remove(white_ip) # 是否开始了masscan扫描,开启了mode置为1,否则为0 if self.mode == 1: # 使用masscan扫描 # 获取文件路径 self.masscan_path = self.config_ini['Masscan'].split('|')[2] # 获取扫描速率 self.masscan_rate = self.config_ini['Masscan'].split('|')[1] # 获取存活的ip ip_list = self.get_ac_ip(ip_list) self.masscan_ac[0] = 1 AC_PORT_LIST = self.masscan(ip_list) # 如果安装了Masscan即使用Masscan进行全端口扫描 if not AC_PORT_LIST: continue self.masscan_ac[0] = 0 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 加入队列 self.scan_start() # 开始扫描 else: all_ip_list.extend(ip_list) if self.mode == 0: # 不使用masscan扫描 if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list) for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列 self.scan_start() # TCP探测模式开始扫描
探测存活ip
self.get_ac_ip()
是通过ping请求来探测主机存活,后期只对存活主机进行扫描
def get_ac_ip(self, ip_list): try: s = icmp.Nscan() ipPool = set(ip_list) return s.mPing(ipPool) except Exception, e: print 'The current user permissions unable to send icmp packets' return ip_list
跟到 s.mPing()
中
def mPing(self, ipPool): # 获得icmp的socket Sock = self.__icmpSocket Sock.settimeout(self.timeout) # 设置icmp数据报 packet = self.__icmpPacket recvFroms = set() # 初始化一个多线程的icmp请求类 sendThr = SendPingThr(ipPool, packet, Sock, self.timeout) # 启动多线程icmp扫描 sendThr.start() while True: try: # 获取返回的ip地址 ac_ip = Sock.recvfrom(1024)[1][0] if ac_ip not in recvFroms: log.write("active", ac_ip, 0, None) # 添加存活ip到`recvForms` recvFroms.add(ac_ip) except Exception: pass finally: if not sendThr.isAlive(): break # 返回两个集合的交集 return recvFroms & ipPool
SendPingThr
类
class SendPingThr(threading.Thread): def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3): threading.Thread.__init__(self) self.Sock = icmpSocket self.ipPool = ipPool self.packet = icmpPacket self.timeout = timeout self.Sock.settimeout(timeout + 1) def run(self): for ip in self.ipPool: try: self.Sock.sendto(self.packet, (ip, 0)) except socket.timeout: break except: pass time.sleep(self.timeout)
masscan扫描全端口
这样就依次将存活的ip返回到了 start.py
中的 run()
中
# 获取到返回的存活ip ip_list = self.get_ac_ip(ip_list) # 将masscan_ac[0]置1,表示masscan正在使用 self.masscan_ac[0] = 1 # 利用masscan进行全端口扫描 AC_PORT_LIST = self.masscan(ip_list) if not AC_PORT_LIST: continue # 将masscan_ac[0]置0 self.masscan_ac[0] = 0 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 加入队列 self.scan_start() # 开始扫描
跟进 self.masscan()
函数
def masscan(self, ip): try: if len(ip) == 0: return sys.path.append(sys.path[0] + "/plugin") m_scan = __import__("masscan") result = m_scan.run(ip, self.masscan_path, self.masscan_rate) return result except Exception, e: print e print 'No masscan plugin detected'
跟进 m_scan.run()
import os def run(ip_list,path,rate): try: ip_file = open('target.log','w') # 将存活的ip列表写到target.log中 ip_file.write("\n".join(ip_list)) ip_file.close() # 进行过滤一些危险字符 #(issue中也有提到,并不能完全保证后台的安全,主要还是保证对密钥的管理 path = str(path).translate(None, ';|&`\n') rate = str(rate).translate(None, ';|&`\n') if not os.path.exists(path):return # 用系统命令进行masscan全端口扫描 os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate)) # 读取扫描结果 result_file = open('tmp.log', 'r') result_json = result_file.readlines() result_file.close() del result_json[0] del result_json[-1] open_list = {} # 对扫描结果进行格式化处理 for res in result_json: try: ip = res.split()[3] port = res.split()[2] if ip in open_list: open_list[ip].append(port) else: open_list[ip] = [port] except:pass os.remove('target.log') os.remove('tmp.log') # 返回扫描结果 return open_list except: pass
这样,再次回到 start.py
的 run()
中
# 用masscan进行全端口扫描 AC_PORT_LIST = self.masscan(ip_list) if not AC_PORT_LIST: continue # 将self.masscan_ac[0]置0,表示结束使用 self.masscan_ac[0] = 0 # 将扫描结果存入队列中 for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 开始扫描 self.scan_start()
scan.py
前期准备
self.scan_start()
def scan_start(self): for i in range(self.thread): # 开始扫描 t = ThreadNum(self.queue) t.setDaemon(True) t.mode = self.mode t.config_ini = self.config_ini t.statistics = self.statistics t.start() self.queue.join()
跟进 ThreadNum
类
class ThreadNum(threading.Thread): def __init__(self, queue): # 赋值扫描队列 threading.Thread.__init__(self) self.queue = queue def run(self): while True: try: # 非阻塞模式 task_host = self.queue.get(block=False) except: break try: if self.mode: # 开启masscan扫描则使用扫描出的存活端口 port_list = AC_PORT_LIST[task_host] else: # 否则扫描特定的端口 port_list = self.config_ini['Port_list'].split('|')[1].split('\n') _s = scan.scan(task_host, port_list) # 初始化scan _s.config_ini = self.config_ini # 提供配置信息 _s.statistics = self.statistics # 提供统计信息 _s.run() # 启动 except Exception, e: print e finally: self.queue.task_done()
跟到 scan
类的 run()
方法中
def run(self): self.timeout = int(self.config_ini['Timeout']) # 获取timeout for _port in self.port_list: self.server = '' self.banner = '' self.port = int(_port) self.scan_port() # 端口扫描 if not self.banner:continue #无banner则跳过(`NULL`表示暂未检测出,不会continue self.server_discern() # 服务识别 if self.server == '': web_info = self.try_web() # 尝试web访问 if web_info: # log记录 log.write('web', self.ip, self.port, web_info) time_ = datetime.datetime.now() # 将扫描结果存入mongodb mongo.NA_INFO.update({'ip': self.ip, 'port': self.port}, {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}})
端口扫描
先是进行了 self.scan_port()
端口扫描
def scan_port(self): try: # 进行socket连接 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.connect((self.ip, self.port)) time.sleep(0.2) except Exception, e: return try: # 获取banner信息 self.banner = sock.recv(1024) sock.close() # 小于等于2则置为'NULL' if len(self.banner) <= 2: self.banner = 'NULL' except Exception, e: # 异常情况也置为'NULL' self.banner = 'NULL' # 日志记录 log.write('portscan', self.ip, self.port, None) banner = '' hostname = self.ip2hostname(self.ip) time_ = datetime.datetime.now() date_ = time_.strftime('%Y-%m-%d') try: # 进行unicode转换 banner = unicode(self.banner, errors='replace') if self.banner == 'NULL': banner = '' # 添加一条info信息 mongo.NA_INFO.insert({"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) # 统计信息+1 self.statistics[date_]['add'] += 1 except: if banner: # 原子操作,删除已存在的记录 history_info = mongo.NA_INFO.find_and_modify( query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True) if history_info: # 新增info记录 mongo.NA_INFO.insert( {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_}) # 统计信息+1 self.statistics[date_]['update'] += 1 # 删除原先的_id del history_info["_id"] history_info['del_time'] = time_ history_info['type'] = 'update' # 更新type和del_time之后插入一条新历史记录 mongo.NA_HISTORY.insert(history_info)
进行socket连接的时候,例如一些ssh之类的服务,会返回一些banner信息
服务识别
def server_discern(self): # 先尝试进行利用配置中的`Discern_server`进行快速匹配识别 for mark_info in self.config_ini['Discern_server']: try: name, default_port, mode, reg = mark_info if mode == 'default': # default表示用特定端口,匹配特定服务 if int(default_port) == self.port: self.server = name elif mode == 'banner': # 利用banner信息进行正则匹配检测 matchObj = re.search(reg, self.banner, re.I | re.M) if matchObj: self.server = name if self.server:break except: continue # 对于未检测出服务并且端口不为80、443、8080的端口进行检测 if not self.server and self.port not in [80,443,8080]: for mark_info in self.config_ini['Discern_server']: # 发包识别 try: name, default_port, mode, reg = mark_info if mode not in ['default','banner']: # 进行发送特定的socket包获取banner信息,进行再次匹配 dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) dis_sock.connect((self.ip, self.port)) mode = mode.decode('string_escape') reg = reg.decode('string_escape') dis_sock.send(mode) time.sleep(0.3) dis_recv = dis_sock.recv(1024) dis_sock.close() matchObj = re.search(reg, dis_recv, re.I | re.M) if matchObj: self.server = name break except: pass if self.server: # 对于检测到的服务,进行log和info的记录 log.write("server", self.ip, self.port, self.server) mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})
config_ini['Discern_server']
中的值
config_ini['Discern_server']
中的特定socket数据包
web访问
def try_web(self): title_str, html = '', '' try:# 进行http/https请求,获取响应报文 if self.port == 443: # 对于443端口的使用https协议 info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout) else: info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout) html = info.read() header = info.headers except urllib2.HTTPError, e: html = e.read() header = e.headers except: return if not header: return # 对于gzip格式的响应,进行解压gzip if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']: html_data = StringIO.StringIO(html) gz = gzip.GzipFile(fileobj=html_data) html = gz.read() try: # 格式转码 html_code = self.get_code(header, html).strip() if html_code and len(html_code) < 12: html = html.decode(html_code).encode('utf-8') except: pass try: # 获取titile信息 title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M) if title: title_str = title.group(1) except: pass try: # 将响应的http报文设置成banner信息 web_banner = str(header) + "\r\n\r\n" + html self.banner = web_banner # 添加记录 history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port}) if 'server' not in history_info: tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} return web_info else: if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner)) > len(web_banner) / 60: del history_info['_id'] history_info['del_time'] = datetime.datetime.now() mongo.NA_HISTORY.insert(history_info) tag = self.get_tag() web_info = {'title': title_str, 'tag': tag} date_ = datetime.datetime.now().strftime('%Y-%m-%d') self.statistics[date_]['update'] += 1 log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port)) return web_info except: return
get_tag
()
def get_tag(self): try: url = self.ip + ':' + str(self.port) # 对web服务进行cms、组件容器、动态语言的识别 tag = map(self.discern, ['Discern_cms', 'Discern_con', 'Discern_lang'], [url, url, url]) # 过滤掉未识别出的服务 return filter(None, tag) except Exception, e: return
discern()
def discern(self, dis_type, domain): file_tmp = {} if int(domain.split(":")[1]) == 443: # http/https处理 protocol = "https://" else: protocol = "http://" try: # http请求 req = urllib2.urlopen(protocol + domain, timeout=self.timeout) header = req.headers html = req.read() except urllib2.HTTPError, e: html = e.read() header = e.headers except Exception, e: return # 对于'Discern_cms', 'Discern_con', 'Discern_lang'在数据库中都有自己的识别判断方式 for mark_info in self.config_ini[dis_type]: if mark_info[1] == 'header': try: if not header: return # 通过header方式则对对应的http头中的值进行匹配 # 如存在PHPSSIONID之类的值判定为php if re.search(mark_info[3], header[mark_info[2]], re.I): return mark_info[0] except Exception, e: continue elif mark_info[1] == 'file': if mark_info[2] == 'index': try: if not html: return # 对于file index方式利用文件后缀,如1.php这样判断为 php 语言 if re.search(mark_info[3], html, re.I): return mark_info[0] except Exception, e: continue else: # 防止重复检测 if mark_info[2] in file_tmp: re_html = file_tmp[mark_info[2]] else: # 访问指定的robots.txt之类的文件 try: re_html = urllib2.urlopen(protocol + domain + "/" + mark_info[2], timeout=self.timeout).read() except urllib2.HTTPError, e: re_html = e.read() except Exception, e: return file_tmp[mark_info[2]] = re_html try: # 检测指定文件中是否存在特定关键字 # 如robots.txt中存在'php168'则为php168cms if re.search(mark_info[3], re_html, re.I): return mark_info[0] except Exception, e: print mark_info[3]
config_ini[Discern_lang]
中的值
config_ini[Discern_cms]
中的值
最后回到 run()
中
web_info = self.try_web() # 尝试web访问 if web_info: # 检测完web特征之后,就是进行简单的log记录,和更新数据库中info的值 log.write('web', self.ip, self.port, web_info) time_ = datetime.datetime.now() mongo.NA_INFO.update({'ip': self.ip, 'port': self.port}, {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info, 'time': time_}})
到这里,scan的扫描也就结束了,回到 start
类的 run()
中,剩下的就是不使用masscan的扫描
if self.mode == 0: # 不使用masscan扫描 # 如果设置了icmp检测,会对ip列表进行存活检测,只扫描存活ip if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list) for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列 self.scan_start() # TCP探测模式开始扫描
这里的扫描过程中将ip列表改成了 all_ip_list
,其余的扫描过程也是通过 scan_start()
来调用 scan
类进行扫描。
到这里,整个 NAScan
资产扫描过程也就完成了,每次扫描完会sleep60秒,然后再次循环这个过程。
vulscan
用于对扫出的资产进行漏洞扫描,具体的扫描过程依赖于 vuldb
中的插件形式进行扫描,做到可插拔的模式
json格式的插件
转换成json形式后就是
{ "name" : "Axis2信息泄露", "info" : "HappyAxis.jsp 页面存在系统敏感信息。", "level" : "低危", "type" : "信息泄露", "author" : "wolf@YSRC", "url": "", "keyword" : "tag:axis2", "source" : 1, "plugin" : { "url" : "/axis2/axis2-web/HappyAxis.jsp", "tag" : "敏感信息泄露", "analyzing" : "keyword", "analyzingdata" : "Axis2 Happiness Page", "data" : "", "method" : "GET" } }
python脚本格式的插件
# coding:utf-8 import ftplib def get_plugin_info(): # 插件描述信息 plugin_info = { "name": "FTP弱口令", "info": "导致敏感信息泄露,严重情况可导致服务器被入侵控制。", "level": "高危", "type": "弱口令", "author": "wolf@YSRC", "url": "", "keyword": "server:ftp", # 推荐搜索关键字 } return plugin_info def check(ip, port, timeout): # 漏洞检测代码 user_list = ['ftp', 'www', 'admin', 'root', 'db', 'wwwroot', 'data', 'web'] for user in user_list: for pass_ in PASSWORD_DIC: # 密码字典无需定义,程序会自动为其赋值。 pass_ = str(pass_.replace('{user}', user)) try: ftp = ftplib.FTP() ftp.timeout = timeout ftp.connect(ip, port) ftp.login(user, pass_) if pass_ == '': pass_ = 'null' if user == 'ftp' and pass_ == 'ftp': return u"可匿名登录" return u"存在弱口令,账号:%s,密码:%s" % (user, pass_) # 成功返回结果,内容显示在扫描结果页面。 except: pass
扫描过程较资产扫描偏简单些,一个280行左右的单文件
一开始定义了一些全局变量
# 添加系统路径 sys.path.append(sys.path[0] + '/vuldb') sys.path.append(sys.path[0] + "/../") # 获取mongodb账号配置 from Config import ProductionConfig # 进行mongodb认证连接 db_conn = pymongo.MongoClient(ProductionConfig.DB, ProductionConfig.PORT) na_db = getattr(db_conn, ProductionConfig.DBNAME) na_db.authenticate(ProductionConfig.DBUSERNAME, ProductionConfig.DBPASSWORD) # 做了几个集合的简化操作 na_task = na_db.Task na_result = na_db.Result na_plugin = na_db.Plugin na_config = na_db.Config na_heart = na_db.Heartbeat # 线程锁 lock = thread.allocate() # 一些全局变量 PASSWORD_DIC = [] THREAD_COUNT = 50 TIMEOUT = 10 PLUGIN_DB = {} TASK_DATE_DIC = {} WHITE_LIST = []
然后开始运行流程
if __name__ == '__main__': init() # 进行init初始化操作 PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 获取配置 thread.start_new_thread(monitor, ()) # 启动监控线程 while True: task_id, task_plan, task_target, task_plugin = queue_get() # 任务信息获取 if task_id == '': time.sleep(10) continue if PLUGIN_DB: del sys.modules[PLUGIN_DB.keys()[0]] # 清理插件缓存 PLUGIN_DB.clear() for task_netloc in task_target: while True: if int(thread._count()) < THREAD_COUNT: if task_netloc[0] in WHITE_LIST: break thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin)) break else: time.sleep(2) if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})
准备工作
init
用于获取插件的信息
def init(): # 若数据库中存在插件信息,则直接返回,否则重新获取 if na_plugin.find().count() >= 1: return script_plugin = [] json_plugin = [] # 获取vuldb中的插件 file_list = os.listdir(sys.path[0] + '/vuldb') time_ = datetime.datetime.now() for filename in file_list: try: # 插件分为json和py两种格式 if filename.split('.')[1] == 'py': script_plugin.append(filename.split('.')[0]) if filename.split('.')[1] == 'json': json_plugin.append(filename) except: pass for plugin_name in script_plugin: try: # py格式的插件直接导入,然后读取对于变量,插入到mongodb中 res_tmp = __import__(plugin_name) plugin_info = res_tmp.get_plugin_info() plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 na_plugin.insert(plugin_info) except: pass for plugin_name in json_plugin: try: # json格式的插件,用json解析后读取对应变量,插入到mongodb中 json_text = open(sys.path[0] + '/vuldb/' + plugin_name, 'r').read() plugin_info = json.loads(json_text) plugin_info['add_time'] = time_ plugin_info['filename'] = plugin_name plugin_info['count'] = 0 del plugin_info['plugin'] na_plugin.insert(plugin_info) except: pass
get_config
def get_config(): try: config_info = na_config.find_one({"type": "vulscan"}) pass_row = config_info['config']['Password_dic'] thread_row = config_info['config']['Thread'] timeout_row = config_info['config']['Timeout'] white_row = config_info['config']['White_list'] password_dic = pass_row['value'].split('\n') thread_count = int(thread_row['value']) timeout = int(timeout_row['value']) white_list = white_row['value'].split('\n') return password_dic, thread_count, timeout, white_list except Exception, e: print e
和之前nascan中的读取配置类似,只是这回读的是type为vulscan的配置
读取弱口令、线程数、timeout、白名单之类的配置参数,然后返回
monitor
新起了个 monitor
监测线程,监测插件的使用情况
def monitor(): # 引入全局变量 global PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST while True: # 获取正在执行的任务 queue_count = na_task.find({"status": 0, "plan": 0}).count() if queue_count: # 如果有正在执行的任务,则置为1 load = 1 else: # 否则根据当前线程数,来判断插件是否在被使用 ac_count = thread._count() load = float(ac_count - 4) / THREAD_COUNT if load > 1: load = 1 if load < 0: load = 0 # 更新mongodb中的heatbeat集合,有插件正在扫描 na_heart.update({"name": "load"}, {"$set": {"value": load, "up_time": datetime.datetime.now()}}) PASSWORD_DIC, THREAD_COUNT, TIMEOUT, WHITE_LIST = get_config() # 然后根据load值进行不同时间的休眠 if load > 0: time.sleep(8) else: time.sleep(60)
然后进入到 while True
的循环
通过 queue_get()
进行任务参数的获取
def queue_get(): global TASK_DATE_DIC # 获取未加载的task,更新为启动状态 task_req = na_task.find_and_modify(query={"status": 0, "plan": 0}, update={"$set": {"status": 1}}, sort={'time': 1}) if task_req: # 如果存在,在TASK_DATE_DIC记录task,然后返回任务信息 TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] else: # 获取 plan != 0 的task列表 task_req_row = na_task.find({"plan": {"$ne": 0}}) if task_req_row: for task_req in task_req_row: # 判断是否需要再次启动任务 if (datetime.datetime.now() - task_req['time']).days / int(task_req['plan']) >= int(task_req['status']): if task_req['isupdate'] == 1: # 任务更新后,需要重新从info集合中获取ip和port # 更新task集合的target task_req['target'] = update_target(json.loads(task_req['query'])) na_task.update({"_id": task_req['_id']}, {"$set": {"target": task_req['target']}}) # 更新task集合中的status自增1 na_task.update({"_id": task_req['_id']}, {"$inc": {"status": 1}}) # 在TASK_DATE_DIC记录task TASK_DATE_DIC[str(task_req['_id'])] = datetime.datetime.now() # 返回task信息 return task_req['_id'], task_req['plan'], task_req['target'], task_req['plugin'] return '', '', '', ''
回到 __main__
中
# 获取任务信息 task_id, task_plan, task_target, task_plugin = queue_get() if task_id == '': # 没有获取到task配置则sleep10秒后继续获取 time.sleep(10) continue if PLUGIN_DB: # 当有插件缓存时清理插件缓存 # 后面扫描时会导入插件模块,删除之前导入的模块 del sys.modules[PLUGIN_DB.keys()[0]] PLUGIN_DB.clear() for task_netloc in task_target: while True: # 控制线程数 if int(thread._count()) < THREAD_COUNT: # 剔除白名单ip if task_netloc[0] in WHITE_LIST: break # 启动vulscan扫描线程 thread.start_new_thread(vulscan, (task_id, task_netloc, task_plugin)) break else: time.sleep(2) # task_plan == 0 为一次性任务 # 更新 status = 2 if task_plan == 0: na_task.update({"_id": task_id}, {"$set": {"status": 2}})
vulscan
__init__
def __init__(self, task_id, task_netloc, task_plugin): self.task_id = task_id self.task_netloc = task_netloc self.task_plugin = task_plugin self.result_info = '' self.start()
设置好类变量,然后进入 start()
def start(self): self.get_plugin_info() if '.json' in self.plugin_info['filename']: # json检测模式 try: self.load_json_plugin() # 读取漏洞标示 self.set_request() # 标示符转换为请求 self.poc_check() # 检测 except Exception, e: return else: # py脚本检测模式 plugin_filename = self.plugin_info['filename'] self.log(str(self.task_netloc) + "call " + self.task_plugin) if task_plugin not in PLUGIN_DB: plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典 PLUGIN_DB[plugin_filename] = plugin_res try: self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT) except: return self.save_request() # 保存结果
json格式检测
先 load_json_plugin()
加载配置脚本
def get_plugin_info(self): info = na_plugin.find_one({"name": self.task_plugin}) self.plugin_info = info
然后转换为http请求,返回请求句柄
def set_request(self): # 构建url url = 'http://' + self.task_netloc[0] + ":" + str(self.task_netloc[1]) + self.plugin_info['plugin']['url'] if self.plugin_info['plugin']['method'] == 'GET': # 进行GET请求 request = urllib2.Request(url) else: # 否则进行post请求 request = urllib2.Request(url, self.plugin_info['plugin']['data']) self.poc_request = request
然后验证poc是否有效
def poc_check(self): try: # 进行http请求,获取header和body信息 res = urllib2.urlopen(self.poc_request, timeout=30) res_html = res.read(204800) header = res.headers # res_code = res.code except urllib2.HTTPError, e: # res_code = e.code header = e.headers res_html = e.read(204800) except Exception, e: return try: # 获取编码,然后转码 html_code = self.get_code(header, res_html).strip() if html_code and len(html_code) < 12: res_html = res_html.decode(html_code).encode('utf-8') except: pass an_type = self.plugin_info['plugin']['analyzing'] vul_tag = self.plugin_info['plugin']['tag'] analyzingdata = self.plugin_info['plugin']['analyzingdata'] if an_type == 'keyword': # 如果是关键词检测,判断是正则还是MD5的检测,然后进行比对 if analyzingdata.encode("utf-8") in res_html: self.result_info = vul_tag elif an_type == 'regex': if re.search(analyzingdata, res_html, re.I): self.result_info = vul_tag elif an_type == 'md5': md5 = hashlib.md5() md5.update(res_html) # 比对成功,则返回插件tag if md5.hexdigest() == analyzingdata: self.result_info = vul_tag
py脚本检测
# 获取插件文件名 plugin_filename = self.plugin_info['filename'] self.log(str(self.task_netloc) + "call " + self.task_plugin) if task_plugin not in PLUGIN_DB: # 不在PLUGIN_DB中则导入 plugin_res = __import__(plugin_filename) setattr(plugin_res, "PASSWORD_DIC", PASSWORD_DIC) # 给插件声明密码字典 PLUGIN_DB[plugin_filename] = plugin_res # 添加到PLUGIN_DB中 try: # 启用py脚本的check方法,并设置timeout self.result_info = PLUGIN_DB[plugin_filename].check(str(self.task_netloc[0]), int(self.task_netloc[1]),TIMEOUT) except: return
保存请求结果
def save_request(self): # 判断是否扫描出结果了 if self.result_info: try: time_ = datetime.datetime.now() self.log(str(self.task_netloc) + " " + self.result_info) # 没有这条扫描记录则插件扫出的记录+1 v_count = na_result.find( {"ip": self.task_netloc[0], "port": self.task_netloc[1], "info": self.result_info}).count() if not v_count: na_plugin.update({"name": self.task_plugin}, {"$inc": {'count': 1}}) vulinfo = {"vul_name": self.plugin_info['name'], "vul_level": self.plugin_info['level'], "vul_type": self.plugin_info['type']} w_vul = {"task_id": self.task_id, "ip": self.task_netloc[0], "port": self.task_netloc[1], "vul_info": vulinfo, "info": self.result_info, "time": time_, "task_date": TASK_DATE_DIC[str(self.task_id)]} # 添加扫描结果记录 na_result.insert(w_vul) except Exception, e: pass
到此也就完成了vulscan的扫描过程。
最后
巡风中对于扫描的分工,多线程的处理都有很多值得学习和借鉴的地方。而且几乎都有增加一些心跳线程,用于监测。
分析中难免有些不足或者错误,欢迎大佬们指出!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Refactoring
Martin Fowler、Kent Beck、John Brant、William Opdyke、Don Roberts / Addison-Wesley Professional / 1999-7-8 / USD 64.99
Refactoring is about improving the design of existing code. It is the process of changing a software system in such a way that it does not alter the external behavior of the code, yet improves its int......一起来看看 《Refactoring》 这本书的介绍吧!