Python实现SQL注入检测插件

栏目: Python · 发布时间: 5年前

内容简介:首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用Python的请安装这些库实验环境是Linux,创建一个

扫描器需要实现的功能思维导图

Python实现 <a href='https://www.codercto.com/topics/18630.html'>SQL</a> 注入检测插件

爬虫编写思路

首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用 Pythonset() 就可以解决,大概流程是:

  1. 输入URL
  2. 下载解析出URL
  3. URL去重,判断是否为本站
  4. 加入到待爬列表
  5. 重复循环

SQL判断思路

  • 通过在URL后面加上 AND %d=%d 或者 OR NOT (%d>%d)
  • %d 后面的数字是随机可变的
  • 然后搜索网页中特殊关键词,比如:
MySQL 中是 SQL syntax.*MySQL
Microsoft SQL Server 是 Warning.*mssql_
Microsoft Access 是 Microsoft Access Driver
Oracle 是 Oracle error
IBM DB2 是 DB2 SQL error
SQLite 是 SQLite.Exception
...
  • 通过这些关键词就可以判断出所用的数据库
  • 还需要判断一下waf之类的东西,有这种东西就直接停止。简单的方法就是用特定的URL访问,如果出现了像 IP bannedfierwall 之类的关键词,可以判断出是 waf 。具体的正则表达式是 (?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)

开发准备

请安装这些库

pip install requests
pip install beautifulsoup4

实验环境是Linux,创建一个 Code 目录,在其中创建一个 work 文件夹,将其作为工作目录

目录结构

/w8ay.py  // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script   // 插件存放
/exp      // exp和poc存放

步骤

SQL检测脚本编写

