Python Concurrency and Parallelism

Concurrency and parallelism examples I regularly use when writing Python scripts.

1. Multithreading

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import threading
import time
import queue
import sys

SHARE_Q = queue.Queue()   # an unbounded task queue
_WORKER_THREAD_NUM = 30   # number of worker threads


class MyThread(threading.Thread):
    """
    Worker thread.

    Attributes:
        func: the callable that implements the thread's work loop
    """
    def __init__(self, func):
        super(MyThread, self).__init__()  # call the parent constructor
        self.func = func                  # the work-loop callable
        self.daemon = True                # daemon threads exit together with the main thread

    def run(self):
        """Override the base class run() method."""
        self.func()


def do_something(item):
    """The actual work, e.g. crawling a site."""
    print(item)


def worker():
    """
    The work loop: keep processing as long as the queue is not empty.
    Queue already handles wait/notify and locking internally, so no
    extra locking is needed when getting or putting tasks.
    """
    global SHARE_Q
    total = SHARE_Q.qsize()
    while True:
        if not SHARE_Q.empty():
            item = SHARE_Q.get()  # fetch a task
            percent = "%.2f%%" % ((float(total - SHARE_Q.qsize()) / float(total)) * 100)
            sys.stdout.write('\rRunning: ' + percent + '\033[K')
            sys.stdout.flush()
            do_something(item)
            time.sleep(1)
            SHARE_Q.task_done()


def main():
    global SHARE_Q
    threads = []
    # Put tasks on the queue; in real use, tasks would be added continuously.
    for task in range(1, 5):
        SHARE_Q.put(task)
    # Start _WORKER_THREAD_NUM worker threads.
    for i in range(_WORKER_THREAD_NUM):
        thread = MyThread(worker)
        thread.start()  # the thread starts processing tasks
        threads.append(thread)
    # Wait until every queued task has been marked done. The workers loop
    # forever, so we join the queue rather than the daemon threads themselves.
    SHARE_Q.join()


if __name__ == '__main__':
    main()
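
For comparison, the same queue-and-workers fan-out can be written much more compactly with concurrent.futures.ThreadPoolExecutor from the standard library. This is only a minimal sketch of the idea, not part of the original example:

from concurrent.futures import ThreadPoolExecutor

def do_something(item):
    # placeholder work, e.g. crawling a site
    print(item)

# max_workers caps the number of threads running at the same time
with ThreadPoolExecutor(max_workers=30) as pool:
    for item in range(1, 5):
        pool.submit(do_something, item)   # all submitted tasks finish before the with-block exits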

2. Multiprocessing

When you need to handle the return values:

from multiprocessing import Pool
import time

def work(n):
    time.sleep(1)
    return n ** 2

if __name__ == '__main__':
    p = Pool()

    res_l = []
    for i in range(10):
        res = p.apply_async(work, args=(i,))
        res_l.append(res)

    p.close()
    p.join()  # wait for every worker process in the pool to finish

    nums = []
    for res in res_l:
        nums.append(res.get())  # collect all results
    print(nums)  # the main process now holds all results and can post-process them in one place
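
When the per-task arguments are just the items of a sequence, Pool.map collects the results in order with less bookkeeping. A minimal sketch equivalent to the example above:

from multiprocessing import Pool
import time

def work(n):
    time.sleep(1)
    return n ** 2

if __name__ == '__main__':
    with Pool() as p:
        nums = p.map(work, range(10))  # blocks until all results are back, in submission order
    print(nums)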

Alternatively, you can use a callback function:

from multiprocessing import Pool
import requests

# Placeholder request headers; fill in whatever your target sites require.
headers = {'User-Agent': 'Mozilla/5.0'}

def filter(domain):
    res = requests.get(domain, headers=headers).status_code
    return domain, res

def save(result):
    # The callback receives the worker's return value as a single argument,
    # here the (domain, status_code) tuple returned by filter().
    domain, res = result
    f2 = open('check_result.txt', 'a')
    f2.write(domain + '\t' + str(res) + '\n')
    f2.close()

def main():
    pool = Pool(10)
    filename = "urls.txt"
    f = open(filename, 'r')
    for domain in f.readlines():
        pool.apply_async(filter, (domain.strip('\n'),), callback=save)
    f.close()
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
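
In Python 3, apply_async also accepts an error_callback, which is called in the main process when the worker raises instead of returning. A minimal sketch of how the loop above could record failed requests (the log_error name and the error file are illustrative):

def log_error(exc):
    # Called in the main process with the exception raised by filter().
    with open('check_errors.txt', 'a') as f:
        f.write(repr(exc) + '\n')

# Inside main(), per domain:
# pool.apply_async(filter, (domain.strip('\n'),), callback=save, error_callback=log_error)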

3. Coroutines (gevent)

import gevent
from gevent.lock import BoundedSemaphore
# Apply the monkey patch so the standard library cooperates with gevent
from gevent import monkey
monkey.patch_all()

sem = BoundedSemaphore(4)  # cap coroutine concurrency at 4

def worker(n):
    sem.acquire()
    # the actual work would go here, protected by the semaphore
    print(n)
    sem.release()

gevent.joinall([gevent.spawn(worker, i) for i in range(0, 6)])
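
Another common way to cap gevent concurrency is gevent.pool.Pool, which limits the number of greenlets directly rather than guarding the work with a semaphore. A minimal sketch:

import gevent
from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool

def worker(n):
    gevent.sleep(0.1)  # stand-in for real work
    print(n)

pool = Pool(4)               # at most 4 greenlets run at once
pool.map(worker, range(6))   # blocks until all greenlets finish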

4. Asynchronous coroutines with asyncio

# coding:utf-8
import asyncio
import aiohttp

url = 'https://www.baidu.com/?page={0}'

async def hello(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                res = await response.read()
                print(res)

async def run():
    semaphore = asyncio.Semaphore(5)  # limit concurrency to 5
    to_get = [hello(url.format(_), semaphore) for _ in range(100)]  # 100 tasks in total
    await asyncio.gather(*to_get)

def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run())
    except KeyboardInterrupt:
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()  # restart the loop so the cancellations are processed
    finally:
        loop.close()

if __name__ == '__main__':
    main()

Another example, from https://blog.csdn.net/CoolScript/article/details/89948434:

import aiohttp
import asyncio

async def test_single_proxy(proxy):
    # conn = aiohttp.TCPConnector(ssl=False)
    # async with aiohttp.ClientSession(connector=conn) as session:
    async with aiohttp.ClientSession() as session:
        try:
            if isinstance(proxy, bytes):
                proxy = proxy.decode('utf8')
            real_proxy = 'http://' + proxy
            async with session.get('http://xa.ganji.com/', proxy=real_proxy, timeout=6) as response:
                if response.status in [200]:
                    print(proxy, 'valid pass ……')
                else:
                    print(proxy, 'remove')
        except Exception:
            print('request failed')

api_list = [
    '60.169.217.151:46174',
    '27.220.121.8:54276',
    '180.122.145.179:33419',
    '59.58.209.134:36154',
    '123.180.209.136:41878',
    '123.188.102.233:38182',
    '60.182.33.2:23016',
    '171.11.138.254:33794',
    '123.162.151.90:23693',
    '125.65.91.130:24209'
]


async def main(api_list):
    # create_task schedules every check immediately; awaiting them one by one
    # simply waits for all of the already-running tasks to finish.
    tasks = [asyncio.create_task(test_single_proxy(proxy))
             for proxy in api_list]
    [await t for t in tasks]

if __name__ == "__main__":
    asyncio.run(main(api_list))

References
https://www.cnblogs.com/shenh/p/9090586.html
https://blog.csdn.net/CoolScript/article/details/89948434
