Examples of concurrency and parallelism used in everyday Python scripts
1. Multithreading

```python
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import threading
import time
import queue
import sys

SHARE_Q = queue.Queue()    # an unbounded task queue
_WORKER_THREAD_NUM = 30    # number of worker threads

class MyThread(threading.Thread):
    """Worker thread wrapper.

    Attributes:
        func: the callable containing the thread's work logic
    """
    def __init__(self, func):
        super(MyThread, self).__init__()   # call the parent constructor
        self.func = func                   # store the work function

    def run(self):
        """Override the base class run() method."""
        self.func()

def do_something(item):
    """The actual work, e.g. scraping a site."""
    print(item)

def worker():
    """Worker loop: keep processing until the queue is drained.

    Queue already provides wait/notify and locking internally, so no extra
    locking is needed when getting or putting tasks.
    """
    global SHARE_Q
    total = SHARE_Q.qsize()
    while True:
        try:
            item = SHARE_Q.get(block=False)   # fetch a task; stop once the queue is empty
        except queue.Empty:
            break
        percent = "%.2f%%" % ((float(total - SHARE_Q.qsize()) / float(total)) * 100)
        sys.stdout.write('\rRunning : ' + percent + '\033[K')
        sys.stdout.flush()
        do_something(item)
        time.sleep(1)
        SHARE_Q.task_done()

def main():
    global SHARE_Q
    threads = []
    # Fill the queue with tasks; in real use tasks would be added continuously
    for task in range(1, 5):
        SHARE_Q.put(task)
    # Start _WORKER_THREAD_NUM worker threads
    for i in range(_WORKER_THREAD_NUM):
        thread = MyThread(worker)
        thread.start()          # the thread starts processing tasks
        threads.append(thread)
    for thread in threads:
        thread.join()           # wait for every worker to finish
    SHARE_Q.join()

if __name__ == '__main__':
    main()
```
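For a simple fan-out like this, the standard library's `concurrent.futures` gives a much shorter equivalent. A minimal sketch, where `fetch` is just a hypothetical stand-in for the per-item work that `do_something` does above:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch(item):
    # stand-in for the real per-item work (e.g. fetching a page)
    print(item)

with ThreadPoolExecutor(max_workers=30) as executor:
    # map submits every item to the pool; list() waits for all results
    results = list(executor.map(fetch, range(1, 5)))
```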
2. Multiprocessing, when return values need to be handled
```python
from multiprocessing import Pool
import time

def work(n):
    time.sleep(1)
    return n ** 2

if __name__ == '__main__':
    p = Pool()
    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        res_l.append(res)
    p.close()
    p.join()    # wait for every process in the pool to finish

    nums = []
    for res in res_l:
        nums.append(res.get())   # collect all the results
    print(nums)   # the parent now holds all results and can process them in one place
```
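When each task takes a single argument and results should come back in input order, `Pool.map` removes the bookkeeping around `apply_async`. A minimal sketch of the equivalent:

```python
from multiprocessing import Pool
import time

def work(n):
    time.sleep(1)
    return n ** 2

if __name__ == '__main__':
    with Pool() as p:
        # map blocks until all tasks finish and returns results in input order
        nums = p.map(work, range(10))
    print(nums)
```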
Alternatively, a callback can be used so each result is handled as soon as it arrives:
```python
from multiprocessing import Pool
import requests

# the original snippet does not define headers; a placeholder is used here
headers = {'User-Agent': 'Mozilla/5.0'}

def filter(domain):
    res = requests.get(domain, headers=headers).status_code
    return domain, res

def save(result):
    # apply_async passes the worker's single return value (here a tuple) to the callback
    domain, res = result
    f2 = open('check_result.txt', 'a')
    f2.write(domain + '\t' + str(res) + '\n')
    f2.close()

def main():
    pool = Pool(10)
    filename = "urls.txt"
    f = open(filename, 'r')
    for domain in f.readlines():
        pool.apply_async(filter, (domain.strip('\n'),), callback=save)
    f.close()
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
```
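Note that the callback receives the worker's return value and runs back in the parent process (in the pool's result-handler thread), so appending to a single result file there does not interleave writes from different workers.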
3. Coroutines (gevent)

```python
import gevent
from gevent.lock import BoundedSemaphore
# apply the monkey patch so blocking stdlib calls become cooperative
from gevent import monkey
monkey.patch_all()

sem = BoundedSemaphore(4)   # limit the number of greenlets running concurrently

def worker(n):
    sem.acquire()
    # the actual work goes here, between acquire() and release()
    sem.release()

gevent.joinall([gevent.spawn(worker, i) for i in range(0, 6)])
```
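The worker above is only a skeleton; the real work belongs between the acquire and release. A minimal sketch of the same pattern with the semaphore used as a context manager and a `gevent.sleep` standing in for the patched blocking I/O:

```python
import gevent
from gevent.lock import BoundedSemaphore
from gevent import monkey
monkey.patch_all()

sem = BoundedSemaphore(4)   # at most 4 greenlets inside the guarded block at once

def worker(n):
    with sem:                 # acquire/release handled by the context manager
        gevent.sleep(1)       # placeholder for real (monkey-patched) I/O
        print('task %d done' % n)

gevent.joinall([gevent.spawn(worker, i) for i in range(6)])
```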
4. asyncio coroutines

```python
# coding: utf-8
import asyncio
import aiohttp

url = 'https://www.baidu.com/?page={0}'

async def hello(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                res = await response.read()
                print(res)

async def run():
    semaphore = asyncio.Semaphore(5)   # limit concurrency to 5
    to_get = [hello(url.format(_), semaphore) for _ in range(100)]   # 100 tasks in total
    await asyncio.gather(*to_get)

def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run())
    except KeyboardInterrupt:
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()   # restart the loop so the cancellations are processed
    finally:
        loop.close()

if __name__ == '__main__':
    main()
```
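On Python 3.7+ the event-loop boilerplate in `main()` can be replaced by a single `asyncio.run(run())`, which the next example uses.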
Another example, from https://blog.csdn.net/CoolScript/article/details/89948434:
```python
import aiohttp
import asyncio

async def test_single_proxy(proxy):
    # conn = aiohttp.TCPConnector(ssl=False)
    # async with aiohttp.ClientSession(connector=conn) as session:
    async with aiohttp.ClientSession() as session:
        try:
            if isinstance(proxy, bytes):
                proxy = proxy.decode('utf8')
            real_proxy = 'http://' + proxy
            async with session.get('http://xa.ganji.com/', proxy=real_proxy, timeout=6) as response:
                if response.status in [200]:
                    print(proxy, 'valid pass ……')
                else:
                    print(proxy, 'remove')
        except Exception:
            print('request failed')

api_list = [
    '60.169.217.151:46174',
    '27.220.121.8:54276',
    '180.122.145.179:33419',
    '59.58.209.134:36154',
    '123.180.209.136:41878',
    '123.188.102.233:38182',
    '60.182.33.2:23016',
    '171.11.138.254:33794',
    '123.162.151.90:23693',
    '125.65.91.130:24209'
]

async def main(api_list):
    tasks = [asyncio.create_task(test_single_proxy(proxy)) for proxy in api_list]
    # the tasks already run concurrently; this just awaits each one in turn
    [await t for t in tasks]

if __name__ == "__main__":
    asyncio.run(main(api_list))
```
References: https://www.cnblogs.com/shenh/p/9090586.html and https://blog.csdn.net/CoolScript/article/details/89948434