asyncio是官方提供的協(xié)程的類庫,從python3.4開始支持該模塊
創(chuàng)新互聯(lián)公司長期為上1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為水城企業(yè)提供專業(yè)的成都做網(wǎng)站、網(wǎng)站建設(shè),水城網(wǎng)站改版等技術(shù)服務(wù)。擁有十余年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。
async awiat是python3.5中引入的關(guān)鍵字,使用async關(guān)鍵字可以將一個函數(shù)定義為協(xié)程函數(shù),使用awiat關(guān)鍵字可以在遇到IO的時候掛起當(dāng)前協(xié)程(也就是任務(wù)),去執(zhí)行其他協(xié)程。
await + 可等待的對象(協(xié)程對象、Future對象、Task對象 - IO等待)
注意:在python3.4中是通過asyncio裝飾器定義協(xié)程,在python3.8中已經(jīng)移除了asyncio裝飾器。
事件循環(huán),可以把他當(dāng)做是一個while循環(huán),這個while循環(huán)在周期性的運行并執(zhí)行一些協(xié)程(任務(wù)),在特定條件下終止循環(huán)。
loop = asyncio.get_event_loop():生成一個事件循環(huán)
loop.run_until_complete(任務(wù)):將任務(wù)放到事件循環(huán)
Tasks用于并發(fā)調(diào)度協(xié)程,通過asyncio.create_task(協(xié)程對象)的方式創(chuàng)建Task對象,這樣可以讓協(xié)程加入事件循環(huán)中等待被調(diào)度執(zhí)行。除了使用 asyncio.create_task() 函數(shù)以外,還可以用低層級的 loop.create_task() 或 ensure_future() 函數(shù)。不建議手動實例化 Task 對象。
本質(zhì)上是將協(xié)程對象封裝成task對象,并將協(xié)程立即加入事件循環(huán),同時追蹤協(xié)程的狀態(tài)。
注意:asyncio.create_task() 函數(shù)在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用 asyncio.ensure_future() 函數(shù)。
下面結(jié)合async awiat、事件循環(huán)和Task看一個示例
示例一:
*注意:python 3.7以后增加了asyncio.run(協(xié)程對象),效果等同于loop = asyncio.get_event_loop(),loop.run_until_complete(協(xié)程對象) *
示例二:
注意:asyncio.wait 源碼內(nèi)部會對列表中的每個協(xié)程執(zhí)行ensure_future從而封裝為Task對象,所以在和wait配合使用時task_list的值為[func(),func()] 也是可以的。
示例三:
asyncio 是 Python 中的異步IO庫,用來編寫并發(fā)協(xié)程,適用于IO阻塞且需要大量并發(fā)的場景,例如爬蟲、文件讀寫。
asyncio 在 Python3.4 被引入,經(jīng)過幾個版本的迭代,特性、語法糖均有了不同程度的改進,這也使得不同版本的 Python 在 asyncio 的用法上各不相同,顯得有些雜亂,以前使用的時候也是本著能用就行的原則,在寫法上走了一些彎路,現(xiàn)在對 Python3.7+ 和 Python3.6 中 asyncio 的用法做一個梳理,以便以后能更好的使用。
協(xié)程,又稱微線程,它不被操作系統(tǒng)內(nèi)核所管理,而完全是由程序控制,協(xié)程切換花銷小,因而有更高的性能。
協(xié)程可以比作子程序,不同的是,執(zhí)行過程中協(xié)程可以掛起當(dāng)前狀態(tài),轉(zhuǎn)而執(zhí)行其他協(xié)程,在適當(dāng)?shù)臅r候返回來接著執(zhí)行,協(xié)程間的切換不需要涉及任何系統(tǒng)調(diào)用或任何阻塞調(diào)用,完全由協(xié)程調(diào)度器進行調(diào)度。
Python 中以 asyncio 為依賴,使用 async/await 語法進行協(xié)程的創(chuàng)建和使用,如下 async 語法創(chuàng)建一個協(xié)程函數(shù):
在協(xié)程中除了普通函數(shù)的功能外最主要的作用就是:使用 await 語法等待另一個協(xié)程結(jié)束,這將掛起當(dāng)前協(xié)程,直到另一個協(xié)程產(chǎn)生結(jié)果再繼續(xù)執(zhí)行:
asyncio.sleep() 是 asyncio 包內(nèi)置的協(xié)程函數(shù),這里模擬耗時的IO操作,上面這個協(xié)程執(zhí)行到這一句會掛起當(dāng)前協(xié)程而去執(zhí)行其他協(xié)程,直到sleep結(jié)束,當(dāng)有多個協(xié)程任務(wù)時,這種切換會讓它們的IO操作并行處理。
注意,執(zhí)行一個協(xié)程函數(shù)并不會真正的運行它,而是會返回一個協(xié)程對象,要使協(xié)程真正的運行,需要將它們加入到事件循環(huán)中運行,官方建議 asyncio 程序應(yīng)當(dāng)有一個主入口協(xié)程,用來管理所有其他的協(xié)程任務(wù):
在 Python3.7+ 中,運行這個 asyncio 程序只需要一句: asyncio.run(main()) ,而在 Python3.6 中,需要手動獲取事件循環(huán)并加入?yún)f(xié)程任務(wù):
事件循環(huán)就是一個循環(huán)隊列,對其中的協(xié)程進行調(diào)度執(zhí)行,當(dāng)把一個協(xié)程加入循環(huán),這個協(xié)程創(chuàng)建的其他協(xié)程都會自動加入到當(dāng)前事件循環(huán)中。
其實協(xié)程對象也不是直接運行,而是被封裝成一個個待執(zhí)行的 Task ,大多數(shù)情況下 asyncio 會幫我們進行封裝,我們也可以提前自行封裝 Task 來獲得對協(xié)程更多的控制權(quán),注意,封裝 Task 需要 當(dāng)前線程有正在運行的事件循環(huán) ,否則將引 RuntimeError,這也就是官方建議使用主入口協(xié)程的原因,如果在主入口協(xié)程之外創(chuàng)建任務(wù)就需要先手動獲取事件循環(huán)然后使用底層方法 loop.create_task() ,而在主入口協(xié)程之內(nèi)是一定有正在運行的循環(huán)的。任務(wù)創(chuàng)建后便有了狀態(tài),可以查看運行情況,查看結(jié)果,取消任務(wù)等:
asyncio.create_task() 是 Python3.7 加入的高層級API,在 Python3.6,需要使用低層級API asyncio.ensure_future() 來創(chuàng)建 Future,F(xiàn)uture 也是一個管理協(xié)程運行狀態(tài)的對象,與 Task 沒有本質(zhì)上的區(qū)別。
通常,一個含有一系列并發(fā)協(xié)程的程序?qū)懛ㄈ缦拢≒ython3.7+):
并發(fā)運行多個協(xié)程任務(wù)的關(guān)鍵就是 asyncio.gather(*tasks) ,它接受多個協(xié)程任務(wù)并將它們加入到事件循環(huán),所有任務(wù)都運行完成后會返回結(jié)果列表,這里我們也沒有手動封裝 Task,因為 gather 函數(shù)會自動封裝。
并發(fā)運行還有另一個方法 asyncio.wait(tasks) ,它們的區(qū)別是:
協(xié)程函數(shù):async def?函數(shù)名。3.5+
協(xié)程對象:執(zhí)行協(xié)程函數(shù)()得到的協(xié)程對象。
3.5之后的寫法:
3.7之后的寫法:更簡便
await后面?跟?可等待的對象。(協(xié)程對象,F(xiàn)uture,Task對象?約等于IO等待)
await實例2:串行執(zhí)行。 一個協(xié)程函數(shù)里面可以支持多個await ,雖然會串行,但是如果有其他協(xié)程函數(shù),任務(wù)列表也在執(zhí)行,依然會切換。只是案例中的main對應(yīng)執(zhí)行的others1和others2串行 。 await會等待對象的值得到之后才繼續(xù)往下走。
C10k是一個在1999年被提出來的技術(shù)挑戰(zhàn),如何在一顆1GHz CPU,2G內(nèi)存,1gbps網(wǎng)絡(luò)環(huán)境下,讓單臺服務(wù)器同時為1萬個客戶端提供FTP服務(wù)
阻塞式I/O(使用最多)、非阻塞式I/O、I/O復(fù)用、信號驅(qū)動式I/O(幾乎不使用)、異步I/O(POSIX的aio_系列函數(shù))
select、poll、epoll都是IO多路復(fù)用的機制。I/O多路復(fù)用就是通過一種機制,一個進程可以監(jiān)聽多個描述符,一旦,某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應(yīng)的讀寫操作。但select、poll、epoll本質(zhì)上都是同步I/O,因為他們都需要在讀寫時間就緒后負責(zé)進行讀寫,也就是說讀寫過程是阻塞的,而異步I/O無需自己負責(zé)進行讀寫,異步I/O的實現(xiàn)會負責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間
(1)select
select函數(shù)監(jiān)視的文件描述符分3類,分別是writefds、readfds、exceptfds。調(diào)用select函數(shù)會阻塞,直到有描述符就緒(有數(shù)據(jù)可讀、可寫或者有except),或者超時函數(shù)返回。當(dāng)select函數(shù)返回后可以通過遍歷fdset來找到就緒的描述符。
select目前幾乎在所有的平臺上支持,其良好的跨平臺支持也是它的一個優(yōu)點。select的一個缺點在于單個進程能夠監(jiān)視的文件描述符的數(shù)量存在最大限制,在Linux上一般為1024,可以通過修改宏定義甚至重新編譯內(nèi)核的方式提升這一限制,但是這樣也會降低效率。
(2)poll
不同于select使用三個位圖來表示三個fdset的方式,poll使用一個pollfd的指針實現(xiàn)。
pollfd結(jié)構(gòu)包含了要監(jiān)視的event和發(fā)生的event,不再使用select"參數(shù)-值"傳遞的方式。同時pollfd并沒有最大數(shù)量限制(但是數(shù)量過大后性能也會下降)。和select函數(shù)一樣,poll返回后,需要輪詢pollfd來獲取就緒的描述符。
從上面看,select和poll都需要在返回后通過遍歷文件描述符來獲取已經(jīng)就緒的socket。事實上同時連接的大量客戶端在同一時刻可能只有很少的處于就緒的狀態(tài),因此隨著監(jiān)視的描述符數(shù)量的增長,其效率也會線性下降
(3)epoll
epoll是在2.6內(nèi)核中提出的,是之前的select和poll的增強版本。相對于select和poll來說,epoll更加領(lǐng)靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個事件表中,這樣在用戶空間和內(nèi)核空間的copy只需一次。
在前面的例子里學(xué)習(xí)了并發(fā)地執(zhí)行多個協(xié)程來下載圖片,也許其中一個協(xié)程永遠下載不了,一直阻塞,這時怎么辦呢?
碰到這種需求時不要驚慌,可以使用wait()里的timeout參數(shù)來設(shè)置等待時間,也就是從這個函數(shù)開始運行算起,如果時間到達協(xié)程沒有執(zhí)行完成,就可以不再等它們了,直接從wait()函數(shù)里返回,返回之后就可以判斷那些沒有執(zhí)行成功的,可以把這些協(xié)程取消掉。例子如下:
[python]?view plain?copy
import?asyncio
async?def?phase(i):
print('in?phase?{}'.format(i))
try:
await?asyncio.sleep(0.1?*?i)
except?asyncio.CancelledError:
print('phase?{}?canceled'.format(i))
raise
else:
print('done?with?phase?{}'.format(i))
return?'phase?{}?result'.format(i)
async?def?main(num_phases):
print('starting?main')
phases?=?[
phase(i)
for?i?in?range(num_phases)
]
print('waiting?0.1?for?phases?to?complete')
completed,?pending?=?await?asyncio.wait(phases,?timeout=0.1)
print('{}?completed?and?{}?pending'.format(
len(completed),?len(pending),
))
#?Cancel?remaining?tasks?so?they?do?not?generate?errors
#?as?we?exit?without?finishing?them.
if?pending:
print('canceling?tasks')
for?t?in?pending:
t.cancel()
print('exiting?main')
event_loop?=?asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
結(jié)果輸出如下:
starting main
waiting 0.1 for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 0
1 completed and 2 pending
canceling tasks
exiting main
phase 1 canceled
phase 2 canceled
改進之前
之前,我的查詢步驟很簡單,就是:
前端提交查詢請求 -- 建立數(shù)據(jù)庫連接 -- 新建游標(biāo) -- 執(zhí)行命令 -- 接受結(jié)果 -- 關(guān)閉游標(biāo)、連接
這幾大步驟的順序執(zhí)行。
這里面當(dāng)然問題很大:
建立數(shù)據(jù)庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。
在“執(zhí)行命令”和“接受結(jié)果”兩個步驟中,線程在阻塞在數(shù)據(jù)庫內(nèi)部的運行過程中,數(shù)據(jù)庫連接和游標(biāo)都處于閑置狀態(tài)。
這樣一來,每一次查詢都要順序的新建數(shù)據(jù)庫連接,都要阻塞在數(shù)據(jù)庫返回結(jié)果的過程中。當(dāng)前端提交大量查詢請求時,查詢效率肯定是很低的。
第一次改進
之前的模塊里,問題最大的就是第一步——建立數(shù)據(jù)庫連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復(fù)服用這個連接就好了。
所以,首先應(yīng)該把數(shù)據(jù)庫查詢模塊作為一個單獨的守護進程去執(zhí)行,而前端app作為主進程響應(yīng)用戶的點擊操作。那么兩條進程怎么傳遞消息呢?翻了幾天Python文檔,終于構(gòu)思出來:用隊列queue作為生產(chǎn)者(web前端)向消費者(數(shù)據(jù)庫后端)傳遞任務(wù)的渠道。生產(chǎn)者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務(wù)完成后,回傳結(jié)果的渠道。確保,任務(wù)的接收方與發(fā)送方保持一致。
作為第二個問題的解決方法,可以使用線程池來并發(fā)獲取任務(wù)隊列中的task,然后執(zhí)行命令并回傳結(jié)果。
第二次改進
第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應(yīng)速度有很明顯的加快。
但是對于第二個問題,使用線程池還是有些欠妥當(dāng)。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調(diào)度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至?xí)霈F(xiàn)“抖動”問題(也就是剛剛喚醒一個線程,就進入掛起狀態(tài),剛剛換到棧幀或內(nèi)存的上下文,又被換回內(nèi)存或者磁盤),效率大大降低。也就是說,線程池的并發(fā)量很有限。
試過了多進程、多線程,只能在單個線程里做文章了。
Python中的asyncio庫
Python里有大量的協(xié)程庫可以實現(xiàn)單線程內(nèi)的并發(fā)操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現(xiàn)協(xié)程并發(fā)。asyncio庫大大降低了Python中協(xié)程的實現(xiàn)難度,就像定義普通函數(shù)那樣就可以了,只是要在def前面多加一個async關(guān)鍵詞。async def函數(shù)中,需要阻塞在其他async def函數(shù)的位置前面可以加上await關(guān)鍵詞。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函數(shù)的執(zhí)行稍微麻煩點。需要首先獲取一個loop對象,然后由這個對象代為執(zhí)行async def函數(shù)。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在執(zhí)行execute(task)函數(shù)時,如果遇到await關(guān)鍵字,就會暫時掛起當(dāng)前協(xié)程,轉(zhuǎn)而去執(zhí)行其他阻塞在await關(guān)鍵詞的協(xié)程,從而實現(xiàn)協(xié)程并發(fā)。
不過需要注意的是,run_until_complete()函數(shù)本身是一個阻塞函數(shù)。也就是說,當(dāng)前線程會等候一個run_until_complete()函數(shù)執(zhí)行完畢之后,才會繼續(xù)執(zhí)行下一部函數(shù)。所以下面這段代碼并不能并發(fā)執(zhí)行。
for task in task_list:
loop.run_until_complete(task)
對與這個問題,asyncio庫也有相應(yīng)的解決方案:gather函數(shù)。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
當(dāng)然了,async def函數(shù)的執(zhí)行并不只有這兩種解決方案,還有call_soon與run_forever的配合執(zhí)行等等,更多內(nèi)容還請參考官方文檔。
Python下的I/O多路復(fù)用
協(xié)程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復(fù)用則完全不存在這個問題。
目前,Linux上比較火的I/O多路復(fù)用API要算epoll了。Tornado,就是通過調(diào)用C語言封裝的epoll庫,成功解決了C10K問題(當(dāng)然還有Pypy的功勞)。
在Linux里查文檔,可以看到epoll只有三類函數(shù),調(diào)用起來比較方便易懂。
創(chuàng)建epoll對象,并返回其對應(yīng)的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制監(jiān)聽事件。第一個參數(shù)epfd就對應(yīng)于前面命令創(chuàng)建的epoll對象的文件描述符;第二個參數(shù)表示該命令要執(zhí)行的動作:監(jiān)聽事件的新增、修改或者刪除;第三個參數(shù),是要監(jiān)聽的文件對應(yīng)的描述符;第四個,代表要監(jiān)聽的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。這是一個阻塞函數(shù),調(diào)用者會等候內(nèi)核通知所注冊的事件被觸發(fā)。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select庫里:
select.epoll()對應(yīng)于第一類創(chuàng)建函數(shù);
epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數(shù)epoll_ctl的封裝;
epoll.poll()則是對等候函數(shù)epoll_wait的封裝。
Python里epoll相關(guān)API的最大問題應(yīng)該是在epoll.poll()。相比于其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是后者的第二個參數(shù)struct epoll_event *events。沒法實現(xiàn)精確控制。因此只能使用替代方案:select.select()函數(shù)。
根據(jù)Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統(tǒng)中select函數(shù)的直接調(diào)用,與C語言API的傳參很接近。前三個參數(shù)都是列表,其中的元素都是要注冊到內(nèi)核的文件描述符。如果想用自定義類,就要確保實現(xiàn)了fileno()方法。
其分別對應(yīng)于:
rlist: 等候直到可讀
wlist: 等候直到可寫
xlist: 等候直到異常。這個異常的定義,要查看系統(tǒng)文檔。
select.select(),類似于epoll.poll(),先注冊文件和事件,然后保持等候內(nèi)核通知,是阻塞函數(shù)。
實際應(yīng)用
Psycopg2庫支持對異步和協(xié)程,但和一般情況下的用法略有區(qū)別。普通數(shù)據(jù)庫連接支持不同線程中的不同游標(biāo)并發(fā)查詢;而異步連接則不支持不同游標(biāo)的同時查詢。所以異步連接的不同游標(biāo)之間必須使用I/O復(fù)用方法來協(xié)調(diào)調(diào)度。
所以,我的大致實現(xiàn)思路是這樣的:首先并發(fā)執(zhí)行大量協(xié)程,從任務(wù)隊列中提取任務(wù),再向連接池請求連接,創(chuàng)建游標(biāo),然后執(zhí)行命令,并返回結(jié)果。在獲取游標(biāo)和接受查詢結(jié)果之前,均要阻塞等候內(nèi)核通知連接可用。
其中,連接池返回連接時,會根據(jù)引用連接的協(xié)程數(shù)量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。
我的代碼位于:bottle-blog/dbservice.py
存在問題
當(dāng)然了,這個流程目前還一些問題。
首先就是每次輪詢拿到任務(wù)之后,都會走這么一個流程。
獲取連接 -- 新建游標(biāo) -- 執(zhí)行任務(wù) -- 關(guān)閉游標(biāo) -- 取消連接引用
本來,最好的情況應(yīng)該是:在輪詢之前,就建好游標(biāo);在輪詢時,直接等候內(nèi)核通知,執(zhí)行相應(yīng)任務(wù)。這樣可以減少輪詢時的任務(wù)量。但是如果協(xié)程提前對應(yīng)好連接,那就不能保證在獲取任務(wù)時,保持各連接負載均衡了。
所以這一塊,還有工作要做。
還有就是epoll沒能用上,有些遺憾。
以后打算寫點C語言的內(nèi)容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現(xiàn)epoll的調(diào)用。
最后,請允許我吐槽一下Python的epoll相關(guān)文檔:簡直太弱了?。。”仨毧丛创a才能弄清楚功能。
本文名稱:python協(xié)程阻塞函數(shù),python 非阻塞線程
文章轉(zhuǎn)載:http://jinyejixie.com/article16/hsehgg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站設(shè)計、定制開發(fā)、網(wǎng)站排名、、外貿(mào)網(wǎng)站建設(shè)、App開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)