python線程創建和同步(python多線程同步)

本文目錄一覽:

深入解析Python中的線程同步方法

深入解析Python中的線程同步方法

同步訪問共享資源

在使用線程的時候,一個很重要的問題是要避免多個線程對同一變量或其它資源的訪問衝突。一旦你稍不留神,重疊訪問、在多個線程中修改(共享資源)等這些操作會導致各種各樣的問題;更嚴重的是,這些問題一般只會在比較極端(比如高並發、生產服務器、甚至在性能更好的硬件設備上)的情況下才會出現。

比如有這樣一個情況:需要追蹤對一事件處理的次數

counter = 0

def process_item(item):

global counter

… do something with item …

counter += 1

如果你在多個線程中同時調用這個函數,你會發現counter的值不是那麼準確。在大多數情況下它是對的,但有時它會比實際的少幾個。

出現這種情況的原因是,計數增加操作實際上分三步執行:

解釋器獲取counter的當前值計算新值將計算的新值回寫counter變量

考慮一下這種情況:在當前線程獲取到counter值後,另一個線程搶佔到了CPU,然後同樣也獲取到了counter值,並進一步將counter值重新計算並完成回寫;之後時間片重新輪到當前線程(這裡僅作標識區分,並非實際當前),此時當前線程獲取到counter值還是原來的,完成後續兩步操作後counter的值實際只加上1。

另一種常見情況是訪問不完整或不一致狀態。這類情況主要發生在一個線程正在初始化或更新數據時,另一個進程卻嘗試讀取正在更改的數據。

原子操作

實現對共享變量或其它資源的同步訪問最簡單的方法是依靠解釋器的原子操作。原子操作是在一步完成執行的操作,在這一步中其它線程無法獲得該共享資源。

通常情況下,這種同步方法只對那些只由單個核心數據類型組成的共享資源有效,譬如,字符串變量、數字、列表或者字典等。下面是幾個線程安全的操作:

讀或者替換一個實例屬性讀或者替換一個全局變量從列表中獲取一項元素原位修改一個列表(例如:使用append增加一個列表項)從字典中獲取一項元素原位修改一個字典(例如:增加一個字典項、調用clear方法)

注意,上面提到過,對一個變量或者屬性進行讀操作,然後修改它,最終將其回寫不是線程安全的。因為另外一個線程會在這個線程讀完卻沒有修改或回寫完成之前更改這個共享變量/屬性。

鎖是Python的threading模塊提供的最基本的同步機制。在任一時刻,一個鎖對象可能被一個線程獲取,或者不被任何線程獲取。如果一個線程嘗試去獲取一個已經被另一個線程獲取到的鎖對象,那麼這個想要獲取鎖對象的線程只能暫時終止執行直到鎖對象被另一個線程釋放掉。

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

lock = Lock()

lock.acquire() #: will block if lock is already held

… access shared resource

lock.release()

注意,即使在訪問共享資源的過程中出錯了也應該釋放鎖,可以用try-finally來達到這一目的:

lock.acquire()

try:

… access shared resource

finally:

lock.release() #: release lock, no matter what

在Python 2.5及以後的版本中,你可以使用with語句。在使用鎖的時候,with語句會在進入語句塊之前自動的獲取到該鎖對象,然後在語句塊執行完成後自動釋放掉鎖:

from __future__ import with_statement #: 2.5 only

with lock:

… access shared resource

acquire方法帶一個可選的等待標識,它可用於設定當有其它線程佔有鎖時是否阻塞。如果你將其值設為False,那麼acquire方法將不再阻塞,只是如果該鎖被佔有時它會返回False:

if not lock.acquire(False):

… 鎖資源失敗

else:

try:

… access shared resource

finally:

lock.release()

你可以使用locked方法來檢查一個鎖對象是否已被獲取,注意不能用該方法來判斷調用acquire方法時是否會阻塞,因為在locked方法調用完成到下一條語句(比如acquire)執行之間該鎖有可能被其它線程佔有。

if not lock.locked():

#: 其它線程可能在下一條語句執行之前佔有了該鎖

lock.acquire() #: 可能會阻塞

簡單鎖的缺點

標準的鎖對象並不關心當前是哪個線程佔有了該鎖;如果該鎖已經被佔有了,那麼任何其它嘗試獲取該鎖的線程都會被阻塞,即使是佔有鎖的這個線程。考慮一下下面這個例子:

lock = threading.Lock()

def get_first_part():

lock.acquire()

try:

… 從共享對象中獲取第一部分數據

finally:

lock.release()

return data

def get_second_part():

