pyppeteer使用

Puppeteer 是 Google 基于 Node.js 开发的一个工具,有了它我们可以通过 JavaScript 来控制 Chrome 浏览器的一些操作,当然也可以用作网络爬虫上,其 API 极其完善,功能非常强大。而 Pyppeteer 是 Puppeteer 的 Python 版本的实现。

pyppeteer官方文档: https://miyakogi.github.io/pyppeteer/reference.html
puppeteer 中文文档: https://zhaoqize.github.io/puppeteer-api-zh_CN/

爬虫项目中具体的实例:

#!  /usr/bin/env  python3
# encoding: utf-8
import asyncio
import pyppeteer
from pyppeteer import launch
import time
from lib.parse import *
from urllib import parse
from queue import Queue
#from lib.mysqllib import *
from config import *
import redis

# Emit pyppeteer's internal debug logging (CDP traffic etc.) to the console.
pyppeteer.DEBUG = True
# Two redis connections on the same server: one DB holds per-host sets of
# URL hashes (dedup), the other holds the shared 'tasks' work queue.
# redis_config is loaded from config.py — presumably a plain dict; verify there.
redis_hash = redis.Redis(host=redis_config['host'],password=redis_config['password'],port=redis_config['port'],db=redis_config['hash_db'])
redis_task = redis.Redis(host=redis_config['host'],password=redis_config['password'],port=redis_config['port'],db=redis_config['db'])

async def close_dialog(dialog):
    """Dismiss a page dialog (alert/confirm/prompt) so it cannot block the crawl."""
    await dialog.dismiss()

async def get_links(url):
    """Open *url* in a Chromium instance and collect every link href on the page.

    Returns the list of href strings on success, or the raised Exception
    object on failure — callers distinguish the two cases with
    ``isinstance(links, list)``.
    """
    browser = await launch({
        'headless': False,
        'dumpio': True,
        'args': [
            '--proxy-server={}'.format(PROXY),
            '--no-sandbox',
            '--no-first-run',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-setuid-sandbox',
            '--no-zygote',
            '--disable-images'
        ]
    })
    try:
        page = await browser.newPage()
        # Register the dialog handler *before* navigating: the original
        # attached it after goto(), so an alert()/confirm() fired during the
        # initial page load would block the navigation until the timeout.
        page.on('dialog', close_dialog)
        await page.goto(url, timeout=15000)
        # Dead commented-out JS (an onclick eval loop) removed from the
        # evaluate payload; only the active link-collection code remains.
        html_links = await page.evaluate('''() => {
            var urls = new Array();
            links = document.links;
            for(j=0;j<links.length;j++){ urls[j]=links[j].href;}
            return urls;
        }''')
        return html_links
    except Exception as e:
        # Preserve the original contract: the exception object itself is
        # the error signal consumed by worker().
        return e
    finally:
        # Single cleanup path. The original duplicated page.close() +
        # browser.close() in both branches and leaked the browser entirely
        # if newPage() raised; closing the browser closes its pages too.
        await browser.close()

async def worker(url, semaphore):
    """Fetch *url*, extract its links, and enqueue the unseen ones.

    A link is pushed onto the 'tasks' redis set when: its host passes
    check_host(), its path passes capture_pass() (static pages filtered),
    the path is not the bare site root, and its hash has not been recorded
    yet in that host's per-host dedup set in redis_hash.
    """
    async with semaphore:
        print("start : [" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + "] : " + url)
        #redis_task.srem('tasks', bytes(url, encoding='utf-8'))
        links = await get_links(url)
        if not isinstance(links, list):
            # get_links() returns the exception object on failure.
            print("error: " + str(links))
            return
        for link in links:
            # Skip javascript: pseudo-links and anything without a scheme.
            if link.find('javascript') != -1 or link.find('http') == -1:
                continue
            parses = parse.urlparse(link)
            if not check_host(parses.netloc):   # host filter
                continue
            if not capture_pass(parses.path):   # static-page filter
                continue
            if parses.path in ('/', ''):        # skip bare site roots
                continue
            url_hash = get_url_hash(link, '')   # renamed: don't shadow builtin hash()
            # SISMEMBER is a server-side O(1) lookup and returns False for a
            # missing key, so this single check replaces the original's
            # KEYS-scan branch plus client-side SMEMBERS scan — same behavior,
            # without pulling every key/member over the wire.
            if not redis_hash.sismember(parses.netloc, url_hash):
                redis_hash.sadd(parses.netloc, url_hash)
                redis_task.sadd('tasks', link)

async def monitor():
    """Drain the 'tasks' redis set: spawn one worker per queued URL each
    round until the set is empty, bounding concurrency with a semaphore.

    NOTE(review): workers never SREM the URLs they process (the srem call in
    worker() is commented out), so already-fetched URLs are re-fetched every
    round — confirm whether that is intentional.
    """
    semaphore = asyncio.Semaphore(2)  # at most 2 concurrent browser sessions
    while True:
        urls = redis_task.smembers('tasks')
        if not urls:
            print("ending ...")
            break
        # Build a fresh batch every round. The original accumulated finished
        # futures across iterations and re-passed the ever-growing list to
        # asyncio.wait(), leaking completed tasks.
        batch = [
            asyncio.ensure_future(worker(str(url, encoding='utf-8'), semaphore))
            for url in urls
        ]
        await asyncio.wait(batch)
def main():
    """Run the crawl loop; on Ctrl-C cancel outstanding tasks before closing."""
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(monitor())
    except KeyboardInterrupt:
        # asyncio.Task.all_tasks() was deprecated in 3.7 and removed in 3.9;
        # the module-level asyncio.all_tasks(loop) is the supported form.
        for task in asyncio.all_tasks(loop):
            task.cancel()
        # Let the loop run so the cancellations actually propagate.
        loop.run_forever()
    finally:
        loop.close()

if __name__ == '__main__':
    main()

使用 headless 浏览器检测 XSS 漏洞

https://github.com/neverlovelynn/chrome_headless_xss/
https://xz.aliyun.com/t/3502#toc-16
https://blog.csdn.net/nmask/article/details/99379998

安全开发