學(xué)習(xí)過操作系統(tǒng)的小伙伴應(yīng)該對資源的控制有所了解——資源是一定的,不能讓所有進(jìn)程無控制地進(jìn)行搶占。這就要求我們對進(jìn)行進(jìn)行并發(fā)控制。那么python是如何實(shí)現(xiàn)多進(jìn)程并發(fā)控制的呢?主要是使用互斥鎖和新號量來控制進(jìn)程資源調(diào)度。接下來小編就來介紹怎么使用python實(shí)現(xiàn)多進(jìn)程并發(fā)控制吧。
一、了解鎖
應(yīng)用場景舉例描述: Lock 互斥鎖:舉例說明–有三個同事同時需要上廁所,但是只有一間廁所,將同事比作進(jìn)程,多個進(jìn)程并打搶占一個廁所,我們要保證順序優(yōu)先, 一個個來,那么就必須串行,先到先得,有人使用了,就鎖住,結(jié)束后剩余的人繼續(xù)爭搶
1、利用Lock來處理
模擬三個同事?lián)屨紟?/p>
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
def task1(p, lock):
# 上鎖
lock.acquire()
print(f'{p} 開始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
# 解鎖
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p} 開始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p} 開始排泄')
time.sleep(random.randint(1, 3))
print(f'{p} 排泄結(jié)束')
lock.release()
if __name__ == '__main__':
# 實(shí)例化一個鎖對象
mutex = Lock()
# 將鎖以參數(shù)的形式傳入進(jìn)程對象
p1 = Process(target=task1, args=('task1', mutex,))
p2 = Process(target=task2, args=('task2', mutex,))
p3 = Process(target=task3, args=('task3', mutex,))
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果:
# 輸出結(jié)果:三個進(jìn)程開始爭搶互斥鎖,先搶到的先執(zhí)行,執(zhí)行結(jié)束后,釋放掉鎖,剩下的繼續(xù)爭搶
task1 開始排泄
task1 排泄結(jié)束
task2 開始排泄
task2 排泄結(jié)束
task3 開始排泄
task3 排泄結(jié)束
1、 注意:
- 互斥鎖在函數(shù)中,lock.acquire()上鎖一次就要lock.release()解鎖一次,在上鎖與解鎖之間寫需要執(zhí)行的代碼。
- 如果連續(xù)上鎖兩次以上,就會出現(xiàn)死鎖現(xiàn)象,代碼將不繼續(xù)執(zhí)行下去。必須是鎖一次解一次。
2、 lock和join比較:
- 共同點(diǎn)------都可以把并行變成串行,保證了執(zhí)行順序
- 不同點(diǎn)------join是人為設(shè)定了順序,lock是讓其爭搶順序,保證了公平性
2、利用反射,來優(yōu)化上面的代碼
上面的代碼雖然起到了先進(jìn)先出,一進(jìn)一出的效果,但是并不完美。總所周知,我們上廁所是誰先搶到誰先上,并不是說按照代碼里start()順序執(zhí)行。應(yīng)該由先搶占到的進(jìn)程限制性才更合理。
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys
def task1(p, lock):
# 上鎖
lock.acquire()
print(f'{p} 開始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
# 解鎖
lock.release()
def task2(p, lock):
lock.acquire()
print(f'{p} 開始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
lock.release()
def task3(p, lock):
lock.acquire()
print(f'{p} 開始打印')
time.sleep(random.randint(1, 3))
print(f'{p} 打印結(jié)束')
lock.release()
if __name__ == '__main__':
slock = Lock()
for i in range(1,4):
p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(f'task{i}', slock))
p.start()
輸出結(jié)果:
task2 開始打印
task2 打印結(jié)束
task3 開始打印
task3 打印結(jié)束
task1 開始打印
task1 打印結(jié)束
二、進(jìn)程并發(fā)控制 semaphore
semaphore(信號量):用來控制對共享資源的訪問數(shù)量,可以控制同一時刻并發(fā)的進(jìn)程數(shù)
信號量: 也是一把鎖,但是不保證數(shù)據(jù)安全性,同時開啟多個線程,但是規(guī)定了同時并發(fā)執(zhí)行的上限,后面走多少,進(jìn)多少。(用于控制并發(fā)數(shù)量)
1.多進(jìn)程控制示例(1)
# 舉例說明:一間廁所有5個坑位,最多只能同時有5個人上廁所,當(dāng)前時刻有20個人想上廁所,但是只能讓5個人進(jìn)去,然后出來多少個,才能進(jìn)去多少個上廁所
# 從一個模塊引用多個功能的時候,用逗號隔開
from threading import Semaphore, Thread, currentThread
import time
import random
sem = Semaphore(3) # 并發(fā)執(zhí)行數(shù)設(shè)置為5
def task():
sem.acquire()
print(f'{currentThread().name}')
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task)
t.start()
執(zhí)行結(jié)果:首次并發(fā)量是3,后面先搶到鎖先執(zhí)行
Thread-1
Thread-2
Thread-3Thread-4
Thread-5Thread-6
Thread-7Thread-8
Process finished with exit code 0
2.多進(jìn)程控制示例(2)
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 搶占并獲得鎖,運(yùn)行")
time.sleep(i)
print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 運(yùn)行結(jié)束,釋放鎖")
s.release()
if __name__ == '__main__':
s = multiprocessing.Semaphore(2)
for i in range(8):
p = multiprocessing.Process(target=worker, args=(s, 1))
p.start()
執(zhí)行結(jié)果:
在執(zhí)行結(jié)果輸出的終端,每阻塞一次,按下回車鍵,可以更加清晰的看出進(jìn)程的并發(fā)執(zhí)行。
由下面執(zhí)行結(jié)果可以看出,同一時刻,有兩個進(jìn)程在執(zhí)行
2021-05-18 22:50:37 Process-1 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:37 Process-2 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:38 Process-1 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-3 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:38 Process-2 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-4 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:39 Process-3 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-5 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:39 Process-4 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-6 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:40 Process-5 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-7 搶占并獲得鎖,運(yùn)行
2021-05-18 22:50:40 Process-6 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-8 搶占并獲得鎖,運(yùn)行2021-05-18 22:50:41 Process-7 運(yùn)行結(jié)束,釋放鎖
2021-05-18 22:50:41 Process-8 運(yùn)行結(jié)束,釋放鎖Process finished with exit code 0
三、進(jìn)程同步之LOCK
多個進(jìn)程并發(fā)執(zhí)行,提高資源利用率,從而提高效率,但是有時候我們需要在某一時刻只能有一個進(jìn)程訪問某個共享資源的話,就需要使用鎖LOCK
1.不加LOCK的示例
import multiprocessing
import time
def task1():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task1 輸出信息')
time.sleep(1)
n -= 1
def task2():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task2 輸出信息')
time.sleep(1)
n -= 1
def task3():
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task3 輸出信息')
time.sleep(1)
n -= 1
if __name__ == '__main__':
p1 = multiprocessing.Process(target=task1)
p2 = multiprocessing.Process(target=task2)
p3 = multiprocessing.Process(target=task3)
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果:
2021-59-18 22:59:46 task1 輸出信息
2021-59-18 22:59:46 task2 輸出信息
2021-59-18 22:59:46 task3 輸出信息2021-59-18 22:59:47 task1 輸出信息
2021-59-18 22:59:47 task2 輸出信息
2021-59-18 22:59:47 task3 輸出信息2021-59-18 22:59:48 task1 輸出信息
2021-59-18 22:59:48 task2 輸出信息
2021-59-18 22:59:48 task3 輸出信息Process finished with exit code 0
2.加上LOCK的示例
有兩種加鎖方式:首先將 lock = multiprocessing.Lock() 生成鎖對象lock
- with lock: with會在執(zhí)行前啟動lock,在執(zhí)行結(jié)束后關(guān)閉lock
- lock.acquire() … lock.release() : 注意,這倆必須是一個接一個的對應(yīng)關(guān)系
import multiprocessing
import time
def task1(lock):
with lock:
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task1 輸出信息')
time.sleep(1)
n -= 1
def task2(lock):
lock.acquire()
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task2 輸出信息')
time.sleep(1)
n -= 1
lock.release()
def task3(lock):
lock.acquire()
n = 4
while n > 1:
print(f'{time.strftime("%Y-%M-%d %H:%M:%S")} task3 輸出信息')
time.sleep(1)
n -= 1
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
p1 = multiprocessing.Process(target=task1, args=(lock,))
p2 = multiprocessing.Process(target=task2, args=(lock,))
p3 = multiprocessing.Process(target=task3, args=(lock,))
p1.start()
p2.start()
p3.start()
執(zhí)行結(jié)果
2021-11-18 23:11:37 task1 輸出信息
2021-11-18 23:11:38 task1 輸出信息
2021-11-18 23:11:39 task1 輸出信息
2021-11-18 23:11:40 task2 輸出信息
2021-11-18 23:11:41 task2 輸出信息
2021-11-18 23:11:42 task2 輸出信息
2021-11-18 23:11:43 task3 輸出信息
2021-11-18 23:11:44 task3 輸出信息
2021-11-18 23:11:45 task3 輸出信息
Process finished with exit code 0
到此這篇python實(shí)現(xiàn)多進(jìn)程并發(fā)控制的文章就介紹到這了,更多python進(jìn)程相關(guān)的學(xué)習(xí)內(nèi)容請搜索W3Cschool以前的文章或繼續(xù)瀏覽下面的相關(guān)文章。