App下載

Python協(xié)程asyncio模塊怎么使用

猿友 2021-08-04 17:31:18 瀏覽數(shù) (2970)
反饋

python協(xié)程是實現(xiàn)多任務的另一種方式,它主要通過asyncio模塊來進行實現(xiàn)。今天小編就以asyncio模塊的使用來介紹一下python寫成的知識點。希望能給小伙伴們一些協(xié)程學習上的幫助。

Python協(xié)程及asyncio基礎知識

協(xié)程(coroutine)也叫微線程,是實現(xiàn)多任務的另一種方式,是比線程更小的執(zhí)行單元,一般運行在單進程和單線程上。因為它自帶CPU的上下文,它可以通過簡單的事件循環(huán)切換任務,比進程和線程的切換效率更高,這是因為進程和線程的切換由操作系統(tǒng)進行。

Python實現(xiàn)協(xié)程的主要借助于兩個庫:asyncio和gevent。由于asyncio已經(jīng)成為python的標準庫了無需pip安裝即可使用,這意味著asyncio作為Python原生的協(xié)程實現(xiàn)方式會更加流行。本文僅會介紹asyncio模塊。如果大家對gevent也有需求,請留言,我會單獨寫篇文章介紹這個庫的使用。

asyncio 是從Python3.4引入的標準庫,直接內置了對協(xié)程異步IO的支持。asyncio 的編程模型本質是一個消息循環(huán),我們一般先定義一個協(xié)程函數(shù)(或任務), 從 asyncio 模塊中獲取事件循環(huán)loop,然后把需要執(zhí)行的協(xié)程任務(或任務列表)扔到 loop中執(zhí)行,就實現(xiàn)了異步IO。

定義協(xié)程函數(shù)及執(zhí)行方法的演變

在最早的Python 3.4中,協(xié)程函數(shù)是通過@asyncio.coroutine 和 yeild from 實現(xiàn)的, 如下所示。

 import asyncio
 
 @asyncio.coroutine
 def func1(i):
     print("協(xié)程函數(shù){}馬上開始執(zhí)行。".format(i))
     yield from asyncio.sleep(2)
     print("協(xié)程函數(shù){}執(zhí)行完畢!".format(i))
 
 if __name__ == '__main__':
     # 獲取事件循環(huán)
     loop = asyncio.get_event_loop()
 
     # 執(zhí)行協(xié)程任務
     loop.run_until_complete(func1(1))
 
     # 關閉事件循環(huán)
     loop.close()

這里我們定義了一個func1的協(xié)程函數(shù),我們可以使用asyncio.iscoroutinefunction來驗證。定義好協(xié)程函數(shù)后,我們首先獲取事件循環(huán)loop,使用它的run_until_complete方法執(zhí)行協(xié)程任務,然后關閉loop。

 print(asyncio.iscoroutinefunction(func1(1))) # True

Python 3.5以后引入了async/await 語法定義協(xié)程函數(shù),代碼如下所示。每個協(xié)程函數(shù)都以async聲明,以區(qū)別于普通函數(shù),對于耗時的代碼或函數(shù)我們使用await聲明,表示碰到等待時掛起,以切換到其它任務。

 import asyncio
 
 # 這是一個協(xié)程函數(shù)
 async def func1(i):
     print("協(xié)程函數(shù){}馬上開始執(zhí)行。".format(i))
     await asyncio.sleep(2)
     print("協(xié)程函數(shù){}執(zhí)行完畢!".format(i))
 
 if __name__ == '__main__':
     # 獲取事件循環(huán)
     loop = asyncio.get_event_loop()
 
     # 執(zhí)行協(xié)程任務
     loop.run_until_complete(func1(1))
 
     # 關閉事件循環(huán)
     loop.close()

Python 3.7之前執(zhí)行協(xié)程任務都是分三步進行的,代碼有點冗余。Python 3.7提供了一個更簡便的asyncio.run方法,上面代碼可以簡化為:

 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     print(f"協(xié)程函數(shù){i}執(zhí)行完畢!")
 
 if __name__ == '__main__':
     asyncio.run(func1(1))

注:Python自3.6版本起可以使用f-string來對字符串進行格式化了,相當于format函數(shù)的簡化版。

創(chuàng)建協(xié)程任務的演變

前面的演示案例中,我們只執(zhí)行了單個協(xié)程任務(函數(shù))。實際應用中,我們先由協(xié)程函數(shù)創(chuàng)建協(xié)程任務,然后把它們加入?yún)f(xié)程任務列表,最后一起交由事件循環(huán)執(zhí)行。

根據(jù)協(xié)程函數(shù)創(chuàng)建協(xié)程任務有多種方法,其中最新的是Python 3.7版本提供的asyncio.create_task方法,如下所示:

 # 方法1:使用ensure_future方法。future代表一個對象,未執(zhí)行的任務。
 task1 = asyncio.ensure_future(func1(1))
 task2 = asyncio.ensure_future(func1(2))
 
 # 方法2:使用loop.create_task方法
 task1 = loop.create_task(func1(1))
 task2 = loop.create_task(func1(2))
 
 # 方法3:使用Python 3.7提供的asyncio.create_task方法
 task1 = asyncio.create_task(func1(1))
 task2 = asyncio.create_task(func1(2))