lock.acquire()

try:

… 從共享對象中獲取第二部分數據

finally:

lock.release()

return data

示例中,我們有一個共享資源,有兩個分別取這個共享資源第一部分和第二部分的函數。兩個訪問函數都使用了鎖來確保在獲取數據時沒有其它線程修改對應的共享數據。

現在,如果我們想添加第三個函數來獲取兩個部分的數據,我們將會陷入泥潭。一個簡單的方法是依次調用這兩個函數,然後返回結合的結果:

def get_both_parts():

first = get_first_part()

seconde = get_second_part()

return first, second

這裡的問題是,如有某個線程在兩個函數調用之間修改了共享資源,那麼我們最終會得到不一致的數據。最明顯的解決方法是在這個函數中也使用lock:

def get_both_parts():

lock.acquire()

try:

first = get_first_part()

seconde = get_second_part()

finally:

lock.release()

return first, second

然而,這是不可行的。裏面的兩個訪問函數將會阻塞,因為外層語句已經佔有了該鎖。為了解決這個問題,你可以通過使用標記在訪問函數中讓外層語句釋放鎖,但這樣容易失去控制並導致出錯。幸運的是,threading模塊包含了一個更加實用的鎖實現:re-entrant鎖。

Re-Entrant Locks (RLock)

RLock類是簡單鎖的另一個版本,它的特點在於,同一個鎖對象只有在被其它的線程佔有時嘗試獲取才會發生阻塞;而簡單鎖在同一個線程中同時只能被佔有一次。如果當前線程已經佔有了某個RLock鎖對象,那麼當前線程仍能再次獲取到該RLock鎖對象。

lock = threading.Lock()

lock.acquire()

lock.acquire() #: 這裡將會阻塞

lock = threading.RLock()

lock.acquire()

lock.acquire() #: 這裡不會發生阻塞

RLock的主要作用是解決嵌套訪問共享資源的問題,就像前面描述的示例。要想解決前面示例中的問題,我們只需要將Lock換為RLock對象,這樣嵌套調用也會OK.

lock = threading.RLock()

def get_first_part():

… see above

def get_second_part():

… see above

def get_both_parts():

… see above

這樣既可以單獨訪問兩部分數據也可以一次訪問兩部分數據而不會被鎖阻塞或者獲得不一致的數據。

注意RLock會追蹤遞歸層級,因此記得在acquire後進行release操作。

Semaphores

信號量是一個更高級的鎖機制。信號量內部有一個計數器而不像鎖對象內部有鎖標識,而且只有當佔用信號量的線程數超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區。

semaphore = threading.BoundedSemaphore()

semaphore.acquire() #: counter減小

… 訪問共享資源

semaphore.release() #: counter增大

當信號量被獲取的時候,計數器減小;當信號量被釋放的時候,計數器增大。當獲取信號量的時候,如果計數器值為0,則該進程將阻塞。當某一信號量被釋放,counter值增加為1時,被阻塞的線程(如果有的話)中會有一個得以繼續運行。

信號量通常被用來限制對容量有限的資源的訪問,比如一個網絡連接或者數據庫服務器。在這類場景中,只需要將計數器初始化為最大值,信號量的實現將為你完成剩下的事情。

max_connections = 10

semaphore = threading.BoundedSemaphore(max_connections)

如果你不傳任何初始化參數,計數器的值會被初始化為1.

Python的threading模塊提供了兩種信號量實現。Semaphore類提供了一個無限大小的信號量,你可以調用release任意次來增大計數器的值。為了避免錯誤出現,最好使用BoundedSemaphore類,這樣當你調用release的次數大於acquire次數時程序會出錯提醒。

線程同步

鎖可以用在線程間的同步上。threading模塊包含了一些用於線程間同步的類。

Events

一個事件是一個簡單的同步對象,事件表示為一個內部標識(internal flag),線程等待這個標識被其它線程設定,或者自己設定、清除這個標識。

event = threading.Event()

#: 一個客戶端線程等待flag被設定

event.wait()

#: 服務端線程設置或者清除flag

event.set()

event.clear()

一旦標識被設定,wait方法就不做任何處理(不會阻塞),當標識被清除時,wait將被阻塞直至其被重新設定。任意數量的線程可能會等待同一個事件。

Conditions

條件是事件對象的高級版本。條件表現為程序中的某種狀態改變,線程可以等待給定條件或者條件發生的信號。

下面是一個簡單的生產者/消費者實例。首先你需要創建一個條件對象:

#: 表示一個資源的附屬項

condition = threading.Condition()

