Skip to content

进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

使用多进程来并发可以使 python 程序有效利用多核 CPU。

multiprocessing模块是 python3 推荐使用的内置模块。

在 Windows 操作系统中由于没有 fork,在创建子进程的时候会自动以 import 启动,导致递归。所以必须把创建子进程的部分写在if __name__ =='__main__' 中。

同时,由于 Windows 操作系统中由于没有 fork,子进程不能获得父进程的权限导致拒绝访问,请改变用户策略组设置(见下)

子进程

python
# _*_coding:utf-8_*_
from multiprocessing import Process


def f(name):
    print("我单推", name)


if __name__ == "__main__":
    p = Process(target=f, args=("ayame",))
    p.start()
    print("摸了")

子进程对象方法

  • .start() 启动。子进程的执行顺序不是根据启动顺序决定的。
  • .join(timeout = None) 阻塞主进程,直至子进程运行完毕或超时。
  • .daemon=True 设置为守护进程,要在 start 之前设置。主进程代码运行结束,守护进程随即终止。
  • terminate() 关闭进程(不会立即关闭)

多个子进程

python
from multiprocessing import Process
import time


def f(name):
    time.sleep(1)
    print("我单推", name)


if __name__ == "__main__":
    dd_list = ["shishi", "hima", "sasaki"]
    dd_list_p = []
    for name in dd_list:
        p = Process(target=f, args=(name,))
        p.start()
        dd_list_p.append(p)
    for p in dd_list_p:
        p.join()
    print("一秒单推一个也是单推(并发)")

锁和管道

Lock()

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改

python
from multiprocessing import Process, Lock
import os, time, random


def watch():
    """这里用sleep代替了对锁抢占的内存资源的读写"""
    time.sleep(random.random())


def work(lock, name):
    lock.acquire()
    print(f"你关注的{name}开播了,房间号是{os.getpid()}")
    watch()
    print(f"你关注的{name}下播了,房间号是{os.getpid()}")
    lock.release()


if __name__ == "__main__":
    lock = Lock()
    for name in ["shishi", "hima", "sasaki"]:
        p = Process(target=work, args=(lock, name))
        p.start()

浪费了时间,却保证了数据的安全。

除了最简单的锁Lock() ,还有

  • RLock() 递归锁
  • Semaphore(value=1) 计数器锁
  • BoundedSemaphore(value=1) 带上限的计数器锁

管道Pipe(duplex=True)

参数duplex 是否双向

返回包含两个 connection 对象的元组,connection 相当于面向消息的套接字,数据是存在于内存中的。

队列Queue([maxsize])

队列 Queue 又是基于(管道+锁)实现的内存中的共享数据,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

队列对象方法

  • .empty() 是否空
  • .full() 是否满
  • .put(obj, block=True, timeout=None) 压入队列
  • .get(block=True, timeout=None) 弹出队列
python
import multiprocessing, os


def push(queue, name):
    """向queue中put数据"""
    info = f"YouTube频道{os.getpid():<5d}:{name:>7s}-Channel播了"
    queue.put(info)


def shift(queue, name):
    """从queue中get数据"""
    info = queue.get()
    print(f"bilibili房间号{os.getpid():<5d}:{name:>7s}-Official同传-->{info}")


if __name__ == "__main__":
    youtube = []
    bilibili = []
    queue = multiprocessing.Queue(3)

    # 子进程 油管开播
    for name in ["shishi", "hima", "sasaki"]:
        process = multiprocessing.Process(target=push, args=(queue, name))
        process.start()
        youtube.append(process)
    # 子进程 哔哩哔哩转播
    for name in ["shishi", "hima", "sasaki"]:
        process = multiprocessing.Process(target=shift, args=(queue, name))
        process.start()
        bilibili.append(process)

    for p in youtube:
        p.join()
    for p in bilibili:
        p.join()

用于生产者消费者模型的JoinableQueue([maxsize])

JoinableQueue 类是 Queue 的子类,额外添加了 task_done()join()方法。

  • .task_done() 指出之前进入队列的一个任务已经完成
  • .join() 阻塞,直到队列中所有任务均被task_done()
python
from multiprocessing import Process, JoinableQueue
import time, random, os


