Skip to content

协程

协程是单线程下,用户态的并发。英文名 Coroutine,又称微线程,纤程。
早期 Python 通过yield.send()提供了对协程的基本支持,但是不完全。
在 python 中,由于 GIL 锁的存在,并发编程的最佳实践是多进程+协程的方式。

greenlet 模块

第三方模块greenlet已经实现了协程,无需写生成器,.switch()代替了.send()

python
from greenlet import greenlet


def eat(name):
    print("%s eat 1" % name)
    g2.switch("egon")
    print("%s eat 2" % name)
    g2.switch()


def play(name):
    print("%s play 1" % name)
    g1.switch()
    print("%s play 2" % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch("egon")  # 可以在第一次switch时传入参数,以后都不需要

gevent 模块

第三方库gevent实现并发同步或异步编程,内部以greenlet方式自动调度它自己的 IO,它是以 C 扩展模块形式接入 Python 的轻量级协程。是 python2 版本协程的首选方案。它的 API 与进程,线程相近。

python
import gevent

g1 = gevent.spawn(func, 2, 3, x=4, y=5)
g2 = gevent.spawn(func2)

g1.join()  # 等待g1结束
g2.join()  # 等待g2结束
# 或者上述两步合作一步:gevent.joinall([g1,g2])

print(g1.value)  # 拿到func1的返回值

它自己的 IO 有:gevent.sleep()gevent.socket

猴子补丁

python
from gevent import spawn, joinall, monkey

monkey.patch_all()

在它之后导入的模块将被替换为它自己的 io,动态替换已有的标准库。

其他第三方异步库

eventlet:同gevent模块,内部以greenlet方式,也有猴子补丁方法,但没gevent好用。
twisted:基于事件驱动的网络引擎框架。
tornado:最简单的回调实现异步协程,作为 httpserver 和 httpclient,同时也是 Web 框架。

asyncio 模块

asyncio是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。

可等待对象(以下简写为aw),包括:

  • coro:coroutine 协程对象,async关键字定义的函数返回的对象
  • task:任务对象,包含协程对象的各种状态
  • future:task 的父类,给予高层去等待 类似于 js 的 promise

注意 asyncio.Future 只是模仿 concurrent.futures.Future 。方法并不完全一样

简单运行和休眠事件循环

  • asyncio.run(aw) 创建和运行当前线程的 loop
  • asyncio.sleep(秒数) 模拟 io 耗时

创建事件循环

最简单的运行

asyncio.run(aw)

python
import asyncio


async def say(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    await say(1, "hello")
    await say(2, "world")


asyncio.run(main())

它将会打印 "start",
等待 1 秒,再打印 "hello"
等待 2 秒,再打印 "world"

由于没有注册为 Task 对象 await coro()代码依然是同步执行的,

实际上是继发执行了两个异步任务

asyncio.run(aw)是 python3.7 以上的写法
以前的写法如下,只能先创建 loop 对象再使用 loop 上的方法。

python
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

其他方法创建,获取和设置事件循环

  • asyncio.get_running_loop()
  • asyncio.get_event_loop() 获取当前上下文的事件循环
  • asyncio.set_event_loop(loop)
  • asyncio.new_event_loop() 一般是给新线程使用,见下

创建 Future 或 Task

asyncio.create_task(aw)

最简单的注册 coro 为 Task 对象的方法

将协程对象及其各种状态打包注册为 Task 对象,放入 asyncio.all_tasks 的集合(可以看成任务队列,先放进去的在前面,但是 main 在最后面)中,并返回。

asyncio.create_task()是 python3.7 以上的写法,以前只能用asyncio.ensure_future()loop.create_task()

python
# 替代上文的main
async def main():
    task1 = asyncio.create_task(say(1, "hello"))
    task2 = asyncio.create_task(say(2, "world"))
    await task1
    await task2

它将会等待 1 秒,打印 "hello" ;等待 1 秒,再打印 "world"
至此,并发运行了两个异步任务,asyncio.all_tasks 的集合被清空了。

await task

必须全部注册为 task 之后再 await,否则 asyncio.all_tasks 里除了 main 始终只有一个 task,可以预见依然是继发执行的

asyncio.gather(*aws)

多个 task 或者 coro 整合为一个 task

有返回值时,可获得顺序同 aws 的返回值列表

asyncio.as_completed(aws)

返回一个 task 迭代器

python
async def count(m):
    await asyncio.sleep(3)
    return m


async def main():
    tasks = [count(i) for i in range(10)]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


asyncio.run(main())

上下文管理

实现了__aenter____aexit__。这两个方法都返回一个 awaitable 类型的值。

和常规的 with 表达式一样,可以在一个 async with 表达式中指定多个上下文管理器。

以下是 python asyncio 的其他生态:

python
import aiohttp  # 第三方库 异步网络请求


async def req(url, head):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url, headers=head) as response:
            page_html = await response.text()
python
import aiofiles  # 第三方库 异步文件操作

async with aiofiles.open(filename, mode="r", encoding=encoding) as f:
    async for line in f:
        pass

上例中 async for 用来循环异步可迭代对象,其.__aiter__()返回的异步迭代器需要实现__anext__方法

loop 对象方法

记得把 loop 对象传进 main 中以便操作 loop 方法

  • loop.run_until_complete(future)。运行事件循环,直到 future 运行结束
  • loop.run_forever() 表示事件循环会一直运行,直到遇到 stop。
  • loop.stop()。停止事件循环
  • loop.is_running()。如果事件循环依然在运行,则返回 True
  • loop.is_closed()。如果事件循环已经 close,则返回 True
  • loop.close()。关闭事件循环
  • loop.time()。事件循环的时钟
  • loop.create_future(coro) ,返回 future 对象
  • loop.create_task(coro) ,返回 task 对象
  • loop.run_in_executor(executor, work) 连接线程池实现同步任务异步执行 executor 是 concurrent.futures.ThreadPoolExecutor 的实例对象
  • callback(只接受位置参数,且不是 coro 是普通函数)
    • loop.call_later(delay, callback, *args, context=None) 延迟回调
    • loop.call_at(when, callback, *args, context=None) 时刻点回调 结合 loop.time() 使用
    • loop.call_soon(callback, *args, context=None) 下一个循环立即回调
    • loop.call_soon_threadsafe(callback, *args, context=None) 下一个循环立即回调,必须在另一个线程中使用

可以在新线程中使用 loop.run_forever() 使其永远等待主线程向其发射 task

python
import asyncio, aiohttp, re
from threading import Thread

header = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67"
}


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def end_loop():
    loop = asyncio.get_event_loop()
    loop.stop()