生產者線程在通知消費者線程有新生成資源之前需要獲得條件:

#: 生產者線程

… 生產資源項

condition.acquire()

… 將資源項添加到資源中

condition.notify() #: 發出有可用資源的信號

condition.release()

消費者必須獲取條件(以及相關聯的鎖),然後嘗試從資源中獲取資源項:

#: 消費者線程

condition.acquire()

while True:

…從資源中獲取資源項

if item:

break

condition.wait() #: 休眠,直至有新的資源

condition.release()

… 處理資源

wait方法釋放了鎖,然後將當前線程阻塞,直到有其它線程調用了同一條件對象的notify或者notifyAll方法,然後又重新拿到鎖。如果同時有多個線程在等待,那麼notify方法只會喚醒其中的一個線程,而notifyAll則會喚醒全部線程。

為了避免在wait方法處阻塞,你可以傳入一個超時參數,一個以秒為單位的浮點數。如果設置了超時參數,wait將會在指定時間返回,即使notify沒被調用。一旦使用了超時,你必須檢查資源來確定發生了什麼。

注意,條件對象關聯着一個鎖,你必須在訪問條件之前獲取這個鎖;同樣的,你必須在完成對條件的訪問時釋放這個鎖。在生產代碼中,你應該使用try-finally或者with.

可以通過將鎖對象作為條件構造函數的參數來讓條件關聯一個已經存在的鎖,這可以實現多個條件公用一個資源:

lock = threading.RLock()

condition_1 = threading.Condition(lock)

condition_2 = threading.Condition(lock)

互斥鎖同步

我們先來看一個例子:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

import time, threading

# 假定這是你的銀行存款:

balance = 0

muxlock = threading.Lock()

def change_it(n):

# 先存後取,結果應該為0:

global balance

balance = balance + n

balance = balance – n

def run_thread(n):

# 循環次數一旦多起來,最後的數字就變成非0

for i in range(100000):

change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))

t2 = threading.Thread(target=run_thread, args=(8,))

t3 = threading.Thread(target=run_thread, args=(9,))

t1.start()

t2.start()

t3.start()

t1.join()

t2.join()

t3.join()

print balance

結果 :

[/data/web/test_python]$ python multhread_threading.py

[/data/web/test_python]$ python multhread_threading.py

61

[/data/web/test_python]$ python multhread_threading.py

[/data/web/test_python]$ python multhread_threading.py

24

上面的例子引出了多線程編程的最常見問題:數據共享。當多個線程都修改某一個共享數據的時候,需要進行同步控制。

線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為「鎖定」,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成「非鎖定」,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。

threading模塊中定義了Lock類,可以方便的處理鎖定:

#創建鎖mutex = threading.Lock()

#鎖定mutex.acquire([timeout])

#釋放mutex.release()

其中,鎖定方法acquire可以有一個超時時間的可選參數timeout。如果設定了timeout,則在超時後通過返回值可以判斷是否得到了鎖,從而可以進行一些其他的處理。

使用互斥鎖實現上面的例子的代碼如下:

balance = 0

muxlock = threading.Lock()

def change_it(n):

# 獲取鎖,確保只有一個線程操作這個數

muxlock.acquire()

global balance

balance = balance + n

balance = balance – n

# 釋放鎖,給其他被阻塞的線程繼續操作

muxlock.release()

def run_thread(n):

for i in range(10000):

change_it(n)

加鎖後的結果,就能確保數據正確:

[/data/web/test_python]$ python multhread_threading.py

[/data/web/test_python]$ python multhread_threading.py

[/data/web/test_python]$ python multhread_threading.py

[/data/web/test_python]$ python multhread_threading.py

python 線程同步問題

class myThread (threading.Thread):

def __init__(self, threadID, name, counter):

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.counter = counter

def run(self):

print “Starting ” + self.name

# 獲得鎖,成功獲得鎖定後返回True

# 可選的timeout參數不填時將一直阻塞直到獲得鎖定

# 否則超時後將返回False

threadLock.acquire()

print_time(self.name, self.counter, 3)

# 釋放鎖

threadLock.release()

def print_time(threadName, delay, counter):

while counter:

time.sleep(delay)

print “%s: %s” % (threadName, time.ctime(time.time()))

counter -= 1

threadLock = threading.Lock()

threads = []

# 創建新線程

thread1 = myThread(1, “Thread-1”, 1)

thread2 = myThread(2, “Thread-2”, 2)

# 開啟新線程

thread1.start()

thread2.start()

PYTHON多線程同步的幾種方法

Python進階(二十六)-多線程實現同步的四種方式