創(chuàng)建多個協(xié)程任務列表后,我們還要使用asyncio.wait方法收集協(xié)程任務,并交由事件循環(huán)處理執(zhí)行。

 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     print(f"協(xié)程函數(shù){i}執(zhí)行完畢!")
 
 
 async def main():
     tasks = []
     # 創(chuàng)建包含4個協(xié)程任務的列表
     for i in range(1, 5):
         tasks.append(asyncio.create_task(func1(i)))
         
     await asyncio.wait(tasks)
 
 if __name__ == '__main__':
     asyncio.run(main())

執(zhí)行效果如下所示,你會發(fā)現(xiàn)4個協(xié)程任務并不是按順序執(zhí)行的。

運行結果

對于收集多個協(xié)程任務,Python還提供了新的asyncio.gather方法,它的作用asyncio.wait方法類似,但更強大。如果列表中傳入的不是create_task方法創(chuàng)建的協(xié)程任務,它會自動將函數(shù)封裝成協(xié)程任務,如下所示:

 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     print(f"協(xié)程函數(shù){i}執(zhí)行完畢!")
 
 async def main():
     tasks = []
     for i in range(1, 5):
         # 這里未由協(xié)程函數(shù)創(chuàng)建協(xié)程任務
         tasks.append(func1(i))
         
     # 注意這里*號。gather自動將函數(shù)列表封裝成了協(xié)程任務。
     await asyncio.gather(*tasks)
 
 if __name__ == '__main__':
     asyncio.run(main())

獲取協(xié)程任務執(zhí)行結果

是的,gather方法有將函數(shù)封裝成協(xié)程任務的能力,但這還并不是兩者最主要的區(qū)別作用。兩者更大的區(qū)別在協(xié)程任務執(zhí)行完畢后對于返回結果的處理上。通常獲取任務執(zhí)行結果通常對于一個程序至關重要,因此我們有必要花更多時間詳細了解這兩個方法的使用。

asyncio.wait 會返回兩個值:done 和 pending,done 為已完成的協(xié)程任務列表,pending 為超時未完成的協(xié)程任務類別,需通過task.result()方法可以獲取每個協(xié)程任務返回的結果;而asyncio.gather 返回的是所有已完成協(xié)程任務的 result,不需要再進行調用或其他操作,就可以得到全部結果。

我們來看兩個示例?,F(xiàn)在修改我們的協(xié)程函數(shù),通過return給它增加一個返回值。

通過asyncio.wait獲取協(xié)程任務執(zhí)行結果

 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(asyncio.create_task(func1(i)))
         
     # 獲取任務執(zhí)行結果。
     done, pending = await asyncio.wait(tasks)
     for task in done:
         print(f"執(zhí)行結果: {task.result()}")
 
 if __name__ == '__main__':
     asyncio.run(main())

執(zhí)行結果如下所示。你可以看到協(xié)程任務執(zhí)行結果并不是按任務添加的順序返回的。

運行結果

通過asyncio.gather獲取協(xié)程任務執(zhí)行結果

繼續(xù)修改我們的代碼:

 #-*- coding:utf-8 -*-
 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     return i
 
 async def main():
     tasks = []
     for i in range(1, 5):
         tasks.append(func1(i))
 
     results = await asyncio.gather(*tasks)
     for result in results:
         print(f"執(zhí)行結果: {result}")
 
 if __name__ == '__main__':
     asyncio.run(main())

執(zhí)行結果如下所示。協(xié)程任務執(zhí)行結果與任務添加順序完全一致。

運行結果

現(xiàn)在你知道gather和wait方法的真正區(qū)別了嗎?

  • gather具有把普通協(xié)程函數(shù)包裝成協(xié)程任務的能力,wait沒有。wait只能接收包裝后的協(xié)程任務列表做參數(shù)。
  • 兩者返回值不一樣,wait返回的是已完成和未完成任務的列表,而gather直接返回協(xié)程任務執(zhí)行結果。
  • gather返回的任務執(zhí)行結果是有序的,wait方法獲取的結果是無序的。

asyncio高級使用方法

給任務添加回調函數(shù)

我們還可以給每個協(xié)程任務通過add_done_callback的方法給單個協(xié)程任務添加回調函數(shù),如下所示:

 #-*- coding:utf-8 -*-
 import asyncio
 
 async def func1(i):
     print(f"協(xié)程函數(shù){i}馬上開始執(zhí)行。")
     await asyncio.sleep(2)
     return i
 
 # 回調函數(shù)
 def callback(future):
     print(f"執(zhí)行結果:{future.result()}")
 
 async def main():
     tasks = []
     for i in range(1, 5):
         task = asyncio.create_task(func1(i))
         
         # 注意這里,增加回調函數(shù)
         task.add_done_callback(callback)
         tasks.append(task)
 
     await asyncio.wait(tasks)
 
 if __name__ == '__main__':
     asyncio.run(main())

設置任務超時

很多協(xié)程任務都是很耗時的,當你使用wait方法收集協(xié)程任務時,可通過timeout選項設置任務切換前單個任務最大等待時間長度,如下所示:

  # 獲取任務執(zhí)行結果,如下所示:
  done,pending = await asyncio.wait(tasks, timeout=10)

自省

  • asyncio.current_task: 返回當前運行的Task實例,如果沒有正在運行的任務則返回 None。如果 loop 為 None 則會使用 get_running_loop()獲取當前事件循環(huán)。
  • asyncio.all_tasks: 返回事件循環(huán)所運行的未完成的Task對象的集合。

以上就是Python協(xié)程asyncio模塊怎么使用的詳細內容,更多關于Python協(xié)程和asyncio模塊的資料請關注W3Cschool其它相關文章!



0 人點贊