首先分析网站图片加载流程,taeri__taeri 应该有人认识这个网红。ins照片一次只加载了一定数量的照片,往下翻又会加载,毫无疑问看 xhr
在预览栏里可以看到json数据, display_url 就是照片的链接,只要获取到这个就行了
回到 headers 看看请求用了哪些参数;就两个, quer_hash 和 variables
variables 是一个json,里面有 id、first、after 这三项;为了不麻烦。。我直接说这三个是啥玩意儿,有兴趣的可以自己分析
id:user id 即用户id first:这次请求加载照片数量 after:end cursor 这个参数是为了判断上一页的,没有这个就一直加载的第一页,而本页会带有一个end cursor参数来进行下一页请求
import json import multiprocessing import sys from urllib.parse import urljoin import aiohttp import asyncio import os import re from pathlib import Path import requests 复制代码
def __init__(self, username, maxtasks=200): self.username = username self.maxtasks = maxtasks # 最大任务数 self.queue = asyncio.Queue(maxsize=maxtasks * 2) # 配置代理,没有科学上网没法访问ins os.environ['http_proxy'] = PROXY os.environ['https_proxy'] = PROXY self.session = aiohttp.ClientSession(trust_env=True, headers=HEADERS) 复制代码
首先获取user id:
async def get_shared_data(self): """ 获取 shared data :return: """ try: async with self.session.get(ROOT_URL + self.username) as resp: html = await resp.text() if html is not None and '_sharedData' in html: shared_data = html.split("window._sharedData = ")[1].split( ";</script>")[0] if not shared_data: # 没有shared data可以直接终止程序了 print('!!!!!!!') exit(1) return json.loads(shared_data) except Exception: pass async def init(self): """ 初始化必要参数 :return: """ user = (await self.get_shared_data())['entry_data']['ProfilePage'][0]['graphql']['user'] if not user: print('user is none.') exit(1) self.user_id = user['id'] # user id self.count = user['edge_owner_to_timeline_media']['count'] # 照片数量 复制代码
async def produce_download_urls(self, max=50): """ 获取每一页的所有照片链接 :param max: 一次要获取照片数量 :return: """ end_cursor = '' # 加载第一页 while True: pic_params = { 'query_hash': 'f2405b236d85e8296cf30347c9f08c2a', # query_hash 可以固定一个值 'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format( self.user_id, max, end_cursor), } pic_url = ROOT_URL + 'graphql/query/' async with self.session.get(pic_url, params=pic_params) as resp: json = await resp.json() edge_media = json['data']['user'][ 'edge_owner_to_timeline_media'] edges = edge_media['edges'] if edges: for edge in edges: await self.queue.put(edge['node']['display_url']) # queue通信 has_next_page = edge_media['page_info']['has_next_page'] # json中有一个has next page项,其值是 true或false,用来判断是否有下一页 if has_next_page: end_cursor = edge_media['page_info']['end_cursor'] # 获取 end cursor else: break 复制代码
async def download(self): """ 下载照片 :return: """ while not (self.producer.done() and self.queue.empty()): # 生产任务是否没有完成以及queue队列是否不为空 url = await self.queue.get() # 获取照片链接 filename = PATH / url.split('?')[0].split('/')[-1] async with self.session.get(url) as resp: with filename.open('wb') as f: async for chunk in resp.content.iter_any(): f.write(chunk) self.queue.task_done() # 表示刚刚排队的任务已完成(就是用get取出的照片url下载完成了) print('.', end='', flush=True) 复制代码
async def run(self): """ :return: """ print('Preparing...') print('Initializing...') await self.init() print('User id: %r.' % self.user_id) print('Total %r photos.' % self.count) print('-'*50) self.producer = asyncio.create_task(self.produce_download_urls()) print('Downloading...', end='', flush=True) await asyncio.gather(*(self.download() for _ in range(self.maxtasks))) # asyncio.gather和asyncio.wait差不多,具体百度 复制代码
def check(_): """ 检测照片数量。。。太菜了不知道怎么停止只能这样了(逃; """ print('Start check...') with requests.get(urljoin(ROOT_URL, USERNAME), headers=HEADERS, proxies={'http': 'http://localhost:80001', 'https': 'https://localhost:8001'}) as resp: pattern = '"edge_owner_to_timeline_media":.?{"count":(.*?),"page_info"' count = int(re.findall(pattern, resp.text)[0]) while True: files = len(os.listdir(PATH)) print('Check files:%r' % files) if files == count: # print('Total %r photos download done.' % count) print('\nProduce done, Total %r photos, plz wait save done :)' % count) sys.exit(0) 复制代码
async def main(): ins = Instagram(USERNAME) try: await ins.run() finally: await ins.close() 复制代码
if __name__ == '__main__':
if __name__ == '__main__': try: p = multiprocessing.Process(target=check, args=(0,)) p.start() future = asyncio.ensure_future(main()) loop = asyncio.get_event_loop() loop.run_until_complete(future) loop.close() except KeyboardInterrupt: pass 复制代码