臨界資源即那些一次只能被一個線程訪問的資源,典型例子就是打印機,它一次只能被一個程序用來執行打印功能,因為不能多個線程同時操作,而訪問這部分資源的代碼通常稱之為臨界區。

鎖機制

threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖

import threadingimport timeclass Num:

def __init__(self):

self.num = 0

self.lock = threading.Lock() def add(self):

self.lock.acquire()#加鎖,鎖住相應的資源

self.num += 1

num = self.num

self.lock.release()#解鎖,離開該資源

return num

n = Num()class jdThread(threading.Thread):

def __init__(self,item):

threading.Thread.__init__(self)

self.item = item def run(self):

time.sleep(2)

value = n.add()#將num加1,並輸出原來的數據和+1之後的數據

print(self.item,value)for item in range(5):

t = jdThread(item)

t.start()

t.join()#使線程一個一個執行12345678910111213141516171819202122232425262728

當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入「locked」狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為「blocked」狀態,稱為「同步阻塞」(參見多線程的基本概念)。

直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。

信號量

信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。

import threadingimport timeclass Num:

def __init__(self):

self.num = 0

self.sem = threading.Semaphore(value = 3) #允許最多三個線程同時訪問資源

def add(self):

self.sem.acquire()#內部計數器減1

self.num += 1

num = self.num

self.sem.release()#內部計數器加1

return num

n = Num()class jdThread(threading.Thread):

def __init__(self,item):

threading.Thread.__init__(self)

self.item = item def run(self):

time.sleep(2)

value = n.add()

print(self.item,value)for item in range(100):

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/286431.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-23 03:47
下一篇 2024-12-23 03:47

相關推薦

  • Python計算陽曆日期對應周幾

    本文介紹如何通過Python計算任意陽曆日期對應周幾。 一、獲取日期 獲取日期可以通過Python內置的模塊datetime實現,示例代碼如下: from datetime imp…

    編程 2025-04-29
  • Python列表中負數的個數

    Python列表是一個有序的集合,可以存儲多個不同類型的元素。而負數是指小於0的整數。在Python列表中,我們想要找到負數的個數,可以通過以下幾個方面進行實現。 一、使用循環遍歷…

    編程 2025-04-29
  • Python周杰倫代碼用法介紹

    本文將從多個方面對Python周杰倫代碼進行詳細的闡述。 一、代碼介紹 from urllib.request import urlopen from bs4 import Bea…

    編程 2025-04-29
  • Python中引入上一級目錄中函數

    Python中經常需要調用其他文件夾中的模塊或函數,其中一個常見的操作是引入上一級目錄中的函數。在此,我們將從多個角度詳細解釋如何在Python中引入上一級目錄的函數。 一、加入環…

    編程 2025-04-29
  • 如何查看Anaconda中Python路徑

    對Anaconda中Python路徑即conda環境的查看進行詳細的闡述。 一、使用命令行查看 1、在Windows系統中,可以使用命令提示符(cmd)或者Anaconda Pro…

    編程 2025-04-29
  • Python清華鏡像下載

    Python清華鏡像是一個高質量的Python開發資源鏡像站,提供了Python及其相關的開發工具、框架和文檔的下載服務。本文將從以下幾個方面對Python清華鏡像下載進行詳細的闡…

    編程 2025-04-29
  • 蝴蝶優化算法Python版

    蝴蝶優化算法是一種基於仿生學的優化算法,模仿自然界中的蝴蝶進行搜索。它可以應用於多個領域的優化問題,包括數學優化、工程問題、機器學習等。本文將從多個方面對蝴蝶優化算法Python版…

    編程 2025-04-29
  • Python字典去重複工具

    使用Python語言編寫字典去重複工具,可幫助用戶快速去重複。 一、字典去重複工具的需求 在使用Python編寫程序時,我們經常需要處理數據文件,其中包含了大量的重複數據。為了方便…

    編程 2025-04-29
  • Python程序需要編譯才能執行

    Python 被廣泛應用於數據分析、人工智能、科學計算等領域,它的靈活性和簡單易學的性質使得越來越多的人喜歡使用 Python 進行編程。然而,在 Python 中程序執行的方式不…

    編程 2025-04-29
  • python強行終止程序快捷鍵

    本文將從多個方面對python強行終止程序快捷鍵進行詳細闡述,並提供相應代碼示例。 一、Ctrl+C快捷鍵 Ctrl+C快捷鍵是在終端中經常用來強行終止運行的程序。當你在終端中運行…

    編程 2025-04-29

發表回復

登錄後才能評論