?
創(chuàng)新互聯(lián)專業(yè)為企業(yè)提供房縣網(wǎng)站建設(shè)、房縣做網(wǎng)站、房縣網(wǎng)站設(shè)計(jì)、房縣網(wǎng)站制作等企業(yè)網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)與制作、房縣企業(yè)網(wǎng)站模板建站服務(wù),十多年房縣做網(wǎng)站經(jīng)驗(yàn),不只是建網(wǎng)站,更提供有價(jià)值的思路和整體網(wǎng)絡(luò)服務(wù)。
?
?
線程同步:
線程間協(xié)同,通過(guò)某種技術(shù),讓一個(gè)線程訪問(wèn)某些數(shù)據(jù)時(shí),其它線程不能訪問(wèn)這些數(shù)據(jù),直到該線程完成對(duì)數(shù)據(jù)的操作;
?
結(jié)果要能預(yù)知;
?
critical section,臨界區(qū);
mutex,互斥量,threading.Lock類;?? #即鎖,你用我不用,我用你不用
semaphore,信號(hào)量;
event,事件,threading.Event類;
barrier,屏障、柵欄,threading.Barrier;
?
幫助:
Python 3.5.3 documentation-->Library Reference-->Concurrent Execution-->threading
?
?
?
目錄
threading.Event類:...1
theading.Lock類:...4
阻塞鎖:...5
非阻塞鎖:...10
?
?
?
event,事件,是線程間通信機(jī)制中最簡(jiǎn)單的實(shí)現(xiàn),使用一個(gè)內(nèi)部的flag標(biāo)記,通過(guò)flag的True或False的變化來(lái)進(jìn)行操作;
?
e.set(),標(biāo)記設(shè)置為T(mén)rue;
e.clear(),標(biāo)記設(shè)置為False;
e.is_set(),標(biāo)記是否為T(mén)rue;
e.wait(timeout=None),Block until the internal flag is true,設(shè)置等待標(biāo)記為T(mén)rue的時(shí)長(zhǎng),None為無(wú)限等待,等到返回True,未等到超時(shí)了返回False;
?
e.wait()的使用:
wait優(yōu)于sleep,wait會(huì)主動(dòng)讓出時(shí)間片,其它線程可以被調(diào)度,而sleep會(huì)占用時(shí)間片不讓出;
?
例:
def do(interval, e: threading.Event):
??? while not e.wait(interval):
??????? logging.info('do sth')
?
e = threading.Event()
threading.Thread(target=do, args=(3,e)).start()
e.wait(5)?? #可用time.sleep(5),e.wait(5)優(yōu)于time.sleep(5)
e.set()
print('main exit')
輸出:
2018-07-30-14:58:08?????? Thread info: 10268 Thread-1 do sth
main exit
?
例:
老板雇傭了一個(gè)工人,讓他生產(chǎn)杯子,老板一直等著工人,直到生產(chǎn)了10個(gè)杯子;
import threading
import time
import logging
FORMAT = '%(asctime)-15s\tThread info: %(thread)d %(threadName)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%Y-%m-%d-%H:%M:%S')
?
cups = []?? #當(dāng)前,如果多個(gè)工人做則有問(wèn)題
event = threading.Event()?? #實(shí)例,主線程、boss、worker都要看event,注意其作用域,使用同一個(gè)Event對(duì)象的flag
?
def boss(e: threading.Event):
??? logging.info("I'm boss,waiting for U.")
??? e.wait()
??? logging.info('good job.')
?
def worker(n, e: threading.Event):
??? while True:
??????? time.sleep(0.5)
??????? cups.append(1)
??????? logging.info('make 1')
??????? if len(cups) >= n:
??????????? # logging.info('I finished my job. {}'.format(len(cups)))?? #此處打印也可放到下面,老板在主線程中等待,if event.wait():中
??????????? e.set()
?? ?????????break
???????? logging.info('I finished my job. {}'.format(len(cups)))
?
b = threading.Thread(target=boss, args=(event,))
w = threading.Thread(target=worker, args=(10, event))
w.start()
b.start()
?
# if event.wait():?? #方1,老板在主線程中等待
# ????logging.info('I finished my job. {}'.format(len(cups)))
?
# while not event.wait(1):?? #方2,1秒看1次,老板在主線程中等待
#???? pass
# logging.info('I finished my job. {}'.format(len(cups)))
?
# while not event.is_set():?? #方3,老板在主線程中等待
#???? event.wait()?? #誰(shuí)wait就是等到flag變?yōu)門(mén)rue,或等到超時(shí)返回False,不限制等待的個(gè)數(shù)
# logging.info('I finished my job. {}'.format(len(cups)))
?
輸出:
2018-07-30-14:47:12?????? Thread info: 11032 Thread-1 I'm boss,waiting for U.
2018-07-30-14:47:13?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:13?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:14?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:14?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:15?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:15?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:16?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:16?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 make 1
2018-07-30-14:47:17?????? Thread info: 6996 Thread-2 I finished my job. 10
2018-07-30-14:47:17?????? Thread info: 11032 Thread-1 good job.
?
例:
Event練習(xí):
實(shí)現(xiàn)Timer,延時(shí)執(zhí)行的線程;
延時(shí)計(jì)算add(x,y);
思路:
Timer的構(gòu)造函數(shù)中參數(shù)得有哪些?
如何實(shí)現(xiàn)start啟動(dòng)一個(gè)線程執(zhí)行函數(shù)?
如何cancel取消待執(zhí)行任務(wù)?
?
class Timer:
??? def __init__(self, interval, fn, *args, **kwargs):
??????? self.interval = interval
??????? self.fn = fn
??????? self.args = args
??????? self.kwargs = kwargs
??????? self.event = threading.Event()
?
??? def start(self):
??????? threading.Thread(target=self.__do).start()
?
??? def cancel(self):
??????? self.event.set()
?
??? def __do(self):
??????? self.event.wait(self.interval)?? #等待超時(shí)后返回False
?????????????????? # print(self.event.__dict__)?? # {'_flag': False, '_cond': <Condition(<unlocked _thread.lock object at 0x00000000012435D0>, 0)>}
??????? print(self.event.is_set())?? #False
??????? if not self.event.is_set():
??????????? self.fn(*self.args, **self.kwargs)
?
def add(x, y):
??? logging.info(x+y)
?
t = Timer(3, add, 4, 5)
t.start()
#t.cancel()?? #有此句self.is_set()為T(mén)rue
輸出:
False
2018-07-31-09:29:45?????? Thread info: 5156 Thread-1 9
?
?
?
解決多線程中數(shù)據(jù)同步;
?
凡是存在共享資源爭(zhēng)搶的地方都可使用鎖,從而保證只有一個(gè)使用者可完全使用這個(gè)資源;
?
程序=數(shù)據(jù)+算法,要保證數(shù)據(jù)的結(jié)果是可預(yù)知的,若不可預(yù)知該程序是沒(méi)用的,為得到正確的結(jié)果,性能是其次已不重要了;
?
一旦某一線程獲得鎖,其它試圖獲取的線程將被阻塞;
?
acquire(blocking=True,timeout=-1),默認(rèn)阻塞,阻塞可設(shè)置超時(shí)時(shí)間;非阻塞時(shí),timeout禁止設(shè)置;成功獲取鎖,返回True,否則返回False;
release(),釋放鎖,可從任何線程調(diào)用釋放;該方法執(zhí)行后,已上鎖的鎖會(huì)被重置為unlocked;在未上鎖的鎖上調(diào)用release(),拋RuntimeError: release unlocked lock;
?
注:
鎖將acquire()和release()中間的代碼管?。?/p>
大多數(shù)情況下用的是阻塞鎖;
鎖,類似ATM機(jī)的小房子;
加鎖,應(yīng)在分析業(yè)務(wù)基礎(chǔ)之上,在合適的地方加,不能從前到后加一把大鎖,這樣效率低下;
?
加鎖、解鎖:
一般來(lái)說(shuō),加鎖后還要有一些代碼實(shí)現(xiàn),在釋放鎖之前還有可能拋異常,一旦出現(xiàn)異常,鎖無(wú)法釋放,但當(dāng)前線程可能因?yàn)檫@個(gè)異常終止了,這就產(chǎn)生了死鎖;
一般誰(shuí)(某個(gè)線程)加的鎖誰(shuí)解鎖;
常用語(yǔ)句:
try...finally,保證鎖的釋放;
with,Lock類中有__enter__()和__exit__(),鎖對(duì)象支持上下文管理;
?
鎖的應(yīng)用場(chǎng)景:
鎖,適用于訪問(wèn)和修改同一個(gè)共享資源的時(shí)候,即讀寫(xiě)同一個(gè)資源的時(shí)候;
如果全部都是讀取同一個(gè)共享資源需要鎖嗎?不需要,因?yàn)檫@時(shí)可認(rèn)為共享資源是不可變的,每一次讀取它都是一樣的值,所以不用加鎖;
不使用鎖,雖有了效率,但結(jié)果是錯(cuò)的;
使用了鎖,雖效率低下,但結(jié)果是對(duì)的,所以為了對(duì)的結(jié)果,讓計(jì)算機(jī)計(jì)算去吧!
?
使用鎖的注意事項(xiàng):
少用鎖,必要時(shí)加鎖,使用了鎖,多線程訪問(wèn)被鎖的資源時(shí),就成了串行,要么排隊(duì)執(zhí)行,要么爭(zhēng)搶執(zhí)行;
例如,高速公路上的車(chē)并行跑,到了省界只開(kāi)放了一個(gè)收費(fèi)口,過(guò)了這個(gè)口,車(chē)輛依然可在多車(chē)道上并行跑;過(guò)收費(fèi)口時(shí),如果排隊(duì)一輛一輛過(guò),加不加鎖一樣,效率相當(dāng),但一旦出現(xiàn)爭(zhēng)搶,就必須加鎖一輛輛過(guò);
加鎖時(shí)間越短越好,不需要就立即釋放鎖;
一定要避免死鎖,盡量用with語(yǔ)句;
?
?
lock.acquire()
?
例:
lock = threading.Lock()
lock.acquire()
print('get locker1')
lock.acquire()?? #block,再次獲取同一個(gè)鎖時(shí)阻塞(獨(dú)占),必須等別人將該鎖釋放才能獲取到,此行之后的語(yǔ)句不能執(zhí)行;即,某一線程獲得鎖后,另一線程只要執(zhí)行到lock.acquire()就會(huì)阻塞?。ㄗ。?/p>
print('get locker2')
lock.release()?? #一旦某一線程release,等著的線程爭(zhēng)搶,誰(shuí)搶到誰(shuí)上鎖;一般情況,有多少個(gè)acquire就有多少個(gè)release;要保證一定release,用到上下文(with語(yǔ)句)或try...finally
print('release locker')
?
注:
死鎖(解不開(kāi)的鎖,就是死鎖):
同一個(gè)鎖,你等我釋放、我等你釋放、自己等自己釋放;
兩個(gè)鎖,你等我放一把,我等你放一把,互相等待,都不釋放,都為阻塞狀態(tài);
?
例:
def work():
??? time.sleep(3)
??? lock.release()?? #任何一個(gè)線程都可對(duì)同一個(gè)鎖操作,此處釋放鎖
?
lock = threading.Lock()
lock.acquire()
print('get locker1')
threading.Thread(target=work).start()
time.sleep(5)
lock.acquire()
print('get locker2')
lock.release()
print('release locker')
?
例:
需求:10個(gè)工人完成100個(gè)杯子;
注:臨界點(diǎn)問(wèn)題:
當(dāng)生產(chǎn)到99個(gè)時(shí),若每個(gè)線程都可看到,每個(gè)線程看到轉(zhuǎn)頭去生產(chǎn)了,導(dǎo)致結(jié)果會(huì)多;
只允許一個(gè)線程可看到當(dāng)前生產(chǎn)杯子的數(shù)量,其它線程不可見(jiàn),這個(gè)線程看到后生產(chǎn)完1個(gè)杯子時(shí),其它線程均可看到,杯子數(shù)量達(dá)到要求就不生產(chǎn)了;
?
cups = []
?
def worker(task=100):
??? while True:
??????? count = len(cups)
??????? logging.info('current count: {}'.format(count))
??????? if count >= task:
??????????? break
??????? cups.append(1)
??????? logging.info('{} make 1'.format(threading.current_thread()))
??? logging.info('total: {}'.format(count))
?
for _ in range(10):
??? threading.Thread(target=worker).start()
輸出:
……
2018-08-06-09:13:14?????? Thread info: 10208 Thread-2 current count: 104
2018-08-06-09:13:14?????? Thread info: 10208 Thread-2 total: 104
2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 <Thread(Thread-6, started 10252)> make 1
2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 current count: 104
2018-08-06-09:13:14?????? Thread info: 10252 Thread-6 total: 104
?
解決,加鎖:
cups = []
lock = threading.Lock()
?
def worker(lock:threading.Lock, task=100):
??? while True:
??????? lock.acquire()
??????? count = len(cups)
??????? logging.info('current count: {}'.format(count))
??????? lock.release()
??????? if count >= task:?? #雖不是大鎖,仍有問(wèn)題,在臨界點(diǎn)時(shí),其它線程仍可看到杯子總數(shù),繼而再做;解決,把中間代碼全部放到鎖里
??????????? break
??????? lock.acquire()
??????? cups.append(1)
??????? lock.release()
??????? logging.info('{} make 1'.format(threading.current_thread()))
??? logging.info('total: {}'.format(count))
?
for _ in range(10):
??? threading.Thread(target=worker, args=(lock,)).start()
輸出:
……
2018-08-06-09:19:11?????? Thread info: 4092 Thread-4 current count: 100
2018-08-06-09:19:11?????? Thread info: 4092 Thread-4 total: 100
2018-08-06-09:19:11?????? Thread info: 11024 Thread-6 current count: 100
2018-08-06-09:19:11?????? Thread info: 11024 Thread-6 total: 100
?
例:
cups = []
lock = threading.Lock()
?
def worker(lock:threading.Lock, task=100):
??? while True:
??????? lock.acquire()?? #該鎖為互斥鎖,你有我沒(méi)有,我有你沒(méi)有
??????? count = len(cups)
??????? logging.info('current count: {}'.format(count))
??????? # lock.release()
??????? if count >= task:
??????????????????????????? lock.release()
???? ???????break?? #仍有問(wèn)題,break后未釋放鎖,解決try...finally或上下文
??????? # lock.acquire()
??????? cups.append(1)
??????? lock.release()
??????? logging.info('{} make 1'.format(threading.current_thread()))
??? logging.info('total: {}'.format(count))
?
for _ in range(10):
??? threading.Thread(target=worker, args=(lock,)).start()
輸出:
……
2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 current count: 99
2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 <Thread(Thread-5, started 10148)> make 1
2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 current count: 100
2018-08-06-09:23:38?????? Thread info: 10148 Thread-5 total: 100
?
例:
計(jì)數(shù)器類,可以加、可以減;
class Counter:
??? def __init__(self):
??????? self.__val = 0
?
??? def inc(self):
??????? self.__val += 1
?
??? def dec(self):
??????? self.__val -= 1
?
??? @property
??? def value(self):
??????? return self.__val
?
def do(c:Counter, count=100):
??? for _ in range(count):
??????? for i in range(-50,50):
??????????? if i < 0:
??????????????? c.dec()
??????????? else:
??????????????? c.inc()
?
c = Counter()
threadcount = 10?? #依次10,100,1000查看,結(jié)果沒(méi)變化
c2 = 1000?? #依次10,100,1000查看,當(dāng)1000時(shí),每個(gè)線程耗cpu時(shí)間長(zhǎng),結(jié)果將不可預(yù)知
for i in range(threadcount):
??? threading.Thread(target=do, args=(c,c2)).start()
# time.sleep(5)
# print(c.value)?? #多線程情況下,每個(gè)線程耗cpu時(shí)間長(zhǎng)的情況下,此處打印的值不是最終結(jié)果
while True:
??? time.sleep(3)
??? print(threading.enumerate())
??? print(c.value)
輸出:
[<_MainThread(MainThread, started 7424)>]
81
[<_MainThread(MainThread, started 7424)>]
81
?
例,解決,加鎖:
class Counter:
??? def __init__(self):
??????? self.__val = 0
??????? self.__lock = threading.Lock()
?
??? def inc(self):
??????? try:?? #同with self.__lock: self.__val += 1;
??????????? self.__lock.acquire()
??????????? self.__val += 1
??????? finally:
??????????? self.__lock.release()
?
??? def dec(self):
??????? with self.__lock:
??????????? self.__val -= 1
?
??? @property
??? def value(self):
??????? with self.__lock:
??????????? return self.__val
?
def do(c:Counter, count=100):
??? for _ in range(count):
??????? for i in range(-50,50):
??????????? if i < 0:
??????????????? c.dec()
??????????? else:
??????????????? c.inc()
?
c = Counter()
threadcount = 10
c2 = 1000
for i in range(threadcount):
??? threading.Thread(target=do, args=(c,c2)).start()
# time.sleep(5)
# print(c.value)
while True:
??? time.sleep(1)
??? # print(threading.enumerate())
??? # print(c.value)
??? if threading.active_count() == 1:
??????? print(threading.enumerate())
??????? print(c.value)
??? else:
??????? print(threading.enumerate())
輸出:
[<Thread(Thread-2, started 11888)>, <Thread(Thread-3, started 8312)>, <Thread(Thread-6, started 10004)>, <_MainThread(MainThread, started 12200)>, <Thread(Thread-8, started 12012)>, <Thread(Thread-9, started 7788)>, <Thread(Thread-5, started 12032)>, <Thread(Thread-10, started 11576)>]
[<_MainThread(MainThread, started 12200)>]
0
[<_MainThread(MainThread, started 12200)>]
0
?
?
lock.acquire(False),獲取到鎖返回True,否則返回False;
?
?
例:
lock = threading.Lock()
lock.acquire()
ret = lock.acquire(False)?? #獲取到鎖則返回True
print(ret)
輸出:
False
?
例:
cups = []
lock = threading.Lock()
?
def worker(lock:threading.Lock, task=100):
??? while True:
??????? if lock.acquire(False):
??????????? count = len(cups)
??????????? logging.info('current count: {}'.format(count))
??????????? if count >= task:
??????????????? lock.release()
??????????????? break
??????????? cups.append(1)
??????????? lock.release()
??????????? logging.info('{} make 1'.format(threading.current_thread()))
??? logging.info('total: {}'.format(count))
?
for _ in range(10):
??? threading.Thread(target=worker, args=(lock,)).start()
輸出:
……
2018-08-06-13:52:35?????? Thread info: 11232 Thread-9 current count: 100
2018-08-06-13:52:35?????? Thread info: 11232 Thread-9 total: 100
2018-08-06-13:52:35?????? Thread info: 11752 Thread-10 current count: 100
2018-08-06-13:52:35?????? Thread info: 11752 Thread-10 total: 100
?
例:
def worker(tasks):
??? for task in tasks:
??????? if task.lock.acquire(False):
??????????? time.sleep(0.01)
??????????? logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))
??????? else:
??????????? logging.info('{} {} is working'.format(threading.current_thread(), task.name))
?
class Task:
??? def __init__(self, name):
??????? self.name = name
??????? self.lock = threading.Lock()
?
tasks = [Task('task-{}'.format(x)) for x in range(10)]
?
for i in range(5):
??? threading.Thread(target=worker, args=(tasks,), name='worker-{}'.format(i)).start()
輸出:
……
2018-08-06-14:10:34?????? Thread info: 12256 worker-1 <Thread(worker-1, started 12256)> task-7 is working
2018-08-06-14:10:34?????? Thread info: 12256 worker-1 <Thread(worker-1, started 12256)> task-8 is working
2018-08-06-14:10:34?????? Thread info: 12256 worker-1 <Thread(worker-1, started 12256)> task-9 is working
2018-08-06-14:10:34?????? Thread info: 11912 worker-3 <Thread(worker-3, started 11912)> task-9 begin to start
2018-08-06-14:10:34?????? Thread info: 10496 worker-4 <Thread(worker-4, started 10496)> task-8 begin to start
2018-08-06-14:10:34?????? Thread info: 10496 worker-4 <Thread(worker-4, started 10496)> task-9 is working
?
?
?
網(wǎng)頁(yè)名稱:40線程2_Event_Lock
本文網(wǎng)址:http://jinyejixie.com/article42/jopdhc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷(xiāo)推廣、定制開(kāi)發(fā)、網(wǎng)站設(shè)計(jì)公司、網(wǎng)站設(shè)計(jì)、網(wǎng)頁(yè)設(shè)計(jì)公司、動(dòng)態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)