进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。
使用多进程来并发可以使 python 程序有效利用多核 CPU。
multiprocessing
模块是 python3 推荐使用的内置模块。
在 Windows 操作系统中由于没有 fork,在创建子进程的时候会自动以 import 启动,导致递归。所以必须把创建子进程的部分写在if __name__ =='__main__'
中。
同时,由于 Windows 操作系统中由于没有 fork,子进程不能获得父进程的权限导致拒绝访问,请改变用户策略组设置(见下)
子进程
# _*_coding:utf-8_*_
from multiprocessing import Process
def f(name):
print("我单推", name)
if __name__ == "__main__":
p = Process(target=f, args=("ayame",))
p.start()
print("摸了")
子进程对象方法
.start()
启动。子进程的执行顺序不是根据启动顺序决定的。.join(timeout = None)
阻塞主进程,直至子进程运行完毕或超时。.daemon=True
设置为守护进程,要在 start 之前设置。主进程代码运行结束,守护进程随即终止。terminate()
关闭进程(不会立即关闭)
多个子进程
from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print("我单推", name)
if __name__ == "__main__":
dd_list = ["shishi", "hima", "sasaki"]
dd_list_p = []
for name in dd_list:
p = Process(target=f, args=(name,))
p.start()
dd_list_p.append(p)
for p in dd_list_p:
p.join()
print("一秒单推一个也是单推(并发)")
锁和管道
锁Lock()
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改
from multiprocessing import Process, Lock
import os, time, random
def watch():
"""这里用sleep代替了对锁抢占的内存资源的读写"""
time.sleep(random.random())
def work(lock, name):
lock.acquire()
print(f"你关注的{name}开播了,房间号是{os.getpid()}")
watch()
print(f"你关注的{name}下播了,房间号是{os.getpid()}")
lock.release()
if __name__ == "__main__":
lock = Lock()
for name in ["shishi", "hima", "sasaki"]:
p = Process(target=work, args=(lock, name))
p.start()
浪费了时间,却保证了数据的安全。
除了最简单的锁Lock()
,还有
RLock()
递归锁Semaphore(value=1)
计数器锁BoundedSemaphore(value=1)
带上限的计数器锁
管道Pipe(duplex=True)
参数duplex 是否双向
返回包含两个 connection 对象的元组,connection 相当于面向消息的套接字,数据是存在于内存中的。
队列Queue([maxsize])
队列 Queue
又是基于(管道+锁)实现的内存中的共享数据,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列对象方法
.empty()
是否空.full()
是否满.put(obj, block=True, timeout=None)
压入队列.get(block=True, timeout=None)
弹出队列
import multiprocessing, os
def push(queue, name):
"""向queue中put数据"""
info = f"YouTube频道{os.getpid():<5d}:{name:>7s}-Channel播了"
queue.put(info)
def shift(queue, name):
"""从queue中get数据"""
info = queue.get()
print(f"bilibili房间号{os.getpid():<5d}:{name:>7s}-Official同传-->{info}")
if __name__ == "__main__":
youtube = []
bilibili = []
queue = multiprocessing.Queue(3)
# 子进程 油管开播
for name in ["shishi", "hima", "sasaki"]:
process = multiprocessing.Process(target=push, args=(queue, name))
process.start()
youtube.append(process)
# 子进程 哔哩哔哩转播
for name in ["shishi", "hima", "sasaki"]:
process = multiprocessing.Process(target=shift, args=(queue, name))
process.start()
bilibili.append(process)
for p in youtube:
p.join()
for p in bilibili:
p.join()
用于生产者消费者模型的JoinableQueue([maxsize])
JoinableQueue
类是 Queue
的子类,额外添加了 task_done()
和 join()
方法。
.task_done()
指出之前进入队列的一个任务已经完成.join()
阻塞,直到队列中所有任务均被task_done()
from multiprocessing import Process, JoinableQueue
import time, random, os
def consumer(q):
while True:
res = q.get() # 阻塞 直到get到 可以设定timeout抛出异常
time.sleep(random.randint(1, 3))
print(f" DD - id{os.getpid():<5d}:{res}看完了")
q.task_done() # 每次get都需要task_done
print("consumer ok")
def producer(name, q):
for i in range(10): # 实际运用中不知道数目(如爬虫翻页),需要get同时put
time.sleep(random.randint(1, 3))
res = f"{name}-{i}回目"
q.put(res)
print(f"Official- id{os.getpid():<5d}:{res}烤好了")
q.join() # 阻塞 q为空且get次数=task_done次数即可解除阻塞 期间可追加put
# q.close()# join之后不再put和get了可以close
print(f"{name}组收益了")
if __name__ == "__main__":
q = JoinableQueue()
# 生产者们(producer):烤肉man
nana_offic = Process(target=producer, args=("nana", q))
aqua_offic = Process(target=producer, args=("aqua", q))
peko_offic = Process(target=producer, args=("peko", q))
officials = [nana_offic, aqua_offic, peko_offic]
# 消费者们(consumer):DD
dd = [Process(target=consumer, args=(q,)) for i in range(2)]
for p in officials + dd:
p.daemon = True # 完成的时候consumer里还在get阻塞着需要强制结束
p.start()
for p in officials:
p.join()
input("熬死DD了") # 等输出完毕再回车结束
进程池
进程池Pool
控制同一时间最多有固定数量的进程在运行。
它支持带有超时和回调的异步结果,以及一个并行的 map 实现。
创建进程池 multiprocessing.Pool([numprocess [,initializer [, initargs]]])
创建时参数
numprocess
要创建的进程数,如果省略,将默认使用 cpu_count()的值initializer
每个工作进程启动时要执行的可调用对象,默认为 Noneinitargs
是要传给 initializer 的参数组
进程池对象方法
.apply(func [, args [, kwargs]])
排队执行func(*args,**kwargs)
返回结果.apply_async(func [, args [, kwargs]])
并行执行,返回 AsyncResul 的实例.map(func, iterable)
内置函数 map 的并行版本 (只支持一个 iterable 参数).map_async(func, iterable)
同 map,返回的是 AsyncResul 的实例.imap(func, iterable)
延迟执行的 map,所需的内存小, chunksize参数可调速
_async
的方法支持callback , error_callback 参数,表示将结果.get
传入回调和错误回调
.close()
关闭进程池.jion()
等待退出。要在 close 之后调用
AsyncResul 对象方法
.get([timeout])
获取执行结果。可选 timeout 抛出异常。.ready()
是否调用完成。.successful()
是否调用完成且没有引发异常。如果就绪之前调用将引发异常。.wait([timeout])
等待结果变为可用。.terminate()
立即终止所有工作进程,不执行任何挂起工作。
用例
import urllib.request as request # 标准库 同步网络请求
from multiprocessing import Pool
import os, re
User_Agent = (
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0"
)
headers = dict.fromkeys(("User-Agent",), User_Agent)
def get_page(url):
req = request.Request(url=url, headers=headers)
try:
res = request.urlopen(req).read().decode("utf-8")
except:
print(f"{os.getpid():<6d}请求失败")
return url
else:
print(f"{os.getpid():<6d}请求成功")
return res
def get_title(res):
if title := re.search(r"(?<=\<title\>).*?(?=</title>)", res):
print(f"{os.getpid():<6d}解析网页标题为 {title.group()}")
else:
print(f"{os.getpid():<6d}解析不到 {res} 的网页标题")
if __name__ == "__main__":
p = Pool(3)
urls = [
"https://www.baidu.com",
"https://www.bilibili.com",
"https://twitter.com",
"https://cn.bing.com/",
"https://mail.163.com/",
"https://dict.hjenglish.com/",
"https://github.com/",
]
for url in urls:
res = p.apply_async(get_page, args=(url,), callback=get_title)
p.close()
p.join()
print("完成")
数据共享
进程池 callback,队列 put 等都只能接受可序列化的对象作为参数
否则会抛出异常:AttributeError: Cant pickle local object