def consumer(q):
    while True:
        res = q.get()  # 阻塞 直到get到 可以设定timeout抛出异常
        time.sleep(random.randint(1, 3))
        print(f"   DD   - id{os.getpid():<5d}{res}看完了")
        q.task_done()  # 每次get都需要task_done
    print("consumer ok")


def producer(name, q):
    for i in range(10):  # 实际运用中不知道数目(如爬虫翻页),需要get同时put
        time.sleep(random.randint(1, 3))
        res = f"{name}-{i}回目"
        q.put(res)
        print(f"Official- id{os.getpid():<5d}{res}烤好了")
    q.join()  # 阻塞 q为空且get次数=task_done次数即可解除阻塞 期间可追加put
    # q.close()# join之后不再put和get了可以close
    print(f"{name}组收益了")


if __name__ == "__main__":
    q = JoinableQueue()
    # 生产者们(producer):烤肉man
    nana_offic = Process(target=producer, args=("nana", q))
    aqua_offic = Process(target=producer, args=("aqua", q))
    peko_offic = Process(target=producer, args=("peko", q))
    officials = [nana_offic, aqua_offic, peko_offic]

    # 消费者们(consumer):DD
    dd = [Process(target=consumer, args=(q,)) for i in range(2)]

    for p in officials + dd:
        p.daemon = True  # 完成的时候consumer里还在get阻塞着需要强制结束
        p.start()
    for p in officials:
        p.join()

    input("熬死DD了")  # 等输出完毕再回车结束

进程池

进程池Pool控制同一时间最多有固定数量的进程在运行。

它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

创建进程池 multiprocessing.Pool([numprocess [,initializer [, initargs]]])

创建时参数

  • numprocess 要创建的进程数,如果省略,将默认使用 cpu_count()的值
  • initializer 每个工作进程启动时要执行的可调用对象,默认为 None
  • initargs 是要传给 initializer 的参数组

进程池对象方法

  • .apply(func [, args [, kwargs]]) 排队执行func(*args,**kwargs)返回结果
  • .apply_async(func [, args [, kwargs]]) 并行执行,返回 AsyncResul 的实例
  • .map(func, iterable) 内置函数 map 的并行版本 (只支持一个 iterable 参数)
  • .map_async(func, iterable) 同 map,返回的是 AsyncResul 的实例
  • .imap(func, iterable) 延迟执行的 map,所需的内存小, chunksize参数可调速

_async的方法支持callback , error_callback 参数,表示将结果.get传入回调和错误回调

  • .close() 关闭进程池
  • .jion() 等待退出。要在 close 之后调用

AsyncResul 对象方法

  • .get([timeout]) 获取执行结果。可选 timeout 抛出异常。
  • .ready() 是否调用完成。
  • .successful() 是否调用完成且没有引发异常。如果就绪之前调用将引发异常。
  • .wait([timeout]) 等待结果变为可用。
  • .terminate() 立即终止所有工作进程,不执行任何挂起工作。

用例

python
import urllib.request as request  # 标准库 同步网络请求
from multiprocessing import Pool
import os, re

User_Agent = (
    "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0"
)
headers = dict.fromkeys(("User-Agent",), User_Agent)


def get_page(url):
    req = request.Request(url=url, headers=headers)
    try:
        res = request.urlopen(req).read().decode("utf-8")
    except:
        print(f"{os.getpid():<6d}请求失败")
        return url
    else:
        print(f"{os.getpid():<6d}请求成功")
        return res


def get_title(res):
    if title := re.search(r"(?<=\<title\>).*?(?=</title>)", res):
        print(f"{os.getpid():<6d}解析网页标题为 {title.group()}")
    else:
        print(f"{os.getpid():<6d}解析不到 {res} 的网页标题")


if __name__ == "__main__":
    p = Pool(3)

    urls = [
        "https://www.baidu.com",
        "https://www.bilibili.com",
        "https://twitter.com",
        "https://cn.bing.com/",
        "https://mail.163.com/",
        "https://dict.hjenglish.com/",
        "https://github.com/",
    ]

    for url in urls:
        res = p.apply_async(get_page, args=(url,), callback=get_title)

    p.close()
    p.join()
    print("完成")

数据共享

进程池 callback,队列 put 等都只能接受可序列化的对象作为参数

否则会抛出异常:AttributeError: Cant pickle local object