Windows主机漏洞扫描工具

栏目: IT技术 · 发布时间: 4年前

内容简介:这是一款基于主机的漏洞扫描工具,采用多线程确保可以快速的请求数据,采用线程锁可以在向sqlite数据库中写入数据避免database is locked的错误,采用md5哈希算法确保数据不重复插入。本工具查找是否有公开exp的网站为shodan,该网站限制网络发包的速度,因而采用了单线程的方式,且耗时较长。功能:

0×00 说明:

这是一款基于主机的漏洞扫描工具,采用多线程确保可以快速的请求数据,采用线程锁可以在向 sqlite 数据库中写入数据避免database is locked的错误,采用md5哈希算法确保数据不重复插入。

工具 查找是否有公开exp的网站为shodan,该网站限制网络发包的速度,因而采用了单线程的方式,且耗时较长。

功能:

查找主机上具有的CVE

查找具有公开EXP的CVE

Windows主机漏洞扫描工具

0×01 起因:

因为需要做一些主机漏洞扫描方面的工作,因而编写了这个简单的工具。之前也查找了几款类似的工具,如下:

vulmap :

vulmon开发的一款开源工具,原理是根据软件的名称和版本号来确定,是否有CVE及公开的EXP。这款 Linux 的工具挺好用,但是对于Windows系统层面不太适用。

windows-exp-suggester :

这款和本工具的原理一样,尝试使用了之后,发现它的CVEKB数据库只更新到2017年的,并且没有给出CVE是否有公开的EXP信息。

基于以上所以写了这个简单的工具,该项目在 https://github.com/chroblert/WindowsVulnScan

0×02 原理:

1. 搜集CVE与KB的对应关系。首先在微软官网上收集CVE与KB对应的关系,然后存储进数据库中
2. 查找特定CVE网上是否有公开的EXP
3. 利用powershell脚本收集主机的一些系统版本与KB信息
4. 利用系统版本与KB信息搜寻主机上具有存在公开EXP的CVE

0×03 参数:

 # author: JC0o0l
 # GitHub: https://github.com/chroblert/
 可选参数:
   -h, --help            show this help message and exit
   -u, --update-cve      更新CVEKB数据
   -U, --update-exp      更新CVEEXP数据
   -C, --check-EXP       检索具有EXP的CVE
   -f FILE, --file FILE  ps1脚本运行后产生的.json文件

0×04 示例:

1. 首先运行powershell脚本 KBCollect.ps 收集一些信息

 .\KBCollect.ps1

2. 将运行后产生的 KB.json 文件移动到 cve-check.py 所在的目录

3. 安装一些 python 3模块

 python3 -m pip install requirements.txt

4. 运行 cve-check.py -u 创建CVEKB数据库

5. 运行 cve-check.py -U 更新CVEKB数据库中的 hasPOC 字段

6. 运行 cve-check.py -C -f KB.json 查看具有公开EXP的CVE,如下:

Windows主机漏洞扫描工具

0×05 源码:

KBCollect.ps1 :

 function Get-CollectKB(){
     # 1. 搜集所有的KB补丁
     $KBArray = @()
     $KBArray = Get-HotFix|ForEach-Object {$_.HotFixId}
     $test = $KBArray|ConvertTo-Json
     return $test
 }
 function Get-BasicInfo(){
     # 1. 操作系统
     # $windowsProductName = (Get-ComputerInfo).WindowsProductName
     $windowsProductName = (Get-CimInstance Win32_OperatingSystem).Caption
     # 2. 操作系统版本
     $windowsVersion = (Get-ComputerInfo).WindowsVersion
     $basicInfo = "{""windowsProductName"":""$windowsProductName"",""windowsVersion"":""$windowsVersion""}"
     return $basicInfo
     
 }
 $basicInfo = Get-BasicInfo
 $KBList = Get-CollectKB
 $KBResult = "{""basicInfo"":$basicInfo,""KBList"":$KBList}"
 $KBResult|Out-File KB.json -encoding utf8

