#!/usr/bin/env python3
# encoding: utf-8
import asyncio
import time

import pyppeteer
import redis
from pyppeteer import launch
from urllib import parse

from lib.parse import *
from config import *
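# The helpers pulled in from lib.parse are not shown in this file. From how
# they are used below, their assumed contracts are (a sketch, not the real
# implementation):
#   check_host(netloc)      -> bool : is this host in scope for the crawl?
#   capture_pass(path)      -> bool : is this path worth crawling (i.e. not a
#                                     static asset such as .css or .jpg)?
#   get_url_hash(url, body) -> str  : deduplication hash for the URL.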
pyppeteer.DEBUG = True

# Two Redis connections: one DB holds per-host sets of URL hashes (dedup),
# the other holds the shared task queue.
redis_hash = redis.Redis(host=redis_config['host'], password=redis_config['password'],
                         port=redis_config['port'], db=redis_config['hash_db'])
redis_task = redis.Redis(host=redis_config['host'], password=redis_config['password'],
                         port=redis_config['port'], db=redis_config['db'])
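# A minimal sketch of the config.py this script expects. The concrete values
# below are assumptions; only the names (PROXY, redis_config and its keys) are
# known from how they are used in this file:
#
#   PROXY = '127.0.0.1:8080'   # upstream proxy passed to Chromium
#   redis_config = {
#       'host': '127.0.0.1',
#       'password': '',
#       'port': 6379,
#       'db': 0,       # task queue database
#       'hash_db': 1,  # per-host URL-hash database
#   }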
async def close_dialog(dialog):
    # Dismiss any alert/confirm dialog so it cannot block the crawl.
    await dialog.dismiss()
async def get_links(url):
    # Launch a Chromium instance behind the configured proxy, images disabled.
    browser = await launch({
        'headless': False,
        'dumpio': True,
        'args': [
            '--proxy-server={}'.format(PROXY),
            '--no-sandbox',
            '--no-first-run',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-setuid-sandbox',
            '--no-zygote',
            '--disable-images'
        ]
    })
    page = await browser.newPage()
    try:
        # Register the dialog handler before navigating, so dialogs raised
        # during page load are dismissed too.
        page.on('dialog', close_dialog)
        await page.goto(url, timeout=15000)
        # Collect the href of every link element on the page.
        html_links = await page.evaluate('''() => {
            var urls = [];
            var links = document.links;
            for (var j = 0; j < links.length; j++) {
                urls[j] = links[j].href;
            }
            return urls;
        }''')
        return html_links
    except Exception as e:
        return e
    finally:
        await page.close()
        await browser.close()
async def worker(url, semaphore):
    async with semaphore:
        print("start : [" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + "] : " + url)
        # Consume the task so monitor() can eventually drain the queue.
        redis_task.srem('tasks', url)
        links = await get_links(url)
        if isinstance(links, list):
            for link in links:
                # Keep only absolute http(s) URLs; drop javascript: pseudo-links.
                if link.find('javascript') == -1 and link.find('http') != -1:
                    parses = parse.urlparse(link)
                    if check_host(parses.netloc):      # filter by domain
                        if capture_pass(parses.path):  # filter out static pages
                            if parses.path == '/' or parses.path == '':
                                continue
                            hash = get_url_hash(link, '')
                            if bytes(parses.netloc, encoding='utf-8') not in redis_hash.keys():
                                # First URL seen for this host: record its hash and queue it.
                                redis_hash.sadd(parses.netloc, hash)
                                redis_task.sadd('tasks', link)
                            elif bytes(hash, encoding='utf-8') not in redis_hash.smembers(parses.netloc):
                                # Known host, unseen URL: record and queue.
                                redis_hash.sadd(parses.netloc, hash)
                                redis_task.sadd('tasks', link)
        else:
            print("error: " + str(links))


async def monitor():
    semaphore = asyncio.Semaphore(2)  # cap concurrent browser instances at 2
    while True:
        urls = redis_task.smembers('tasks')
        if urls:
            to_get = []
            for url in urls:
                url = str(url, encoding='utf-8')
                to_get.append(asyncio.ensure_future(worker(url, semaphore)))
            await asyncio.wait(to_get)
        else:
            print("ending ...")
            break


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(monitor())
    except KeyboardInterrupt:
        # Cancel outstanding tasks, then let the loop run until they unwind.
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()
    finally:
        loop.close()
if __name__ == '__main__':
    main()
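# Usage sketch (an assumed workflow, not part of the original script): the
# crawler only drains the Redis set 'tasks', so seed at least one start URL
# before running, for example:
#
#   import redis
#   from config import redis_config
#   r = redis.Redis(host=redis_config['host'], password=redis_config['password'],
#                   port=redis_config['port'], db=redis_config['db'])
#   r.sadd('tasks', 'http://example.com/')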