内容简介:这一节打算爬取猫眼电影的 top 100 的电影信息,我们首先可以访问一下我们需要爬取的网站,看一下我们需要的信息所处的位置和结构如何看完以后我们的思路应该就比较清晰了,我们首先使用 requests 库请求单页内容,然后我们使用正则对我们需要的信息进行匹配,然后将我们需要的每一条信息保存成一个JSON 字符串,并将其存入文件当中,然后就是开启循环遍历十页的内容或者采用 Python 多线程的方式提高爬取速度
这一节打算爬取猫眼电影的 top 100 的电影信息,我们首先可以访问一下我们需要爬取的网站,看一下我们需要的信息所处的位置和结构如何
看完以后我们的思路应该就比较清晰了,我们首先使用 requests 库请求单页内容,然后我们使用正则对我们需要的信息进行匹配,然后将我们需要的每一条信息保存成一个JSON 字符串,并将其存入文件当中,然后就是开启循环遍历十页的内容或者采用 Python 多线程的方式提高爬取速度
2.代码实现
spider.py
import requests import json from requests.exceptions import RequestException import re from multiprocessing import Pool requests.packages.urllib3.disable_warnings() def get_one_page(url): try: headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', } res = requests.get(url,headers=headers,verify=False) if res.status_code == 200: return res.text return None except RequestException: return None def parse_one_page(html): pattern = re.compile('<dd>.*?board-index.*?(\d+)</i>.*?data-src="(.*?)".*?alt="(\w+)".*?"star">' '(.*?)</p>.*?"releasetime">(.*?)</p>.*?integer">(.*?)</i>.*?fraction">(\d)</i>',re.S) items = re.findall(pattern,html) for item in items: #这里使用 yield 将该函数变成了一个可迭代对象并且每次能返回自己定义好格式的数据 yield { 'index': item[0], 'image': item[1], 'name': item[2], 'actor':item[3].strip()[3:], 'time': item[4].strip()[5:], 'score': item[5]+item[6] } def write_to_file(content): with open('result.txt','a',encoding='utf-8') as f: f.write(json.dumps(content,ensure_ascii=False) + '\n' ) def main(offset): url = "http://maoyan.com/board/4?offset=" + str(offset) html = get_one_page(url) for item in parse_one_page(html): write_to_file(item) if __name__ == '__main__': pool = Pool() pool.map(main,[i*10 for i in range(10)])
3.运行效果
0X02 模拟 Ajax 请求抓取今日头条街拍美图
1.分析网页确定思路
首先我们打开头条街拍的页面,我们发现我们看到的详细页链接直接在源代码中并不能找到,于是我们就需要去查看我们的 ajax 请求,看看是不是通过 ajax 加载的,我们可以打开浏览器控制台,我们过滤 XHR 请求有了一些发现,如下图:
在 xhr 请求中 offset 为 0 的部分,页面中的 data 为 0 的 数据部分清楚地地显示了我们想要查找的详细页的数据,然后随着我们滚动条的下拉,页面会不断发起 xhr 请求,offset 会随之不断的增大,每次增大的数目为 10 ,实际上是通过 ajax 去请求索引页,每次返回的 json 结果中有10条详细页的数据,这样我们就能不断在页面中获取到街拍新闻的信息。
有了街拍新闻,自然我们还要进入新闻中获取街拍的美图,我们看一下新闻内部的图片是怎么获取的,如下图所示:
很明显,街拍真正的图片的 URL 是通过网页中的 js 变量的方式获取的,我们考虑使用 正则 来获取,另外,页面第一个 title 标签里面有该详细页面的名称,我们可以使用 BeautifulSoup 来提取出来
思路梳理:
(1)使用 requests 库去去请求网站,并获取索引网页(ajax 请求的 url)返回的 json 代码
(2)从索引网页中提取出详细页面的 URL,并进一步抓取详细页的信息
(3)通过正则匹配详细页中的图片链接,并将其下载到本地,并将页面信息和图片的 URL 保存到本地的 MongoDB
(4)对多个索引页进行循环抓取,并开启多线程的方式提高效率
2.代码实现
config.py
MONGO_URL = 'localhost' MONGO_DB = 'toutiao' MONGO_TABLE = 'toutiao' GROUP_STATR = 0 GROUP_END = 5 KEYWORD = '街拍' IMAGE_DIR = 'DOWNLOADED'
spider.py
import requests import re from bs4 import BeautifulSoup from urllib.parse import urlencode import json from requests.exceptions import RequestException from config import * import pymongo import os from hashlib import md5 from multiprocessing import Pool # 声明 mongodb 数据库对象 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] def get_page_index(offset,keyword): data = { 'aid': 24, 'app_name': 'web_search', 'offset': offset, 'format': 'json', 'keyword': keyword, 'autoload': 'true', 'count': 20, 'en_qc': 1, 'cur_tab': 1, 'from': 'search_tab', 'pd': 'synthesis', 'timestamp': 1556970196243, } headers = { 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie':'...' } url = 'https://www.toutiao.com/api/search/content/?' + urlencode(data) try: res = requests.get(url,headers=headers) res.encoding = 'utf-8' if res.status_code == 200: return res.text return None except RequestException: print('requests index page error') return None def parse_page_index(html): data = json.loads(html) if data and 'data' in data.keys(): for item in data.get('data'): yield item.get('article_url') def get_page_detail(url): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie': '...' } try: res = requests.get(url, headers=headers) res.encoding = 'utf-8' if res.status_code == 200: return res.text return None except RequestException: #print('requests detail page error',url) return None def parse_page_detail(html,url): soup = BeautifulSoup(html,'html.parser') title = soup.select('title')[0].get_text() pattern = re.compile("articleInfo: {.*?content: '(.*?);',",re.S) images = re.search(pattern,html) if images: images_pattern = re.compile("<img src="(.*?)" img_width="") res = re.findall(images_pattern,images.group(1)) for image_url in res: dir_name = re.sub(r'[\\\\/:*?|"<> ]','',title) download_image(image_url,dir_name[:10]) return { 'title': title, 'url': url, 'images': res, } def save_to_mongo(result): if db[MONGO_TABLE].insert(result): print("成功存储到 mongodb 数据库",result) return True return False def download_image(url,dir_name): print('正在下载:',url) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Cookie': '...' } try: res = requests.get(url, headers=headers) if res.status_code == 200: # 存储二进制数据的时候使用content save_image(dir_name,res.content) return None except RequestException: print('requests image error',url) return None def save_image(dir_name,content): if not os.path.exists(IMAGE_DIR + '/' + dir_name): os.makedirs(IMAGE_DIR + '/' + dir_name) file_path = '{0}\\{1}\\{2}\\{3}.{4}'.format(os.getcwd(),IMAGE_DIR,dir_name,md5(content).hexdigest(),'jpg') if not os.path.exists(file_path): with open(file_path,'wb') as f: f.write(content) def main(offset): html = get_page_index(offset,KEYWORD) #print(html) for url in parse_page_index(html): #print(url) html = get_page_detail(url) if html: result = parse_page_detail(html,url) if result: #print(result) save_to_mongo(result) if __name__ == '__main__': groups = [x*20 for x in range(GROUP_STATR,GROUP_END + 1)] pool = Pool() pool.map(main,groups)
3.运行效果
0X03 使用Selenium模拟浏览器抓取淘宝商品美食信息
众所周知,淘宝的网页是非常复杂的,我们按照上面的模拟 Ajax 的请求去获取 json 数据并且解析的方式已经不那么好用了,于是我们要祭出我们的终极杀器—-Selenium ,这个库可以调用浏览器驱动或者是 phantomjs 来模拟浏览器的请求,有了它我们就可以通过脚本去驱动浏览器,这样哪些动态加载的数据就不用我们自己去获取了,非常方便。
1.分析网页确定思路
打开淘宝,输入“美食”,回车
我们想要获取网页上加载的图片,但是我们找到页面的原始请求的页面的结果,我们会发现当我们刚一翻就已经出现页尾的代码了,实际上页面的主体还不知道在哪,我尝试翻找了一下 XHR 请求发现依然不是很明显,这种情况下为了减轻我们的抓取负担,我们可以使用 selenium 配合 Chromedriver 去获取加载好的完整页面,然后我们再使用正则去抓取图片,这样就非常轻松容易了。
思路梳理:
(1)利用 selenium 库配合chromedriver 请求淘宝并输入“美食”搜索参数,获取商品列表
(2)获取页码,并模拟鼠标点击操作获取后面页码的商品信息
(3)使用 PyQuery 分析源码,得到商品的详细信息
(4)将商品信息存储到 MongoDB 数据库中
2.代码实现
config.py
MONGO_URL = 'localhost' MONGO_DB = 'taobao' MONGO_TABLE = 'product'
spider.py
from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import re from pyquery import PyQuery as pq from config import * import pymongo client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] browser = webdriver.Chrome() wait = WebDriverWait(browser, 100) def search(): try: browser.get('https://www.taobao.com/') # 判断所需的元素是否加载成功(wait until 中会存在判断条件,因此常常用作判断) input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#q")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#J_TSearchForm > div.search-button > button")) ) #输入+点击 input.send_keys("美食") submit.click() #查看页数是否加载成功 total = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.total")) ) get_products() return total.text except TimeoutException: return search() def next_page(page_number): try: input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > input")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > span.btn.J_Submit")) ) input.clear() input.send_keys(page_number) submit.click() wait.until( EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > ul > li.item.active > span"),str(page_number)) ) get_products() except TimeoutException: next_page(page_number) def get_products(): wait.until( # 这里的 CSS 是手写的,因为从控制台复制的话只能得到一个 item EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-itemlist .items .item")) ) html = browser.page_source doc = pq(html) items = doc('#mainsrp-itemlist .items .item').items() for item in items: product = { 'title': item.find('.title').text(), 'image': item.find('.pic .img').attr('src'), 'price': item.find('.price').text(), 'deal': item.find('.deal-cnt').text()[:-3], 'shop': item.find('.shop').text(), 'location':item.find('.location').text(), } print(product) save_to_mongo(product) def save_to_mongo(result): try: if db[MONGO_TABLE].insert(result): print("存储到 MongoDB 成功",result) except Exception: print("存储到 MongoDB 失败") def main(): try: total = int(re.compile('(\d+)').search(search()).group(1)) for i in range(2,total + 1): next_page(i) except Exception: print('出错了') finally: browser.close() if __name__ == '__main__': main()
3.运行效果
4.存在问题
事实上这个脚本并不能完全实现自动化,因为由我们 selenium + chromdriver 打开的淘宝在搜索的时候回弹出登录提示框,我们还需要手动去登录一下才能进行下面的爬取工作,听起来似乎不是很要紧,现在登陆一下只要扫描以下二维码就可以了,但是这样我们就没法使用 chrome headless 模式进行静默访问,很是不爽,于是我们还需要对这段代码进行改进。
5.尝试解决
对于 headless 问题,我的解决思路是这样的,因为我们想要用二维码登录,那样的话我们必须要求出现界面,但是这个界面的作用仅仅是一个登录,于是我考虑使用两个 driver ,一个专门用来登录,然后将登录后的 cookie 保存起来,存储在文件中,另一个负责爬取数据的 driver 使用 Headless 模式,然后循环读取本地存储好的 cookie 访问网站,这样就很优雅的解决了我们的问题,下面是我改进后的代码:
spiser.py
import json from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC import re from pyquery import PyQuery as pq from config import * import pymongo from selenium.webdriver.chrome.options import Options # 数据库配置信息 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] # 全局设置 options = Options() options.add_argument("--headless") browser = webdriver.Chrome(options=options) wait = WebDriverWait(browser, 20) def get_cookie_to_save(): try: driver = webdriver.Chrome() driver.get('https://login.taobao.com/member/login.jhtml') # 判断是否已经成功登陆 # 这里需要重新获取页面,因为页面跳转了 driver 无法识别 source = driver.page_source doc = pq(source) if(doc('#J_SiteNavMytaobao > div.site-nav-menu-hd > a > span') == u'我的淘宝'): dictCookies = driver.get_cookies() jsonCookies = json.dumps(dictCookies) # 登录完成后,将cookies保存到本地文件 with open("cookies_tao.json","w") as f: f.write(jsonCookies) except Exception: print('error') finally: driver.close() def get_the_cookie(): browser.get('https://www.taobao.com/') # 删除本地的所有cookie browser.delete_all_cookies() # 读取登录时储存到本地的cookie with open("cookies_tao.json", "r", encoding="utf8") as f: ListCookies = json.loads(f.read()) # 循环遍历添加 cookie for cookie in ListCookies: #print(cookie) browser.add_cookie(cookie) def search(): try: browser.get('https://www.taobao.com/') # 判断所需的元素是否加载成功(wait until 中会存在判断条件,因此常常用作判断) input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#q")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#J_TSearchForm > div.search-button > button")) ) #输入+点击 input.send_keys("美食") submit.click() #查看页数是否加载成功 total = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.total")) ) get_products() return total.text except TimeoutException: return search() def next_page(page_number): try: input = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > input")) ) submit = wait.until( EC.element_to_be_clickable((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > div.form > span.btn.J_Submit")) ) input.clear() input.send_keys(page_number) submit.click() wait.until( EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#mainsrp-pager > div > div > div > ul > li.item.active > span"),str(page_number)) ) get_products() except TimeoutException: next_page(page_number) def get_products(): wait.until( # 这里的 CSS 是手写的,因为从控制台复制的话只能得到一个 item EC.presence_of_element_located((By.CSS_SELECTOR, "#mainsrp-itemlist .items .item")) ) html = browser.page_source doc = pq(html) items = doc('#mainsrp-itemlist .items .item').items() for item in items: product = { 'title': item.find('.title').text(), 'image': item.find('.pic .img').attr('src'), 'price': item.find('.price').text(), 'deal': item.find('.deal-cnt').text()[:-3], 'shop': item.find('.shop').text(), 'location':item.find('.location').text(), } print(product) save_to_mongo(product) def save_to_mongo(result): try: if db[MONGO_TABLE].insert(result): print("存储到 MongoDB 成功",result) except Exception: print("存储到 MongoDB 失败") def main(): try: get_cookie_to_save() get_the_cookie() total = int(re.compile('(\d+)').search(search()).group(1)) for i in range(2,total + 1): next_page(i) except Exception: print('出错了') finally: browser.close() if __name__ == '__main__': main()
0X04 Flask + Redis 维护代理池
1.为什么需要维护代理池
我们知道很多网站都是由反爬虫的机制的,于是我们就需要对我们的 ip 进行伪装,也是因为这个原因,网上也有很多的免费代理 IP 可以使用,但是这些 ip 质量参差不齐,于是我们就需要对其进行进一步的过滤,所以我们需要自己维护一个自己的好用的代理池,这就是我们这一节的目的,我们使用的 Redis 就是用来存储我们的代理 ip 信息的,flask 主要为我们提供一个方便的调用接口
2.代理池的基本要求
(1)多占抓取,异步检测
(2)定时筛选持续更新
(3)提供接口,易于获取
3.代理池的架构
4.代码实现
注:
这里的代码实现来源于以下项目地址: https://github.com/Python3WebSpider/ProxyPool
(1)入口文件 run.py
import... sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') def main(): try: # 这里调用了调度器来运行起来整个代理池框架 s = Scheduler() s.run() except: main() if __name__ == '__main__': main()
(2)调度中心 scheduler.py
import... class Scheduler(): def schedule_tester(self, cycle=TESTER_CYCLE): """ 定时测试代理 """ tester = Tester() while True: print('测试器开始运行') tester.run() time.sleep(cycle) def schedule_getter(self, cycle=GETTER_CYCLE): """ 定时获取代理 """ getter = Getter() while True: print('开始抓取代理') getter.run() time.sleep(cycle) def schedule_api(self): """ 开启API """ app.run(API_HOST, API_PORT) def run(self): print('代理池开始运行') #使用多进程对三个重要函数进行调用 if TESTER_ENABLED: #调用tester 测试 ip 的可用性 tester_process = Process(target=self.schedule_tester) tester_process.start() if GETTER_ENABLED: #调用 getter 函数从网站中爬取代理 ip getter_process = Process(target=self.schedule_getter) getter_process.start() if API_ENABLED: #调用 api 函数,提供对外的接口并开启对数据库的接口 api_process = Process(target=self.schedule_api) api_process.start()
(3)代理ip获取
getter.py
import... class Getter(): def __init__(self): self.redis = RedisClient() self.crawler = Crawler() def is_over_threshold(self): """ 判断是否达到了代理池限制 """ if self.redis.count() >= POOL_UPPER_THRESHOLD: return True else: return False def run(self): print('获取器开始执行') if not self.is_over_threshold(): #通过我们元类设置的属性(方法列表和方法个数)循环调用不同的方法获取代理 ip for callback_label in range(self.crawler.__CrawlFuncCount__): callback = self.crawler.__CrawlFunc__[callback_label] # 获取代理 proxies = self.crawler.get_proxies(callback) sys.stdout.flush() for proxy in proxies: self.redis.add(proxy)
crawler.py
#定义一个元类来拦截类的创建,给类添加了一个__CrawlFunc__属性记录所有的爬虫方法名 #__CrawlFuncCount__属性记录已经设置好的爬虫方法 class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count += 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs) class Crawler(object, metaclass=ProxyMetaclass): # get_proxy 根据传入的方法名称,再通eval() 去执行从而对外统一了调用的接口 def get_proxies(self, callback): proxies = [] for proxy in eval("self.{}()".format(callback)): print('成功获取到代理', proxy) proxies.append(proxy) return proxies def crawl_daili66(self, page_count=4): """ 获取代理66 :param page_count: 页码 :return: 代理 """ start_url = 'http://www.66ip.cn/{}.html' urls = [start_url.format(page) for page in range(1, page_count + 1)] for url in urls: print('Crawling', url) html = get_page(url) if html: doc = pq(html) trs = doc('.containerbox table tr:gt(0)').items() for tr in trs: ip = tr.find('td:nth-child(1)').text() port = tr.find('td:nth-child(2)').text() yield ':'.join([ip, port]) def crawl_ip3366(self): ... yield result.replace(' ', '') def crawl_kuaidaili(self): ...
关键技术解释:
虽然我在注释中大概把关键的点都说了一下,但是这个技术非常重要,于是我还想再写一下
(1)解决很多爬虫配合运行的问题
因为我们的获取代理 ip 的网站有很多,这样我们就需要些很多的爬虫,那么这些爬虫应该怎样被我们调度就成了一个比较重要的问题,我们最好的想法就是每次调用一个网站,每次从这个网站中返回一个代理 ip 存入数据库,那我们第一个想到的应该就是 用 yield 作为每个爬虫的返回值的形式,这样不仅能实现按照我们自定义的统一格式返回的目的,而且还能完美实现我们每次返回一个然后下一次还能接着继续返回的目的
除此之外,想要配合运行我们还需要一个统一的函数调用接口,这个的实现方法是使用的 callback 回调函数作为我们函数调用的参数,然后传入我们的函数名,并通过 eval() 去执行我们的函数
(2)解决动态获取方法名和方法个数问题
这个问题就比较神奇了,也是我们需要学习的重点,这里使用的是 元类 来劫持类的构建并且为其添加对应的属性的方法来解决这个问题,Python 中一切皆对象,元类简单的说就是创建类的对象,我们还是重点再看一下代码
class ProxyMetaclass(type): def __new__(cls, name, bases, attrs): count = 0 attrs['__CrawlFunc__'] = [] for k, v in attrs.items(): if 'crawl_' in k: attrs['__CrawlFunc__'].append(k) count += 1 attrs['__CrawlFuncCount__'] = count return type.__new__(cls, name, bases, attrs)
解释
__new__
是在 __init__
之前被调用的特殊方法,它用来创建对象并返回创建后的对象,各个参数说明如下:
# cls: 当前准备创建的类 # name: 类的名字 # bases: 类的父类集合 # attrs: 类的属性和方法,是一个字典。
attrs 可以获取到类的所有属性和方法,于是我们只要给我们想要的方法一个统一的命名规范就可以了,在这里的命名规范是方法名前都有 crawl_ 这个字符串,这样我们就能快速对其进行收集并且计数
(4)测试模块 test.py
import... class Tester(object): def __init__(self): self.redis = RedisClient() #async 表示使用协程的方式运行该函数 async def test_single_proxy(self, proxy): """ 测试单个代理 :param proxy: :return: """ #定义连接器并取消ssl安全验证 conn = aiohttp.TCPConnector(verify_ssl=False) #首先我们创建一个session对象 async with aiohttp.ClientSession(connector=conn) as session: try: if isinstance(proxy, bytes): proxy = proxy.decode('utf-8') real_proxy = 'http://' + proxy print('正在测试', proxy) #使用创建的 session 对象请求具体的网站 async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response: if response.status in VALID_STATUS_CODES: self.redis.max(proxy) print('代理可用', proxy) else: self.redis.decrease(proxy) print('请求响应码不合法 ', response.status, 'IP', proxy) except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError): self.redis.decrease(proxy) print('代理请求失败', proxy) def run(self): """ 测试主函数 :return: """ print('测试器开始运行') try: count = self.redis.count() print('当前剩余', count, '个代理') for i in range(0, count, BATCH_TEST_SIZE): start = i stop = min(i + BATCH_TEST_SIZE, count) print('正在测试第', start + 1, '-', stop, '个代理') #批量获取代理 test_proxies = self.redis.batch(start, stop) #asyncio.get_event_loop方法可以创建一个事件循环 #我们可以在事件循环中注册协程对象(async 修饰的函数) loop = asyncio.get_event_loop() #将多个任务封装到一起并发执行 tasks = [self.test_single_proxy(proxy) for proxy in test_proxies] #run_until_complete将协程注册到事件循环,并启动事件循环。 loop.run_until_complete(asyncio.wait(tasks)) sys.stdout.flush() time.sleep(5) except Exception as e: print('测试器发生错误', e.args)
解释:
这里用到的比较关键的技术是异步网络请求,因为我们的 requests 库是同步的,请求一个必须等到结果返回才能请求另一个,这不是我们想要的,于是异步网络请求模块 aiohttp 就出现了,这是在 python3.5 以后新添加的内置功能(本质使用的是 Python 的协程)
对于类似爬虫这种延时的IO操作,协程是个大利器,优点很多,他可以在一个阻塞发生时,挂起当前程序,跑去执行其他程序,把事件注册到循环中,实现多程序并发,据说超越了10k限制,不过我没有试验过极限。
现在讲一讲协程的简单的用法,当你爬一个网站,有100个网页,正常是请求一次,回来一次,这样效率很低,但协程可以一次发起100个请求(其实也是一个一个发),不同的是协程不会死等返回,而是发一个请求,挂起,再发一个再挂起,发起100个,挂起100个,然后同时等待100个返回,效率提升了100倍。可以理解为同时做100件事,相对于多线程,做到了由自己调度而不是交给CPU,程序流程可控,节约资源,效率极大提升。
具体的使用方法,我在上面代码中的注释部分已经写了,下面对关键步骤再简单梳理一下:
1.定义连接器并取消ssl安全验证
conn = aiohttp.TCPConnector(verify_ssl=False)
2.创建一个session对象
async with aiohttp.ClientSession(connector=conn) as session:
3.使用创建的 session 对象请求具体的网站
async with session.get(TEST_URL, proxy=real_proxy, timeout=15, allow_redirects=False) as response:
4.asyncio.get_event_loop方法创建一个事件循环
loop = asyncio.get_event_loop()
5.将多个任务封装到一起
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
6.run_until_complete将协程注册到事件循环,并启动事件循环,多任务并发执行
loop.run_until_complete(asyncio.wait(tasks))
(5)对外接口 api.py
import... __all__ = ['app'] app = Flask(__name__) def get_conn(): if not hasattr(g, 'redis'): g.redis = RedisClient() return g.redis @app.route('/') def index(): return '<h2>Welcome to Proxy Pool System</h2>' #对外接口直接调用数据库返回随机值 @app.route('/random') def get_proxy(): """ Get a proxy :return: 随机代理 """ conn = get_conn() return conn.random() #对外接口调用数据库返回代理个数 @app.route('/count') def get_counts(): """ Get the count of proxies :return: 代理池总量 """ conn = get_conn() return str(conn.count()) if __name__ == '__main__': app.run()
5.代理池使用
import... dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, dir) #先用 requests 库请求一下api 获取代理ip def get_proxy(): r = requests.get('http://127.0.0.1:5000/get') proxy = BeautifulSoup(r.text, "lxml").get_text() return proxy def crawl(url, proxy): proxies = {'http': proxy} r = requests.get(url, proxies=proxies) return r.text def main(): proxy = get_proxy() html = crawl('http://docs.jinkan.org/docs/flask/', proxy) print(html) if __name__ == '__main__': main()
0X05 使用代理处理反爬抓取微信文章
1.分析网页确定思路
我们这次准备爬取搜狗的微信搜索页面的结果,以风景为例:
可以看到这和我们之前爬取过的案例几乎类似,没什么新意,但是这里有一个比较神奇的地方就是10页以后的内容需要扫码登录微信才能查看
另外,在请求次数过多的时候还会出现封禁 ip 的情况,对应我们页面的状态码就是 出现 302 跳转
思路梳理:
(1)requests 请求目标站点,得到索引页的源码,返回结果
(2)如果遇到 302 则说明 ip 被封,切换代理后重试
(3)请求详情页,分析得到文章标题和内容
(4)将结构化数据保存到 MongoDB 数据库
注意点:
我们直接看浏览器的地址栏我们能看到很多的参数,但是实际上很大一部分是不需要的,那么为了我们的写代码的方便,我们尽量对参数进行简化,只留下最核心的参数
2.代码实现
config.py
# 数据库配置 MONGO_URL = 'localhost' MONGO_DB = 'weixin' MONGO_TABLE = 'articles' #参数设置 KEYWORD = '风景' MAX_COUNT = 5 BASE_URL = 'https://weixin.sogou.com/weixin?' #代理设置 APP_KEY = "" IP_PORT = 'transfer.mogumiao.com:9001' PROXIES = {"http": "http://" + IP_PORT, "https": "https://" + IP_PORT} HEADERS = { 'Cookie':'', 'Host':'weixin.sogou.com', 'Upgrade-Insecure-Requests':'1', 'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36', 'Proxy-Authorization': 'Basic '+ APP_KEY, 'Referer':'https://weixin.sogou.com/weixin' }
spider.py
from urllib.parse import urlencode import requests from pyquery import PyQuery as pq import re import pymongo from config import * #数据库连接对象 client = pymongo.MongoClient(MONGO_URL) db = client[MONGO_DB] def get_html(url,count=1): global MAX_COUNT if count >= MAX_COUNT: print('Tried too many counts') return None try: res = requests.get(url,allow_redirects=False,headers=HEADERS,verify=False,proxies=PROXIES,timeout = 30) print(res.status_code) if res.status_code == 200: return res.text if res.status_code == 302: return get_html(url) except ConnectionError as e: print('Error Occurred',e.args) count += 1 return get_html(url,count) def get_index(keyword,page): data = { 'query':keyword, 'type':2, 'page':page, } queries = urlencode(data) url = BASE_URL + queries html = get_html(url) return html def parse_index(html): doc = pq(html) items = doc('.news-box .news-list li .txt-box h3 a').items() for item in items: yield item.attr('data-share') def get_detail(url): try: res = requests.get(url) if res.status_code == 200: return res.text return None except ConnectionError: return None def parse_detail(html): try: #print(html) doc = pq(html) title = doc('.rich_media_title').text() #date 是使用 js 变量动态加载的,我们需要使用正则匹配 js 变量 date = re.search('var publish_time = "(.*?)"',html) if date: date = date.group(1) date = None nickname = doc('#js_name').text() wechat = doc('#js_profile_qrcode > div > p:nth-child(3) > span').text() return { 'title':title, 'date':date, 'nickname ':nickname , 'wechat':wechat, } except ConnectionError: return None def save_to_mongo(data): #这里使用更新的方法,如果标题重复就不在重新插入直接更新 if db[MONGO_TABLE].update({'title':data['title']},{'$set':data},True): print('Save to MongoDB',data['title']) else: print('Save to MongoDB Failed',data['title']) def main(): for page in range(1,101): html = get_index(KEYWORD,page) if html: urls = parse_index(html) for url in urls: html = get_detail(url) if html: article_data = parse_detail(html) save_to_mongo(article_data) if __name__ == '__main__': main()
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。