协程
协程是单线程下,用户态的并发。英文名 Coroutine,又称微线程,纤程。
早期 Python 通过yield
,.send()
提供了对协程的基本支持,但是不完全。
在 python 中,由于 GIL 锁的存在,并发编程的最佳实践是多进程+协程的方式。
greenlet 模块
第三方模块greenlet
已经实现了协程,无需写生成器,.switch()
代替了.send()
。
from greenlet import greenlet
def eat(name):
print("%s eat 1" % name)
g2.switch("egon")
print("%s eat 2" % name)
g2.switch()
def play(name):
print("%s play 1" % name)
g1.switch()
print("%s play 2" % name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch("egon") # 可以在第一次switch时传入参数,以后都不需要
gevent 模块
第三方库gevent
实现并发同步或异步编程,内部以greenlet
方式自动调度它自己的 IO,它是以 C 扩展模块形式接入 Python 的轻量级协程。是 python2 版本协程的首选方案。它的 API 与进程,线程相近。
import gevent
g1 = gevent.spawn(func, 2, 3, x=4, y=5)
g2 = gevent.spawn(func2)
g1.join() # 等待g1结束
g2.join() # 等待g2结束
# 或者上述两步合作一步:gevent.joinall([g1,g2])
print(g1.value) # 拿到func1的返回值
它自己的 IO 有:gevent.sleep()
,gevent.socket
等
猴子补丁
from gevent import spawn, joinall, monkey
monkey.patch_all()
在它之后导入的模块将被替换为它自己的 io,动态替换已有的标准库。
其他第三方异步库
eventlet
:同gevent
模块,内部以greenlet
方式,也有猴子补丁方法,但没gevent
好用。twisted
:基于事件驱动的网络引擎框架。tornado
:最简单的回调实现异步协程,作为 httpserver 和 httpclient,同时也是 Web 框架。
asyncio 模块
asyncio
是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。
可等待对象(以下简写为aw),包括:
- coro:coroutine 协程对象,
async
关键字定义的函数返回的对象 - task:任务对象,包含协程对象的各种状态
- future:task 的父类,给予高层去等待 类似于 js 的 promise
注意 asyncio.Future
只是模仿 concurrent.futures.Future
。方法并不完全一样
简单运行和休眠事件循环
asyncio.run(aw)
创建和运行当前线程的 loopasyncio.sleep(秒数)
模拟 io 耗时
创建事件循环
最简单的运行
asyncio.run(aw)
import asyncio
async def say(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
await say(1, "hello")
await say(2, "world")
asyncio.run(main())
它将会打印 "start",
等待 1 秒,再打印 "hello"
等待 2 秒,再打印 "world"
由于没有注册为 Task 对象 await coro()代码依然是同步执行的,
实际上是继发执行了两个异步任务
asyncio.run(aw)
是 python3.7 以上的写法
以前的写法如下,只能先创建 loop 对象再使用 loop 上的方法。
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
其他方法创建,获取和设置事件循环
asyncio.get_running_loop()
asyncio.get_event_loop()
获取当前上下文的事件循环asyncio.set_event_loop(loop)
asyncio.new_event_loop()
一般是给新线程使用,见下
创建 Future 或 Task
asyncio.create_task(aw)
最简单的注册 coro 为 Task 对象的方法
将协程对象及其各种状态打包注册为 Task 对象,放入 asyncio.all_tasks 的集合(可以看成任务队列,先放进去的在前面,但是 main 在最后面)中,并返回。
asyncio.create_task()
是 python3.7 以上的写法,以前只能用asyncio.ensure_future()
和loop.create_task()
# 替代上文的main
async def main():
task1 = asyncio.create_task(say(1, "hello"))
task2 = asyncio.create_task(say(2, "world"))
await task1
await task2
它将会等待 1 秒,打印 "hello" ;等待 1 秒,再打印 "world"
至此,并发运行了两个异步任务,asyncio.all_tasks 的集合被清空了。
await task
必须全部注册为 task 之后再 await,否则 asyncio.all_tasks 里除了 main 始终只有一个 task,可以预见依然是继发执行的
asyncio.gather(*aws)
多个 task 或者 coro 整合为一个 task
有返回值时,可获得顺序同 aws 的返回值列表
asyncio.as_completed(aws)
返回一个 task 迭代器
async def count(m):
await asyncio.sleep(3)
return m
async def main():
tasks = [count(i) for i in range(10)]
for task in asyncio.as_completed(tasks):
result = await task
print(result)
asyncio.run(main())
上下文管理
实现了__aenter__
和__aexit__
。这两个方法都返回一个 awaitable 类型的值。
和常规的 with
表达式一样,可以在一个 async with
表达式中指定多个上下文管理器。
以下是 python asyncio 的其他生态:
import aiohttp # 第三方库 异步网络请求
async def req(url, head):
async with aiohttp.ClientSession() as session:
async with session.get(url=url, headers=head) as response:
page_html = await response.text()
import aiofiles # 第三方库 异步文件操作
async with aiofiles.open(filename, mode="r", encoding=encoding) as f:
async for line in f:
pass
上例中 async for 用来循环异步可迭代对象,其.__aiter__()
返回的异步迭代器需要实现__anext__
方法
loop 对象方法
记得把 loop 对象传进 main 中以便操作 loop 方法
loop.run_until_complete(future)
。运行事件循环,直到 future 运行结束loop.run_forever()
表示事件循环会一直运行,直到遇到 stop。loop.stop()
。停止事件循环loop.is_running()
。如果事件循环依然在运行,则返回 Trueloop.is_closed()
。如果事件循环已经 close,则返回 Trueloop.close()
。关闭事件循环loop.time()
。事件循环的时钟loop.create_future(coro)
,返回 future 对象loop.create_task(coro)
,返回 task 对象loop.run_in_executor(executor, work)
连接线程池实现同步任务异步执行 executor 是concurrent.futures.ThreadPoolExecutor
的实例对象- callback(只接受位置参数,且不是 coro 是普通函数)
loop.call_later(delay, callback, *args, context=None)
延迟回调loop.call_at(when, callback, *args, context=None)
时刻点回调 结合loop.time()
使用loop.call_soon(callback, *args, context=None)
下一个循环立即回调loop.call_soon_threadsafe(callback, *args, context=None)
下一个循环立即回调,必须在另一个线程中使用
可以在新线程中使用 loop.run_forever()
使其永远等待主线程向其发射 task
import asyncio, aiohttp, re
from threading import Thread
header = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.67"
}
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def end_loop():
loop = asyncio.get_event_loop()
loop.stop()
async def get_title(url):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url=url, headers=header) as response:
page_text = await response.text()
if title := re.search(r"(?<=\<title\>).*?(?=\</title\>)", page_text):
print(f"{url}: 解析网页标题为 {title.group()}")
else:
print(f"{url}: 解析不到网页标题")
except Exception as e:
print(f"{url}: {e}")
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
while 1:
new_loop.call_soon(print, "请输入完整url如https://t.com")
ipt = input()
if ipt == "q":
asyncio.run_coroutine_threadsafe(end_loop(), new_loop)
break
else:
asyncio.run_coroutine_threadsafe(get_title(input()), new_loop)
t.join()
上例中主线程一直等待着用户输入。用户输入后立即生成 aw 抛给子线程的事件循环。
task 对象方法
.get_coro()
获取它是由哪个协程对象注册为任务的.cancel()
取消任务.result()
获取返回.add_done_callback()
添加完成时回调
task 规划
asyncio.wait_for(aw, timeout)
超时抛出异常asyncio.wait(tasks, return_when)
阻塞等待所传 tasks 执行完毕 执行完毕可设置为任一完毕 任一异常 全部完毕
import asyncio
async def count(m):
await asyncio.sleep(2)
return m
async def main():
tasks = [count(i) for i in range(10)]
done, pending = await asyncio.wait(tasks)
for i in done:
print(i.result())
asyncio.run(main())
return_when 指定此函数应在何时返回。它必须为以下常数之一:
常数 | 描述 |
---|---|
FIRST_COMPLETED | 任一完毕 |
FIRST_EXCEPTION | 任一异常。当没有引发任何异常时它就相当于 ALL_COMPLETED 。 |
ALL_COMPLETED | 默认。全部完毕 |
与 wait_for()
,wait()
在超时发生时不会取消可等待对象。
asyncio.all_tasks()
返回事件循环所运行的未完成的 Task 对象的集合
for task in asyncio.all_tasks():
print(task.cancel())