DBMS_ERRORS = {
    'MySQL': (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
    "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
    "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
    "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
    "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
    "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
    "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
    "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}

通过正则表达式就可以判断出是哪个数据库了

for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
    if (re.search(regex,_content)):
        return True

下面是我们测试语句的 payload

BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")

用报错语句返回正确的内容和错误的内容进行对比

for test_payload in BOOLEAN_TESTS:
    # Right Page
    RANDINT = random.randint(1, 255)
    _url = url + test_payload % (RANDINT, RANDINT)
    content["true"] = Downloader.get(_url)
    _url = url + test_payload % (RANDINT, RANDINT + 1)
    content["false"] = Downloader.get(_url)
    if content["origin"] == content["true"] != content["false"]:
        return "sql found: %" % url

这句

content["origin"] == content["true"] != content["false"]

意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞

完整代码:

import re, random
from lib.core import Download
def sqlcheck(url):
    if (not url.find("?")): # Pseudo-static page
        return false;
    Downloader = Download.Downloader()
    BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
    DBMS_ERRORS = {
        # regular expressions used for DBMS recognition based on error message response
        "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
        "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
        "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
        "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
        "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
        "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
        "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
        "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
    }
    _url = url + "%29%28%22%27"
    _content = Downloader.get(_url)
    for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
        if (re.search(regex,_content)):
            return True
    content = {}
    content['origin'] = Downloader.get(_url)
    for test_payload in BOOLEAN_TESTS:
        # Right Page
        RANDINT = random.randint(1, 255)
        _url = url + test_payload % (RANDINT, RANDINT)
        content["true"] = Downloader.get(_url)
        _url = url + test_payload % (RANDINT, RANDINT + 1)
        content["false"] = Downloader.get(_url)
        if content["origin"] == content["true"] != content["false"]:
            return "sql found: %" % url

将这个文件命名为 sqlcheck.py ,放在 /script 目录中。代码的第4行作用是查找URL是否包含 ? ,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉

爬虫的编写

爬虫的思路上面讲过了,先完成URL的管理,我们单独将它作为一个类,文件保存在 /lib/core/UrlManager.py

#-*- coding:utf-8 -*-

class UrlManager(object):
    def __init__(self):
        self.new_urls = set()
        self.old_urls = set()
        
    def add_new_url(self, url):
        if url is None:
            return
        if url not in self.new_urls and url not in self.old_urls:
            self.new_urls.add(url)
      
    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)
        
    def has_new_url(self):
        return len(self.new_urls) != 0
     
    def get_new_url(self):
        new_url = self.new_urls.pop()
        self.old_urls.add(new_url)
        return new_url

为了方便,我们也将下载功能单独作为一个类使用,文件保存在 lib/core/Downloader.py

#-*- coding:utf-8 -*-
import requests

class Downloader(object):
    def get(self, url):
        r = requests.get(url, timeout = 10)
        if r.status_code != 200:
            return None
        _str = r.text
        return _str
    
    def post(self, url, data):
        r = requests.post(url, data)
        _str = r.text
        return _str
    
    def download(self, url, htmls):
        if url is None:
            return None
        _str = {}
        _str["url"] = url
        try:
            r = requests.get(url, timeout = 10)
            if r.status_code != 200:
                return None
            _str["html"] = r.text
        except Exception as e:
            return None
        htmls.append(_str)

特别说明,因为我们要写的爬虫是多线程的,所以类中有个 download 方法是专门为多线程下载专用的

lib/core/Spider.py 中编写爬虫

#-*- coding:utf-8 -*-

from lib.core import Downloader, UrlManager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import BeautifulSoup

class SpiderMain(object):
    def __init__(self, root, threadNum):
        self.urls = UrlManager.UrlManager()
        self.download = Downloader.Downloader()
        self.root = root
        self.threadNum = threadNum
    
    def _judge(self, domain, url):
        if (url.find(domain) != -1):
            return True
        return False
    
    def _parse(self, page_url, content):
        if content is None:
            return
        soup = BeautifulSoup(content, 'html.parser')
        _news = self._get_new_urls(page_url, soup)
        return _news
        
    def _get_new_urls(self, page_url, soup):
        new_urls = set()
        links = soup.find_all('a')
        for link in links:
            new_url = link.get('href')
            new_full_url = urljoin(page_url, new_url)
            if (self._judge(self.root, new_full_url)):
                new_urls.add(new_full_url)
        return new_urls
        
    def craw(self):
        self.urls.add_new_url(self.root)
        while self.urls.has_new_url():
            _content = []
            th = []
            for i in list(range(self.threadNum)):
                if self.urls.has_new_url() is False:
                    break
                new_url = self.urls.get_new_url()
                
                ## sql check
                try:
                    if (sqlcheck.sqlcheck(new_url)):
                        print("url:%s sqlcheck is valueable" % new_url)
                except:
                    pass
                        
                print("craw:" + new_url)
                t = threading.Thread(target = self.download.download, args = (new_url, _content))
                t.start()
                th.append(t)
            for t in th:
                t.join()
            for _str in _content:
                if _str is None:
                    continue
                new_urls = self._parse(new_url, _str["html"])
                self.urls.add_new_urls(new_urls)

爬虫通过调用 craw() 方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用 _parse 方法调用 BeautifulSoup 进行解析,之后将解析出的URL列表丢入URL管理器,这样循环,最后只要爬完了网页,爬虫就会停止

threading 库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个url进行下载,然后线程会阻塞,阻塞完毕后线程放行

爬虫和SQL检查的结合

lib/core/Spider.py 文件引用一下 from script import sqlcheck ,在 craw() 方法中,取出新的URL地方调用一下

##sql check
try:
    if(sqlcheck.sqlcheck(new_url)):
        print("url:%s sqlcheck is valueable"%new_url)
except:
    pass

try 检测可能出现的异常,绕过它,在文件 w8ay.py 中进行测试

#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.Spider import SpiderMain
def main():
    root = "https://wmathor.com"
    threadNum = 50
    w8 = SpiderMain(root, threadNum)
    w8.craw()
 
if __name__ == "__main__":
    main()

很重要的一点!为了使得 libscript 文件夹中的 .py 文件可以可以被认作是模块,请在 liblib/corescript 文件夹中创建 __init__.py 文件,文件中什么都不需要写


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

你凭什么做好互联网

你凭什么做好互联网

曹政 / 中国友谊出版公司 / 2016-12 / 42.00元

为什么有人可以预见商机、超越景气,在不确定环境下表现更出色? 在规则之外,做好互联网,还有哪些关键秘诀? 当环境不给机会,你靠什么翻身? 本书为“互联网百晓生”曹政20多年互联网经验的总结,以严谨的逻辑思维分析个人与企业在互联网发展中的一些错误思想及做法,并给出正确解法。 从技术到商业如何实现,每个发展阶段需要匹配哪些能力、分解哪些目标、落实哪些策略都一一点出,并在......一起来看看 《你凭什么做好互联网》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具