cve-check.py :

 import requests
 import sqlite3
 import json
 import hashlib
 import math
 import re
 import threading
 import time
 import argparse
 from pathlib import Path
 # 删除一些ssl 认证的warnging信息
 requests.packages.urllib3.disable_warnings()
 ​
 ThreadCount=20
 DBFileName="CVEKB.db"
 TableName="CVEKB"
 insertSQL = []
 updateSQL = []
 lock = threading.Lock()
 KBResult = {}
 parser = argparse.ArgumentParser()
 parser.add_argument("-u","--update-cve",help="更新CVEKB数据",action="store_true")
 parser.add_argument("-U","--update-exp",help="更新CVEEXP数据",action="store_true")
 parser.add_argument("-C","--check-EXP",help="检索具有EXP的CVE",action="store_true")
 parser.add_argument("-f","--file",help="ps1脚本运行后产生的.json文件")
 args = parser.parse_args()
 ​
 class CVEScanThread(threading.Thread):
     def __init__(self,func,args,name="",):
         threading.Thread.__init__(self)
         self.func = func
         self.args = args
         self.name = name 
         self.result = None
 ​
     def run(self):
         print("thread:{} :start scan page {}".format(self.args[1],self.args[0]))
         self.result = self.func(self.args[0],)
         print("thread:{} :stop scan page {}".format(self.args[1],self.args[0]))
 class EXPScanThread(threading.Thread):
     def __init__(self,func,args,name="",):
         threading.Thread.__init__(self)
         self.func = func
         self.args = args
         self.name = name 
         self.result = None
 ​
     def run(self):
         print("thread:{} :start scan CVE: {},xuehao:{}".format(self.args[1],self.args[0],self.args[2]))
         self.result = self.func(self.args[0],)
         print("thread:{} :stop scan CVE: {}".format(self.args[1],self.args[0]))
     def get_result(self):
         threading.Thread.join(self)
         try:
             return self.result
         except Exception:
             return "Error"
 def get_page_num(num=1,pageSize=100):
     url = "https://portal.msrc.microsoft.com/api/security-guidance/en-us"
     payload = "{\"familyIds\":[],\"productIds\":[],\"severityIds\":[],\"impactIds\":[],\"pageNumber\":" + str(num) + ",\"pageSize\":" + str(pageSize) + ",\"includeCveNumber\":true,\"includeSeverity\":false,\"includeImpact\":false,\"orderBy\":\"publishedDate\",\"orderByMonthly\":\"releaseDate\",\"isDescending\":true,\"isDescendingMonthly\":true,\"queryText\":\"\",\"isSearch\":false,\"filterText\":\"\",\"fromPublishedDate\":\"01/01/1998\",\"toPublishedDate\":\"03/02/2020\"}"
     headers = {
         'origin': "https//portal.msrc.microsoft.com",
         'referer': "https//portal.msrc.microsoft.com/en-us/security-guidance",
         'accept-language': "zh-CN",
         'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299",
         'accept': "application/json, text/plain, */*",
         'accept-encoding': "gzip, deflate",
         'host': "portal.msrc.microsoft.com",
         'connection': "close",
         'cache-control': "no-cache",
         'content-type': "application/json",
         }
 ​
     response = requests.request("POST", url, data=payload, headers=headers, verify = False)
     resultCount = json.loads(response.text)['count']
     return math.ceil(int(resultCount)/100)
 ​
 def update_cvekb_database(num=1,pageSize=100):
     pageCount = get_page_num()
     #for i in range(1,pageCount+1):
     i = 1
     pageCount=524
     tmpCount = ThreadCount
     while i <= pageCount:
         tmpCount = ThreadCount if (pageCount - i) >= ThreadCount else pageCount - i
         print("i:{},pageCount-i:{},ThreadCount:{},PageCount:{}".format(i,pageCount-i,ThreadCount,pageCount))
         time.sleep(0.5)    
         threads = []
         print("===============================")
         for j in range(1,tmpCount + 1):
             print("更新第{}页".format(i+j-1))
             t = CVEScanThread(update_onepage_cvedb_database,(i+j-1,j,),str(j))
             threads.append(t)
         for t in threads:
             t.start()
         for t in threads:
             t.join()
             # update_onepage_cvedb_database(num=i)
         i = i + tmpCount
         conn = sqlite3.connect(DBFileName)
         for sql in insertSQL:
             conn.execute(sql)
         conn.commit()
         conn.close()
         if tmpCount != ThreadCount:
             break
         
         
 def check_POC_every_CVE(CVEName=""):
     #apiKey = ""
     #url = "https://exploits.shodan.io/api/search?query=" + CVEName + "&key=" + apiKey
     url = "https://exploits.shodan.io/?q=" + CVEName
     try:
         response = requests.request("GET",url=url,verify=False,timeout=10)
         #total = json.loads(response.text)
     except Exception as e:
         print("Error,{}".format(CVEName))
         print(e)
         return "Error"
     if "Total Results" not in response.text:
         return "False"
     else:
         return "True"
 def update_onepage_cvedb_database(num=1,pageSize=100):
     url = "https://portal.msrc.microsoft.com/api/security-guidance/en-us"
 ​
     payload = "{\"familyIds\":[],\"productIds\":[],\"severityIds\":[],\"impactIds\":[],\"pageNumber\":" + str(num) + ",\"pageSize\":" + str(pageSize) + ",\"includeCveNumber\":true,\"includeSeverity\":false,\"includeImpact\":false,\"orderBy\":\"publishedDate\",\"orderByMonthly\":\"releaseDate\",\"isDescending\":true,\"isDescendingMonthly\":true,\"queryText\":\"\",\"isSearch\":false,\"filterText\":\"\",\"fromPublishedDate\":\"01/01/1998\",\"toPublishedDate\":\"03/02/2020\"}"
     headers = {
         'origin': "https//portal.msrc.microsoft.com",
         'referer': "https//portal.msrc.microsoft.com/en-us/security-guidance",
         'accept-language': "zh-CN",
         'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299",
         'accept': "application/json, text/plain, */*",
         'accept-encoding': "gzip, deflate",
         'host': "portal.msrc.microsoft.com",
         'connection': "close",
         'cache-control': "no-cache",
         'content-type': "application/json",
         }
     try:
         response = requests.request("POST", url, data=payload, headers=headers, verify = False)
         resultList = json.loads(response.text)['details']
     except :
         print(response.text)
     conn = sqlite3.connect(DBFileName)
     create_sql = """Create Table IF NOT EXISTS {} (
         hash TEXT UNIQUE,
         name TEXT,
         KBName TEXT,
         CVEName TEXT,
         impact TEXT,
         hasPOC TEXT)""".format(TableName)
     conn.execute(create_sql)
     conn.commit()
     conn.close()
     for result in resultList:
         KBName = result['articleTitle1'] + ";" if (result['articleTitle1'] !=  None) and result['articleTitle1'].isdigit() else ""
         KBName += result['articleTitle2'] + ";" if (result['articleTitle2'] != None) and result['articleTitle2'].isdigit() else ""
         KBName += result['articleTitle3'] + ";" if (result['articleTitle3'] != None) and result['articleTitle3'].isdigit() else ""
         KBName += result['articleTitle4'] + ";" if (result['articleTitle4'] != None) and result['articleTitle4'].isdigit() else ""
         if KBName == "":
             continue
         h1 = hashlib.md5()
         metaStr = result['name'] + KBName + result['cveNumber'] + result['impact']
         h1.update(metaStr.encode('utf-8'))
         #hasPOC = check_POC_every_CVE(result['cveNumber'])
         # 收集到所有的KB后再搜索有没有公开的EXP
         hasPOC = ""
         sql = "INSERT OR IGNORE INTO "+TableName+" VALUES ('" + h1.hexdigest() + "','" + result['name'] + "','" + KBName + "','" + result['cveNumber'] + "','" + result['impact'] + "','" + hasPOC+"')"
         with lock:
             global insertSQL
             insertSQL.append(sql)
         # conn.execute(sql)
     # conn.commit()
     # conn.close()
     # pass
 ​
 def select_CVE(tmpList=[],windowsProductName="",windowsVersion=""):
     conn = sqlite3.connect(DBFileName)
     con = conn.cursor()
     intersectionList = []
     count = 0
     for i in tmpList:
         sql = 'select distinct(CVEName) from '+ TableName+' where (name like "'+ windowsProductName+'%'+ windowsVersion + '%") and ("'+ i +'" not in (select KBName from '+ TableName +' where name like "'+ windowsProductName+'%'+windowsVersion+'%")); '
         cveList = []
         for cve in con.execute(sql):
             cveList.append(cve[0])
         if count == 0:
             intersectionList = cveList.copy()
         count +=1
         intersectionList = list(set(intersectionList).intersection(set(cveList)))
     intersectionList.sort()
     for cve in intersectionList:
         sql = "select CVEName from {} where CVEName == '{}' and hasPOC == 'True'".format(TableName,cve)
         # print(sql)
         con.execute(sql)
         if len(con.fetchall()) != 0:
             print("{} has EXP".format(cve))
     # print(intersectionList)
 def update_hasPOC(key = "Empty"):
     conn = sqlite3.connect(DBFileName)
     con = conn.cursor()
     if key == "All":
         sql = "select distinct(CVEName) from {}".format(TableName)
     else:
         sql = "select distinct(CVEName) from {} where (hasPOC IS NULL) OR (hasPOC == '')".format(TableName)
 ​
     con.execute(sql)
     cveNameList = con.fetchall()
     i = 0
     count = 1
     while i < len(cveNameList):
         print("|=========={}============|".format(i))
         # tmpCount = ThreadCount if (len(cveNameList) - i) >= ThreadCount else len(cveNameList) - i
         # threads = []
         # for j in range(1,tmpCount+1):
         #     t = EXPScanThread(check_POC_every_CVE,(cveNameList[i+j][0],j,i+j,),str(j))
         #     threads.append(t)
         # for t in threads:
         #     t.start()
         # for t in threads:
         #     t.join()
         # j = 1
         # for t in threads:
         #     hasPOC = t.get_result()
         #     print(hasPOC)
         #     update_sql = "UPDATE "+TableName+" set hasPOC = '" + hasPOC + "' WHERE cveName == '" + cveNameList[i+j][0] +"';"
         #     conn.execute(update_sql)
         #     print("[+] update:{}".format(update_sql))
         #     j += 1
         # i=i+ThreadCount
         # conn.commit()
         hasPOC = check_POC_every_CVE(cveNameList[i][0])
         time.sleep(0.3)
         update_sql = "UPDATE "+TableName+" set hasPOC = '" + hasPOC + "' WHERE cveName == '" + cveNameList[i][0] +"';"
         conn.execute(update_sql)
         print(update_sql)
         count += 1
         i += 1
         if count == 10:
             conn.commit()
             print("[+]update")
             count = 1
     conn.commit()
     conn.close()
     print("Over")
 ​
 ​
 if __name__ == "__main__":
     banner = """
     ========CVE-EXP-Check===============
     |       author:JC0o0l               |
     |       wechat:JC_SecNotes          |
     |       version:1.0                 |
     =====================================
     """
     print(banner)
     if (not args.check_EXP ) and (not args.update_cve) and (not args.update_exp) and args.file is None:
         parser.print_help()
     if args.update_cve:
         update_cvekb_database()
     if args.update_exp:
         dbfile=Path(DBFileName)
         if dbfile.exists():
             update_hasPOC(key="Empty")
         else:
             print("请先使用-u 创建数据库")
             parser.print_help()
     if args.check_EXP:
         dbfile=Path(DBFileName)
         if not dbfile.exists():
             print("请先使用-u 创建数据库,之后使用 -U 更新是否有EXP")
             parser.print_help()
             exit()
         if args.file:
             with open(args.file,"r",encoding="utf-8") as f:
                 KBResult = json.load(f)
             windowsProductName = KBResult['basicInfo']['windowsProductName']
             windowsProductName = ((re.search("\w[\w|\s]+\d+[\s|$]",windowsProductName).group()).strip()).replace("Microsoft","").strip()
             windowsVersion = KBResult['basicInfo']['windowsVersion']
             print("系统信息如下:")
             print("{} {}".format(windowsProductName,windowsVersion))
             tmpKBList = KBResult['KBList']
             KBList = []
             for KB in tmpKBList:
                 KBList.append(KB.replace("KB",""))
             print("KB信息如下:")
             print(KBList)
             print("EXP信息如下:")
             select_CVE(tmpList=KBList,windowsProductName=windowsProductName,windowsVersion=windowsVersion)
         else:
             print("请输入.json文件")    
     

*本文作者:jerrybird,转载请注明来自FreeBuf.COM


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据结构

数据结构

严蔚敏、吴伟民 / 清华大学出版社 / 2007-3-1 / 30.0

《数据结构》(C语言版)是为“数据结构”课程编写的教材,也可作为学习数据结构及其算法的C程序设计的参数教材。 本书的前半部分从抽象数据类型的角度讨论各种基本类型的数据结构及其应用;后半部分主要讨论查找和排序的各种实现方法及其综合分析比较。其内容和章节编排1992年4月出版的《数据结构》(第二版)基本一致,但在本书中更突出了抽象数据类型的概念。全书采用类C语言作为数据结构和算法的描述语言。 ......一起来看看 《数据结构》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

SHA 加密
SHA 加密

SHA 加密工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具