asyncio 是 python 3.5+ 用來寫 coroutine 的 library,並提供 async, await syntax sugar 語法
coroutine
subroutine 是從某一個進入點到另一個離開的點。coroutine 則是有 entered, exited 及多個 resumed points。可用 async/await 語法實作
如要判斷函數是否為 coroutine
asyncio.iscoroutinefunction(func)
如要判斷物件是否為 coroutine
asyncio.iscoroutine(obj)
sample
python 3.5+ 可使用 asyncio library 以及 await
python 3.7+ 可使用 asyncio.run()
import asyncio
import time
now = lambda: time.time()
async def dosomething(num):
print('第 {} 任務,第一步'.format(num))
await asyncio.sleep(2)
print('第 {} 任務,第二步'.format(num))
if __name__ == "__main__":
start = now()
tasks = [dosomething(i) for i in range(5)]
asyncio.run(asyncio.wait(tasks))
print('TIME: ', now() - start)
執行結果
第 0 任務,第一步
第 1 任務,第一步
第 4 任務,第一步
第 3 任務,第一步
第 2 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 4 任務,第二步
第 3 任務,第二步
第 2 任務,第二步
TIME: 2.0054779052734375
async await
async 用來宣告 funcition 有非同步執行的功能
- async def 的函數裡面,不能跟 yield, yield from 一起使用
await 是標記 coroutine 可 suspend/resume 的 entry points
- await 後面必須要是一個 coroutine 物件,或是 awaitable 類型的物件
- await 的目的是將控制權回傳給 event loop,實現的方法是用 yield
awaitable 特性的物件有三種
coroutines
一個 async def 函數就是 coroutine
tasks
tasks 是可被調度 coroutines 的物件,可利用
asyncio.create_task()
包裝 coroutinesfutures
是一個 asynchronous operation,處理完成回傳的結果
event loop
pythojn 3.5 用 asyncio.geteventloop 產生 event_loop,然後將 coroutine 放到 run_until_complete()
裡面,直到所有 coroutine 完成。
import asyncio
async def hello_world(x):
print('hello_world x' + str(x))
await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world(3))
loop.close()
- loop.rununtilcomplete(coroutine)
- 執行 coroutine,完成時,就可以關閉 event loop
- loop.run_forever()
- event loop 永遠不會關閉,直到呼叫
loop.stop()
為止
- event loop 永遠不會關閉,直到呼叫
python 3.7+ 有更簡潔寫法,將 loop 封裝,然後用 asyncio.run()
執行就可以了
import asyncio
async def hello_world(x):
print('hello_world x' + str(x))
await asyncio.sleep(x)
asyncio.run(hello_world(3))
task
產生 task 的方法有兩種
asyncio.ensure_future()
在所有 python 都可以使用asyncio.create_task()
在 python 3.7+ 可使用
import asyncio
import time
async def dosomething(num):
print('start{}'.format(num))
await asyncio.sleep(num)
print('sleep{}'.format(num))
async def main():
task1 = asyncio.create_task(dosomething(1))
task2 = asyncio.create_task(dosomething(2))
task3 = asyncio.create_task(dosomething(3))
await task1
await task2
await task3
if __name__ == '__main__':
time_start = time.time()
asyncio.run(main())
print(time.time() - time_start)
執行結果
start1
start2
start3
sleep1
sleep2
sleep3
3.0059001445770264
同時執行多個 tasks
利用 asyncio.gather()
可同時放入多個 coroutines 或 awaitable 物件進入 event loop
asyncio.gather( *aws, loop=None, return_exceptions=False)
- *aws :可傳入多個 awaitable objects
- Loop:此參數將會在 Python 3.10 移除
- return_exceptions:default 是 False,當發生 exception 時會立即中斷 task,如果設定為 True 則發生錯誤的訊息會與其他成功訊息一起回傳 (最終的 results 結果裡面包含了 ValueError() 結果)
import asyncio
import time
now = lambda: time.time()
async def dosomething(num):
print('第 {} 任務,第一步'.format(num))
await asyncio.sleep(2)
print('第 {} 任務,第二步'.format(num))
return '第 {} 任務完成'.format(num)
async def raise_error(num):
raise ValueError
print('這邊不會執行到')
async def main():
tasks = [dosomething(i) for i in range(5)]
tasks1 = [raise_error(i) for i in range(5)]
results = await asyncio.gather(*tasks, *tasks1, return_exceptions=True)
print(results)
if __name__ == "__main__":
start = now()
asyncio.run(main())
print('TIME: ', now() - start)
執行結果
第 0 任務,第一步
第 1 任務,第一步
第 2 任務,第一步
第 3 任務,第一步
第 4 任務,第一步
第 0 任務,第二步
第 1 任務,第二步
第 2 任務,第二步
第 3 任務,第二步
第 4 任務,第二步
['第 0 任務完成', '第 1 任務完成', '第 2 任務完成', '第 3 任務完成', '第 4 任務完成', ValueError(), ValueError(), ValueError(), ValueError(), ValueError()]
TIME: 2.0041749477386475
chaining coroutines
可將任務分成三個 coroutines,當第一個完成,會將結果傳入第二個,然後是第三個。最終會透過 asyncio.gather
儲存 coroutine 的結果
import asyncio
import time
import random
now = lambda: time.time()
# 任務一
async def step_one(num, n):
time_sleep = random.randint(0, 1)
print('Task {},step one, sleep {}'.format(n, time_sleep))
await asyncio.sleep(time_sleep)
print('Task {},step one, wakeup'.format(n))
num += 1
return num
# 任務二
async def step_two(num, n):
time_sleep = random.randint(0, n)
print('Task {},step two, sleep {}'.format(n, time_sleep))
await asyncio.sleep(time_sleep)
print('Task {},step two, wakeup'.format(n))
num += 2
return num
# 任務三
async def step_three(num, n):
time_sleep = random.randint(0, n)
print('Task {},step three, sleep {}'.format(n, time_sleep))
await asyncio.sleep(time_sleep)
print('Task {},step three, wakeup'.format(n))
num += 3
return [n, num]
# 任務調度
async def asyncio_chain(n):
s1 = await step_one(n, n)
s2 = await step_two(s1, n)
s3 = await step_three(s2, n)
return s3
# 收集任務結果
async def main():
tasks = [asyncio_chain(n) for n in range(3)]
result = await asyncio.gather(*tasks)
print(result)
if __name__ == "__main__":
start = now()
asyncio.run(main())
print('TIME: ', now() - start)
執行結果
Task 0,step one, sleep 0
Task 1,step one, sleep 1
Task 2,step one, sleep 1
Task 0,step one, wakeup
Task 0,step two, sleep 0
Task 0,step two, wakeup
Task 0,step three, sleep 0
Task 0,step three, wakeup
Task 1,step one, wakeup
Task 1,step two, sleep 1
Task 2,step one, wakeup
Task 2,step two, sleep 2
Task 1,step two, wakeup
Task 1,step three, sleep 1
Task 2,step two, wakeup
Task 2,step three, sleep 0
Task 2,step three, wakeup
Task 1,step three, wakeup
[[0, 6], [1, 7], [2, 8]]
TIME: 3.0117080211639404
coroutine with queue
將 tasks 分為 producer 與 consumer 兩種角色,但 producer 不直接跟 cosumer 連接,而是透過 queue 傳遞資料。
producers 可任意將多個 task 填入 queue,然後由多個 cosumers 由 queue 取出並執行。
python 標準函式庫有四種 queues
- queue.Queue
- asyncio.Queue
- multiprocessing.Queue
- collections.deque
asyncio.Queue 使用時要注意,並不是 thread-safe
- q = asyncio.Queue(maxsize=num_consumers):限制 queue 的大小
- q.put()
- q.get()
- q.task_done():在完成一項工作之後,向queue發送一個信號
- q.join():等到queue為空,再執行後續操作
- q.qsize():回傳 queue size
- q.empty(): 如果 queue 為空,回傳 True,反之 False
- q.full(): 如果 queue is full,回傳 True,反之 False
import asyncio
# consumer
async def consumer(q, n):
print('consumer {}: starting...'.format(n))
while True:
print('consumer {}: prepare to get'.format(n))
item = await q.get()
print('consumer {}: get {}'.format(n, item))
await asyncio.sleep(0.15)
# 完成一項工作,發送訊號給 queue,表示可再容納一個新的 task
q.task_done()
print('consumer {}: ending'.format(n))
# producer A
async def producer_a(q, num_products):
print('producer A: starting producer A...')
for i in range(num_products):
await q.put('A ' + str(i))
print('producer A: create A {}'.format(i))
await asyncio.sleep(0.01)
# producer B
async def producer_b(q, num_products):
print('producer B: starting producer B...')
for i in range(num_products):
await q.put('B ' + str(i))
print('producer B: create B {}'.format(i))
await asyncio.sleep(0.02)
# 任務調度
async def main(num_consumers, num_products):
# 以 consumers 的數量為 queue size 上限,當 queue 裡面的 items 數量到達上限,就無法再放入 item
q = asyncio.Queue(maxsize=num_consumers)
# 產生兩個 producer tasks,每一個 producer 都會產生 num_products 個 products,放入 q
prod_a = [asyncio.create_task(producer_a(q, num_products))]
prod_b = [asyncio.create_task(producer_b(q, num_products))]
# 產生
consumers = [
asyncio.create_task(consumer(q, i)) for i in range(num_consumers)
]
# 測試 coroutine 的 cancel
# await asyncio.sleep(0.2)
# for c in consumers:
# c.cancel()
await asyncio.gather(*prod_a, *prod_b)
print('producers All: ending')
# 表示 queue 裡面已經沒有 product tasks
await q.join()
print('consumers All: ending')
# 以 cancel 確認將所有 consumers 取消,否則 consumer 會一直等待 queue
for c in consumers:
c.cancel()
# main(消費者數量, products 數量)
asyncio.run(main(3, 4))
執行結果
producer A: starting producer A...
producer A: create A 0
producer B: starting producer B...
producer B: create B 0
consumer 0: starting...
consumer 0: prepare to get
consumer 0: get A 0
consumer 1: starting...
consumer 1: prepare to get
consumer 1: get B 0
consumer 2: starting...
consumer 2: prepare to get
producer A: create A 1
consumer 2: get A 1
producer B: create B 1
producer A: create A 2
producer A: create A 3
consumer 0: prepare to get
consumer 0: get B 1
consumer 1: prepare to get
consumer 1: get A 2
producer B: create B 2
consumer 2: prepare to get
consumer 2: get A 3
producer B: create B 3
producers All: ending
consumer 0: prepare to get
consumer 0: get B 2
consumer 1: prepare to get
consumer 1: get B 3
consumer 2: prepare to get
consumer 0: prepare to get
consumer 1: prepare to get
consumers All: ending
futures
future 是一個特殊的 awaitable 物件,代表某個 asynchronous operation 的 eventual result
當有一個 Future 物件在 awaited 狀態,表示 coroutine 會一直 wait,直到 Future is resolved
Future 物件用在 async/wait 的 callback-based code
通常在 application level 不需要自行產生 Future object
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
這是自行產生 future 物件的 sample
def mark_done(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
print('scheduling mark_done')
# 在 event loop 呼叫 mark_done,mark_done 完成時,
# 會透過 future 物件的 set_result 將結果傳回來
loop.call_soon(mark_done, future, 'the result')
print('suspending the coroutine')
result = await future
print('awaited result: {!r}'.format(result))
print('future result: {!r}'.format(future.result()))
return result
if __name__ == '__main__':
print('entering the event loop')
result = asyncio.run(main())
print('returned result: {!r}'.format(result))