一、Semaphore基礎知識
1、Semaphore的定義和概念
import threading
sem = threading.Semaphore(3)
Semaphore是一種計數器,它用來保護對共享資源的訪問。在任何時刻,同時只能有一個線程訪問共享資源。當線程要訪問共享資源時,它必須首先獲得Semaphore,如果Semaphore計數器的值為0,那麼線程就會被阻塞,直到Semaphore有信號為止。
2、Semaphore的構造函數
__init__(self, value=1)
Semaphore的構造函數有一個參數value,這個參數指定了Semaphore的初始計數器的值,默認為1。
3、Semaphore的方法
Semaphore對象有兩個主要方法,分別是acquire()和release()方法。
- acquire([blocking])
acquire()方法嘗試獲取Semaphore,如果Semaphore計數器的值為0,就會阻塞線程。可選參數blocking默認為1,如果設為0,則acquire()方法會立即返回,不會阻塞線程。當acquire()方法成功獲取Semaphore時,Semaphore計數器的值會減1。
- release()
release()方法會把Semaphore的計數器的值加1,如果此時Semaphore有阻塞的線程,它就會選擇一個線程並喚醒它。
二、Semaphore使用場景
1、實現對公共資源的訪問控制
Semaphore的最主要的應用場景就是實現對公共資源的訪問控制。Semaphore控制對公共資源的訪問,保證同一時刻只有一個線程在訪問該資源,而其他線程必須等待。
import threading
class SharedResource:
def __init__(self):
self.sem = threading.Semaphore(1)
self.data = None
def get(self):
self.sem.acquire()
result = self.data
self.sem.release()
return result
def set(self, data):
self.sem.acquire()
self.data = data
self.sem.release()
上述代碼中,定義了一個SharedResource類,該類有一個Semaphore成員變數sem,Semaphore的計數器的初始值為1。類中定義了get()和set()方法,這兩個方法在訪問data成員變數時,都要先獲取Semaphore,然後再進行操作。
2、控制程序並發訪問線程數
當程序需要同時運行大量的線程的時候,如果沒有控制線程數,容易出現線程過多造成CPU調度產生的負擔,進而導致程序運行緩慢或者崩潰。這時,可以使用Semaphore來控制並發訪問線程數,保證程序有一個平緩的工作負荷。
import threading
import time
sem = threading.Semaphore(3)
def worker():
with sem:
print(f'Thread {threading.get_ident()} started')
time.sleep(1)
print(f'Thread {threading.get_ident()} finished')
for i in range(5):
t = threading.Thread(target=worker)
t.start()
上述代碼中,創建了5個線程來執行worker函數。設置了Semaphore的計數器初始值為3,即同一時間只能有3個線程執行worker函數。使用with語句獲取Semaphore,當3個線程都在執行時,後續的線程會被阻塞。當某個線程執行結束時,會釋放Semaphore,喚醒被阻塞的線程。
三、Semaphore實現生產者-消費者模型
Semaphore還可以用來實現生產者-消費者模型。生產者和消費者之間通過一個隊列來進行通訊。當隊列中沒有可以消費的數據時,消費者線程會阻塞,當隊列滿了時,生產者線程會阻塞。當生產者生產了新的數據時,會通知阻塞的消費者線程開始消費,當消費者消費了數據時,會通知阻塞的生產者線程開始生產。
import threading
import time
queue = []
sem_producer = threading.Semaphore(10)
sem_consumer = threading.Semaphore(0)
mutex = threading.Lock()
class ProducerThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global queue
for i in range(20):
sem_producer.acquire()
mutex.acquire()
queue.append(i)
print(f'{self.name} produced {i}', end='\n' if i%10==9 else ', ')
mutex.release()
sem_consumer.release()
time.sleep(0.1)
class ConsumerThread(threading.Thread):
def __init__(self, name):
super().__init__(name=name)
def run(self):
global queue
while True:
sem_consumer.acquire()
mutex.acquire()
if len(queue) == 0:
mutex.release()
break
i = queue.pop(0)
print(f'{self.name} consumed {i}', end='\n' if i%10==9 else ', ')
mutex.release()
sem_producer.release()
time.sleep(0.1)
producer_threads = [ProducerThread(f'Producer {i}') for i in range(3)]
consumer_threads = [ConsumerThread(f'Consumer {i}') for i in range(2)]
for thread in producer_threads + consumer_threads:
thread.start()
for thread in producer_threads + consumer_threads:
thread.join()
上述代碼中,定義了ProducerThread和ConsumerThread兩個線程類,ProducerThread負責生產數據,ConsumerThread負責消費數據。queue是一個列表,表示共享數據的隊列,Semaphore sem_producer的計數器初始值為10,Semaphore sem_consumer的計數器初始值為0,互斥鎖mutex保護對隊列的訪問。運行3個生產者線程和2個消費者線程,生產者生產20個數據,消費者消費這20個數據。
四、總結
Semaphore是Python多線程編程中非常有用的工具,可以用來控制對公共資源的訪問,也可以用來控制並發線程數,還能夠實現生產者-消費者模型。但是,使用Semaphore時需要注意兩個問題:第一,互斥鎖和Semaphore的區別,互斥鎖是為了保證對共享資源的互斥訪問,Semaphore是為了控制對公共資源的訪問。第二,使用Semaphore時需要注意死鎖問題,當某個線程無法獲取Semaphore時會被阻塞,如果在某個線程獲取Semaphore之前另一個線程已經獲取了Semaphore並陷入阻塞,就會出現死鎖,程序將無法繼續運行。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/154115.html