async def get_title(url):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url=url, headers=header) as response:
                page_text = await response.text()
                if title := re.search(r"(?<=\<title\>).*?(?=\</title\>)", page_text):
                    print(f"{url}: 解析网页标题为 {title.group()}")
                else:
                    print(f"{url}: 解析不到网页标题")
    except Exception as e:
        print(f"{url}: {e}")


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
while 1:
    new_loop.call_soon(print, "请输入完整url如https://t.com")
    ipt = input()
    if ipt == "q":
        asyncio.run_coroutine_threadsafe(end_loop(), new_loop)
        break
    else:
        asyncio.run_coroutine_threadsafe(get_title(input()), new_loop)
t.join()

上例中主线程一直等待着用户输入。用户输入后立即生成 aw 抛给子线程的事件循环。

task 对象方法

  • .get_coro() 获取它是由哪个协程对象注册为任务的
  • .cancel() 取消任务
  • .result() 获取返回
  • .add_done_callback() 添加完成时回调

task 规划

  • asyncio.wait_for(aw, timeout) 超时抛出异常
  • asyncio.wait(tasks, return_when) 阻塞等待所传 tasks 执行完毕 执行完毕可设置为任一完毕 任一异常 全部完毕
python
import asyncio


async def count(m):
    await asyncio.sleep(2)
    return m


async def main():
    tasks = [count(i) for i in range(10)]
    done, pending = await asyncio.wait(tasks)
    for i in done:
        print(i.result())


asyncio.run(main())

return_when 指定此函数应在何时返回。它必须为以下常数之一:

常数描述
FIRST_COMPLETED任一完毕
FIRST_EXCEPTION任一异常。当没有引发任何异常时它就相当于 ALL_COMPLETED
ALL_COMPLETED默认。全部完毕

wait_for()wait() 在超时发生时不会取消可等待对象。

asyncio.all_tasks()

返回事件循环所运行的未完成的 Task 对象的集合

python
for task in asyncio.all_tasks():
    print(task.cancel())