golang線程池,golang 線程池和協程池

本文目錄一覽:

golang的線程模型——GMP模型

內核線程(Kernel-Level Thread ,KLT)

輕量級進程(Light Weight Process,LWP):輕量級進程就是我們通常意義上所講的線程,由於每個輕量級進程都由一個內核線程支持,因此只有先支持內核線程,才能有輕量級進程

用戶線程與系統線程一一對應,用戶線程執行如lo操作的系統調用時,來回切換操作開銷相對比較大

多個用戶線程對應一個內核線程,當內核線程對應的一個用戶線程被阻塞掛起時候,其他用戶線程也阻塞不能執行了。

多對多模型是可以充分利用多核CPU提升運行效能的

go線程模型包含三個概念:內核線程(M),goroutine(G),G的上下文環境(P);

GMP模型是goalng特有的。

P與M一般是一一對應的。P(上下文)管理着一組G(goroutine)掛載在M(內核線程)上運行,圖中左邊藍色為正在執行狀態的goroutine,右邊為待執行狀態的goroutiine隊列。P的數量由環境變量GOMAXPROCS的值或程序運行runtime.GOMAXPROCS()進行設置。

當一個os線程在執行M1一個G1發生阻塞時,調度器讓M1拋棄P,等待G1返回,然後另起一個M2接收P來執行剩下的goroutine隊列(G2、G3…),這是golang調度器厲害的地方,可以保證有足夠的線程來運行剩下所有的goroutine。

當G1結束後,M1會重新拿回P來完成,如果拿不到就丟到全局runqueue中,然後自己放到線程池或轉入休眠狀態。空閑的上下文P會周期性的檢查全局runqueue上的goroutine,並且執行它。

另一種情況就是當有些P1太閑而其他P2很忙碌的時候,會從其他上下文P2拿一些G來執行。

詳細可以翻看下方第一個參考鏈接,寫得真好。

最後用大佬的總結來做最後的收尾————

Go語言運行時,通過核心元素G,M,P 和 自己的調度器,實現了自己的並發線程模型。調度器通過對G,M,P的調度實現了兩級線程模型中操作系統內核之外的調度任務。整個調度過程中會在多種時機去觸發最核心的步驟 「一整輪調度」,而一整輪調度中最關鍵的部分在「全力查找可運行G」,它保證了M的高效運行(換句話說就是充分使用了計算機的物理資源),一整輪調度中還會涉及到M的啟用停止。最後別忘了,還有一個與Go程序生命周期相同的系統監測任務來進行一些輔助性的工作。

淺析Golang的線程模型與調度器

Golang CSP並發模型

Golang線程模型

【golang詳解】go語言GMP(GPM)原理和調度

Goroutine調度是一個很複雜的機制,下面嘗試用簡單的語言描述一下Goroutine調度機制,想要對其有更深入的了解可以去研讀一下源碼。

首先介紹一下GMP什麼意思:

G ———– goroutine: 即Go協程,每個go關鍵字都會創建一個協程。

M ———- thread內核級線程,所有的G都要放在M上才能運行。

P ———– processor處理器,調度G到M上,其維護了一個隊列,存儲了所有需要它來調度的G。

Goroutine 調度器P和 OS 調度器是通過 M 結合起來的,每個 M 都代表了 1 個內核線程,OS 調度器負責把內核線程分配到 CPU 的核上執行

模型圖:

避免頻繁的創建、銷毀線程,而是對線程的復用。

1)work stealing機制

  當本線程無可運行的G時,嘗試從其他線程綁定的P偷取G,而不是銷毀線程。

2)hand off機制

  當本線程M0因為G0進行系統調用阻塞時,線程釋放綁定的P,把P轉移給其他空閑的線程執行。進而某個空閑的M1獲取P,繼續執行P隊列中剩下的G。而M0由於陷入系統調用而進被阻塞,M1接替M0的工作,只要P不空閑,就可以保證充分利用CPU。M1的來源有可能是M的緩存池,也可能是新建的。當G0系統調用結束後,根據M0是否能獲取到P,將會將G0做不同的處理:

如果有空閑的P,則獲取一個P,繼續執行G0。

如果沒有空閑的P,則將G0放入全局隊列,等待被其他的P調度。然後M0將進入緩存池睡眠。

如下圖

GOMAXPROCS設置P的數量,最多有GOMAXPROCS個線程分佈在多個CPU上同時運行

在Go中一個goroutine最多佔用CPU 10ms,防止其他goroutine被餓死。

具體可以去看另一篇文章

【Golang詳解】go語言調度機制 搶佔式調度

當創建一個新的G之後優先加入本地隊列,如果本地隊列滿了,會將本地隊列的G移動到全局隊列裏面,當M執行work stealing從其他P偷不到G時,它可以從全局G隊列獲取G。

協程經歷過程

我們創建一個協程 go func()經歷過程如下圖:

說明:

這裡有兩個存儲G的隊列,一個是局部調度器P的本地隊列、一個是全局G隊列。新創建的G會先保存在P的本地隊列中,如果P的本地隊列已經滿了就會保存在全局的隊列中;處理器本地隊列是一個使用數組構成的環形鏈表,它最多可以存儲 256 個待執行任務。

G只能運行在M中,一個M必須持有一個P,M與P是1:1的關係。M會從P的本地隊列彈出一個可執行狀態的G來執行,如果P的本地隊列為空,就會想其他的MP組合偷取一個可執行的G來執行;

一個M調度G執行的過程是一個循環機制;會一直從本地隊列或全局隊列中獲取G

上面說到P的個數默認等於CPU核數,每個M必須持有一個P才可以執行G,一般情況下M的個數會略大於P的個數,這多出來的M將會在G產生系統調用時發揮作用。類似線程池,Go也提供一個M的池子,需要時從池子中獲取,用完放回池子,不夠用時就再創建一個。

work-stealing調度算法:當M執行完了當前P的本地隊列隊列里的所有G後,P也不會就這麼在那躺屍啥都不幹,它會先嘗試從全局隊列隊列尋找G來執行,如果全局隊列為空,它會隨機挑選另外一個P,從它的隊列里中拿走一半的G到自己的隊列中執行。

如果一切正常,調度器會以上述的那種方式順暢地運行,但這個世界沒這麼美好,總有意外發生,以下分析goroutine在兩種例外情況下的行為。

Go runtime會在下面的goroutine被阻塞的情況下運行另外一個goroutine:

用戶態阻塞/喚醒

當goroutine因為channel操作或者network I/O而阻塞時(實際上golang已經用netpoller實現了goroutine網絡I/O阻塞不會導致M被阻塞,僅阻塞G,這裡僅僅是舉個栗子),對應的G會被放置到某個wait隊列(如channel的waitq),該G的狀態由_Gruning變為_Gwaitting,而M會跳過該G嘗試獲取並執行下一個G,如果此時沒有可運行的G供M運行,那麼M將解綁P,並進入sleep狀態;當阻塞的G被另一端的G2喚醒時(比如channel的可讀/寫通知),G被標記為,嘗試加入G2所在P的runnext(runnext是線程下一個需要執行的 Goroutine。), 然後再是P的本地隊列和全局隊列。

系統調用阻塞

當M執行某一個G時候如果發生了阻塞操作,M會阻塞,如果當前有一些G在執行,調度器會把這個線程M從P中摘除,然後再創建一個新的操作系統的線程(如果有空閑的線程可用就復用空閑線程)來服務於這個P。當M系統調用結束時候,這個G會嘗試獲取一個空閑的P執行,並放入到這個P的本地隊列。如果獲取不到P,那麼這個線程M變成休眠狀態, 加入到空閑線程中,然後這個G會被放入全局隊列中。

隊列輪轉

可見每個P維護着一個包含G的隊列,不考慮G進入系統調用或IO操作的情況下,P周期性的將G調度到M中執行,執行一小段時間,將上下文保存下來,然後將G放到隊列尾部,然後從隊列中重新取出一個G進行調度。

除了每個P維護的G隊列以外,還有一個全局的隊列,每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中G的來源,主要有從系統調用中恢復的G。之所以P會周期性地查看全局隊列,也是為了防止全局隊列中的G被餓死。

除了每個P維護的G隊列以外,還有一個全局的隊列,每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中G的來源,主要有從系統調用中恢復的G。之所以P會周期性地查看全局隊列,也是為了防止全局隊列中的G被餓死。

M0

M0是啟動程序後的編號為0的主線程,這個M對應的實例會在全局變量rutime.m0中,不需要在heap上分配,M0負責執行初始化操作和啟動第一個G,在之後M0就和其他的M一樣了

G0

G0是每次啟動一個M都會第一個創建的goroutine,G0僅用於負責調度G,G0不指向任何可執行的函數,每個M都會有一個自己的G0,在調度或系統調用時會使用G0的棧空間,全局變量的G0是M0的G0

一個G由於調度被中斷,此後如何恢復?

中斷的時候將寄存器里的棧信息,保存到自己的G對象裏面。當再次輪到自己執行時,將自己保存的棧信息複製到寄存器裏面,這樣就接着上次之後運行了。

我這裡只是根據自己的理解進行了簡單的介紹,想要詳細了解有關GMP的底層原理可以去看Go調度器 G-P-M 模型的設計者的文檔或直接看源碼

參考: ()

()

Linux下C/C++ 手寫一個線程池-

在我們日常生活中會遇到許許多多的問題,如果一個服務端要接受很多客戶端的數據,該怎麼辦?多線程並發內存不夠怎麼辦?所以我們需要了解線程池的相關知識。

1.線程池的簡介

線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然後在創建線程後自動啟動這些任務。線程池線程都是後台線程。每個線程都使用默認的堆棧大小,以默認的優先級運行,並處於多線程單元中。如果某個線程在託管代碼中空閑(如正在等待某個事件),則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間後創建另一個輔助線程但線程的數目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成後才啟動。

2.線程池的組成

1、線程池管理器(ThreadPoolManager):用於創建並管理線程池

2、工作線程(WorkThread): 線程池中線程

3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行。

4、任務隊列:用於存放沒有處理的任務。提供一種緩衝機制。

3.線程池的主要優點

1.避免線程太多,使得內存耗盡

2.避免創建與銷毀線程的代價

3.任務與執行分離

1.線程池結構體定義

代碼如下(示例):

相關視頻推薦

150行代碼,帶你手寫線程池,自行準備linux環境

C++後台開發該學哪些內容,標準技術路線及面經與算法該如何刷

學習地址:C/C++Linux服務器開發/後台架構師【零聲教育】-學習視頻教程-騰訊課堂

需要更多C/C++ Linux服務器架構師學習資料加qun 812855908 (資料包括C/C++,Linux,golang技術,內核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg,大廠面試題 等)

2.接口定義

代碼如下(示例):

3.回調函數

代碼如下(示例):

4.全部代碼(加註釋)

代碼如下(示例):

關於線程池是基本代碼就在上面了,關於編程這一部分內容,我建議大家還是要自己去動手實現,如果只是單純的看了一遍,知識這塊可能會記住,但是操作起來可能就比較吃力,萬事開頭難,只要堅持下去,總會有結果的。

golang協程調度模式解密

golang學習筆記

頻繁創建線程會造成不必要的開銷,所以才有了線程池。在線程池中預先保存一定數量的線程,新任務發佈到任務隊列,線程池中的線程不斷地從任務隊列中取出任務並執行,可以有效的減少創建和銷毀帶來的開銷。

過多的線程會導致爭搶cpu資源,且上下文的切換的開銷變大。而工作在用戶態的協程能大大減少上下文切換的開銷。協程調度器把可運行的協程逐個調度到線程中執行,同時即時把阻塞的協程調度出協程,從而有效地避免了線程的頻繁切換,達到了少量線程實現高並發的效果。

多個協程分享操作系統分給線程的時間片,從而達到充分利用CPU的目的,協程調度器決定了則決定了協程運行的順序。每個線程同一時刻只能運行一個協程。

go調度模型包含三個實體:

每個處理器維護者一個協程G的隊列,處理器依次將協程G調度到M中執行。

每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中的G主要來自系統調用中恢復的G.

如果協程發起系統調用,則整個工作線程M被阻塞,協程隊列中的其他協程都會阻塞。

一般情況下M的個數會略大於P個數,多出來的M將會在G產生系統調用時發揮作用。與線程池類似,Go也提供M池子。當協程G1發起系統掉用時,M1會釋放P,由 M1-P-G1 G2 … 轉變成 M1-G1 , M2會接管P的其他協程 M2-P-G2 G3 G4… 。

冗餘的M可能來源於緩存池,也可能是新建的。

當G1結束系統調用後,根據M1是否獲取到P,進行不用的處理。

多個處理P維護隊列可能不均衡,導致部分處理器非常繁忙,而其餘相對空閑。產生原因是有些協程自身不斷地派生協程。

為此Go調度器提供了工作量竊取策略,當某個處理器P沒有需要調度的協程時,將從其他處理中偷取協程,每次偷取一半。

搶佔式調度,是指避免某個協程長時間執行,而阻礙其他協程被調度的機制。

調度器監控每個協程執行時間,一旦執行時間過長且有其他協程等待,會把協程暫停,轉而調度等待的協程,以達到類似時間片輪轉的效果。比如for循環會一直佔用執行權。

在IO密集型應用,GOMAXPROCS大小設置大一些,獲取性能會更好。

IO密集型會經常發生系統調用,會有一個新的M啟用或創建,但由於Go調度器檢測M到被阻塞有一定延遲。如果P數量多,則P管理協程隊列會變小。

如何用go語言每分鐘處理100萬個請求

在Malwarebytes 我們經歷了顯著的增長,自從我一年前加入了硅谷的公司,一個主要的職責成了設計架構和開發一些系統來支持一個快速增長的信息安全公司和所有需要的設施來支持一個每天百萬用戶使用的產品。我在反病毒和反惡意軟件行業的不同公司工作了12年,從而我知道由於我們每天處理大量的數據,這些系統是多麼複雜。

有趣的是,在過去的大約9年間,我參與的所有的web後端的開發通常是通過Ruby on Rails技術實現的。不要錯怪我。我喜歡Ruby on Rails,並且我相信它是個令人驚訝的環境。但是一段時間後,你會開始以ruby的方式開始思考和設計系統,你會忘記,如果你可以利用多線程、並行、快速執行和小內存開銷,軟件架構本來應該是多麼高效和簡單。很多年期間,我是一個c/c++、Delphi和c#開發者,我剛開始意識到使用正確的工具可以把複雜的事情變得簡單些。

作為首席架構師,我不會很關心在互聯網上的語言和框架戰爭。我相信效率、生產力。代碼可維護性主要依賴於你如何把解決方案設計得很簡單。

問題

當工作在我們的匿名遙測和分析系統中,我們的目標是可以處理來自於百萬級別的終端的大量的POST請求。web處理服務可以接收包含了很多payload的集合的JSON數據,這些數據需要寫入Amazon S3中。接下來,map-reduce系統可以操作這些數據。

按照習慣,我們會調研服務層級架構,涉及的軟件如下:

Sidekiq

Resque

DelayedJob

Elasticbeanstalk Worker Tier

RabbitMQ

and so on…

搭建了2個不同的集群,一個提供web前端,另外一個提供後端處理,這樣我們可以橫向擴展後端服務的數量。

但是,從剛開始,在 討論階段我們的團隊就知道我們應該使用Go,因為我們看到這會潛在性地成為一個非常龐大( large traffic)的系統。我已經使用了Go語言大約2年時間,我們開發了幾個系統,但是很少會達到這樣的負載(amount of load)。

我們開始創建一些結構,定義從POST調用得到的web請求負載,還有一個上傳到S3 budket的函數。

type PayloadCollection struct {

WindowsVersion string `json:”version”`

Token string `json:”token”`

Payloads []Payload `json:”data”`

}

type Payload struct {

// [redacted]

}

func (p *Payload) UploadToS3() error {

// the storageFolder method ensures that there are no name collision in

// case we get same timestamp in the key name

storage_path := fmt.Sprintf(“%v/%v”, p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)

encodeErr := json.NewEncoder(b).Encode(payload)

if encodeErr != nil {

return encodeErr

}

// Everything we post to the S3 bucket should be marked ‘private’

var acl = s3.Private

var contentType = “application/octet-stream”

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})

}

本地Go routines方法

剛開始,我們採用了一個非常本地化的POST處理實現,僅僅嘗試把發到簡單go routine的job並行化:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != “POST” {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set(“Content-Type”, “application/json; charset=UTF-8”)

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

go payload.UploadToS3() // —– DON’T DO THIS

}

w.WriteHeader(http.StatusOK)

}

對於中小負載,這會對大多數的人適用,但是大規模下,這個方案會很快被證明不是很好用。我們期望的請求數,不在我們剛開始計劃的數量級,當我們把第一個版本部署到生產環境上。我們完全低估了流量。

上面的方案在很多地方很不好。沒有辦法控制我們產生的go routine的數量。由於我們收到了每分鐘1百萬的POST請求,這段代碼很快就崩潰了。

再次嘗試

我們需要找一個不同的方式。自開始我們就討論過, 我們需要保持請求處理程序的生命周期很短,並且進程在後台產生。當然,這是你在Ruby on Rails的世界裏必須要做的事情,否則你會阻塞在所有可用的工作 web處理器上,不管你是使用puma、unicore還是passenger(我們不要討論JRuby這個話題)。然後我們需要利用常用的處理方案來做這些,比如Resque、 Sidekiq、 SQS等。這個列表會繼續保留,因為有很多的方案可以實現這些。

所以,第二次迭代,我們創建了一個緩衝channel,我們可以把job排隊,然後把它們上傳到S3。因為我們可以控制我們隊列中的item最大值,我們有大量的內存來排列job,我們認為只要把job在channel裏面緩衝就可以了。

var Queue chan Payload

func init() {

Queue = make(chan Payload, MAX_QUEUE)

}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

Queue – payload

}

}

接下來,我們再從隊列中取job,然後處理它們。我們使用類似於下面的代碼:

func StartProcessor() {

for {

select {

case job := -Queue:

job.payload.UploadToS3() // — STILL NOT GOOD

}

}

}

說實話,我不知道我們在想什麼。這肯定是一個滿是Red-Bulls的夜晚。這個方法不會帶來什麼改善,我們用了一個 有缺陷的緩衝隊列並發,僅僅是把問題推遲了。我們的同步處理器同時僅僅會上傳一個數據到S3,因為來到的請求遠遠大於單核處理器上傳到S3的能力,我們的帶緩衝channel很快達到了它的極限,然後阻塞了請求處理邏輯的queue更多item的能力。

我們僅僅避免了問題,同時開始了我們的系統掛掉的倒計時。當部署了這個有缺陷的版本後,我們的延時保持在每分鐘以常量增長。

最好的解決方案

我們討論過在使用用Go channel時利用一種常用的模式,來創建一個二級channel系統,一個來queue job,另外一個來控制使用多少個worker來並發操作JobQueue。

想法是,以一個恆定速率並行上傳到S3,既不會導致機器崩潰也不好產生S3的連接錯誤。這樣我們選擇了創建一個Job/Worker模式。對於那些熟悉Java、C#等語言的開發者,可以把這種模式想像成利用channel以golang的方式來實現了一個worker線程池,作為一種替代。

var (

MaxWorker = os.Getenv(“MAX_WORKERS”)

MaxQueue = os.Getenv(“MAX_QUEUE”)

)

// Job represents the job to be run

type Job struct {

Payload Payload

}

// A buffered channel that we can send work requests on.

var JobQueue chan Job

// Worker represents the worker that executes the job

type Worker struct {

WorkerPool chan chan Job

JobChannel chan Job

quit chan bool

}

func NewWorker(workerPool chan chan Job) Worker {

return Worker{

WorkerPool: workerPool,

JobChannel: make(chan Job),

quit: make(chan bool)}

}

// Start method starts the run loop for the worker, listening for a quit channel in

// case we need to stop it

func (w Worker) Start() {

go func() {

for {

// register the current worker into the worker queue.

w.WorkerPool – w.JobChannel

select {

case job := -w.JobChannel:

// we have received a work request.

if err := job.Payload.UploadToS3(); err != nil {

log.Errorf(“Error uploading to S3: %s”, err.Error())

}

case -w.quit:

// we have received a signal to stop

return

}

}

}()

}

// Stop signals the worker to stop listening for work requests.

func (w Worker) Stop() {

go func() {

w.quit – true

}()

}

我們已經修改了我們的web請求handler,用payload創建一個Job實例,然後發到JobQueue channel,以便於worker來獲取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != “POST” {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set(“Content-Type”, “application/json; charset=UTF-8”)

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

// let’s create a job with the payload

work := Job{Payload: payload}

// Push the work onto the queue.

JobQueue – work

}

w.WriteHeader(http.StatusOK)

}

在web server初始化時,我們創建一個Dispatcher,然後調用Run()函數創建一個worker池子,然後開始監聽JobQueue中的job。

dispatcher := NewDispatcher(MaxWorker)

dispatcher.Run()

下面是dispatcher的實現代碼:

type Dispatcher struct {

// A pool of workers channels that are registered with the dispatcher

WorkerPool chan chan Job

}

func NewDispatcher(maxWorkers int) *Dispatcher {

pool := make(chan chan Job, maxWorkers)

return Dispatcher{WorkerPool: pool}

}

func (d *Dispatcher) Run() {

// starting n number of workers

for i := 0; i d.maxWorkers; i++ {

worker := NewWorker(d.pool)

worker.Start()

}

go d.dispatch()

}

func (d *Dispatcher) dispatch() {

for {

select {

case job := -JobQueue:

// a job request has been received

go func(job Job) {

// try to obtain a worker job channel that is available.

// this will block until a worker is idle

jobChannel := -d.WorkerPool

// dispatch the job to the worker job channel

jobChannel – job

}(job)

}

}

}

注意到,我們提供了初始化並加入到池子的worker的最大數量。因為這個工程我們利用了Amazon Elasticbeanstalk帶有的docker化的Go環境,所以我們常常會遵守12-factor方法論來配置我們的生成環境中的系統,我們從環境變了讀取這些值。這種方式,我們控制worker的數量和JobQueue的大小,所以我們可以很快的改變這些值,而不需要重新部署集群。

var (

MaxWorker = os.Getenv(“MAX_WORKERS”)

MaxQueue = os.Getenv(“MAX_QUEUE”)

)

直接結果

我們部署了之後,立馬看到了延時降到微乎其微的數值,並未我們處理請求的能力提升很大。

Elastic Load Balancers完全啟動後,我們看到ElasticBeanstalk 應用服務於每分鐘1百萬請求。通常情況下在上午時間有幾個小時,流量峰值超過每分鐘一百萬次。

我們一旦部署了新的代碼,服務器的數量從100台大幅 下降到大約20台。

我們合理配置了我們的集群和自動均衡配置之後,我們可以把服務器的數量降至4x EC2 c4.Large實例,並且Elastic Auto-Scaling設置為如果CPU達到5分鐘的90%利用率,我們就會產生新的實例。

總結

在我的書中,簡單總是獲勝。我們可以使用多隊列、後台worker、複雜的部署設計一個複雜的系統,但是我們決定利用Elasticbeanstalk 的auto-scaling的能力和Go語言開箱即用的特性簡化並發。

我們僅僅用了4台機器,這並不是什麼新鮮事了。可能它們還不如我的MacBook能力強大,但是卻處理了每分鐘1百萬的寫入到S3的請求。

處理問題有正確的工具。當你的 Ruby on Rails 系統需要更強大的web handler時,可以考慮下ruby生態系統之外的技術,或許可以得到更簡單但更強大的替代方案。

一篇文章帶你讀懂 io_uring 的接口與實現

io_uring 是 Linux 提供的一個異步 I/O 接口。io_uring 在 2019 年加入 Linux 內核,經過了兩年的發展,現在已經變得非常強大。本文基於 Linux 5.12.10 介紹 io_uring 接口。

io_uring 的實現主要在 fs/io_uring.c 中。

io_uring 的實現僅僅使用了三個 syscall:io_uring_setup, io_uring_enter 和 io_uring_register。它們分別用於設置 io_uring 上下文,提交並獲取完成任務,以及註冊內核用戶共享的緩衝區。使用前兩個 syscall 已經足夠使用 io_uring 接口了。

用戶和內核通過提交隊列和完成隊列進行任務的提交和收割。後文中會出現大量的簡寫,在這裡先做一些介紹。

用戶通過調用 io_uring_setup 1 初始化一個新的 io_uring 上下文。該函數返回一個 file descriptor,並將 io_uring 支持的功能、以及各個數據結構在 fd 中的偏移量存入 params。用戶根據偏移量將 fd 映射到內存 (mmap) 後即可獲得一塊內核用戶共享的內存區域。這塊內存區域中,有 io_uring 的上下文信息:提交隊列信息 (SQ_RING) 和完成隊列信息 (CQ_RING);還有一塊專門用來存放提交隊列元素的區域 (SQEs)。SQ_RING 中只存儲 SQE 在 SQEs 區域中的序號,CQ_RING 存儲完整的任務完成數據。2

在 Linux 5.12 中,SQE 大小為 64B,CQE 大小為 16B。因此,相同數量的 SQE 和 CQE 所需要的空間不一樣。初始化 io_uring 時,用戶如果不在 params 中設置 CQ 長度,內核會分配 entries 個 SQE,以及 entries * 2 個 CQE。

io_uring_setup 設計的巧妙之處在於,內核通過一塊和用戶共享的內存區域進行消息的傳遞。在創建上下文後,任務提交、任務收割等操作都通過這塊共享的內存區域進行,在 IO_SQPOLL 模式下(後文將詳細介紹),可以完全繞過 Linux 的 syscall 機制完成需要內核介入的操作(比如讀寫文件),大大減少了 syscall 切換上下文、刷 TLB 的開銷。

io_uring 可以處理多種 I/O 相關的請求。比如:

下面以 fsync 為例,介紹執行這個操作中可能用到的結構體和函數。

io_op_def io_op_defs[] 數組中定義了 io_uring 支持的操作,以及它在 io_uring 中的一些參數。3 比如 IORING_OP_FSYNC:

io_uring 中幾乎每個操作都有對應的準備和執行函數。比如 fsync 操作就對應 io_fsync_prep 和 io_fsync函數。

除了 fsync 這種同步(阻塞)操作,內核中還支持一些異步(非阻塞)調用的操作,比如 Direct I/O 模式下的文件讀寫。對於這些操作,io_uring 中還會有一個對應的異步準備函數,以 _async 結尾。比如:

這些函數就是 io_uring 對某個 I/O 操作的包裝。

用戶將需要進行的操作寫入 io_uring 的 SQ 中。在 CQ 中,用戶可以收割任務的完成情況。這裡,我們介紹 SQE 和 CQE 的編碼。

include/uapi/linux/io_uring.h 4 中定義了 SQE 和 CQE。SQE 是一個 64B 大小的結構體,裏面包含了所有操作可能用到的信息。

io_uring_sqe的定義

CQE 是一個 16B 大小的結構體,包含操作的執行結果。

繼續以 fsync 為例。要在 io_uring 中完成 fsync 操作,用戶需要將 SQE 中的 opcode 設置為 IORING_OP_FSYNC,將 fd 設置為需要同步的文件,並填充 fsync_flags。其他操作也是類似,設置 opcode 並將操作所需要的參數並寫入 SQE 即可。

通常來說,使用 io_uring 的程序都需要用到 64 位的 user_data 來唯一標識一個操作 5。user_data 是 SQE 的一部分。io_uring 執行完某個操作後,會將這個操作的 user_data 和操作的返回值一起寫入 CQ 中。

相關視頻推薦

io_uring 新起之秀的linux io模式,是如何媲美epoll的

網絡原理tcp/udp,網絡編程epoll/reactor,面試中正經「八股文」

學習地址:

需要C/C++ Linux服務器架構師學習資料加qun812855908獲取(資料包括 C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg 等),免費分享

io_uring 通過環形隊列和用戶交互。

我們的先以用戶提交任務為例,介紹 io_uring 的內核用戶交互方式。用戶提交任務的過程如下:

接下來我們簡要介紹內核獲取任務、內核完成任務、用戶收割任務的過程。

介紹完 io_uring 的用戶態接口後,我們就可以詳細介紹 io_uring 在內核中是如何實現的了。

io_uring 在創建時有兩個選項,對應着 io_uring 處理任務的不同方式:

這些選項的設定會影響之後用戶與 io_uring 交互的方式:

每個 io_uring 都由一個輕量級的 io-wq6 線程池支持,從而實現 Buffered I/O 的異步執行。對於 Buffered I/O 來說,文件的內容可能在 page cache 里,也可能需要從盤上讀取。如果文件的內容已經在 page cache 中,這些內容可以直接在 io_uring_enter 的時候讀取到,並在返回用戶態時收割。否則,讀寫操作會在 workqueue 里執行。

如果沒有在創建 io_uring 時指定 IORING_SETUP_IOPOLL 選項,io_uring 的操作就會放進 io-wq 中執行。

上圖覆蓋了關閉 IOPOLL 模式下,用戶通過 io_uring 執行操作的整個調用流程。用戶提交的 SQE 經過一系列處理後,會在 io_queue_sqe 中試探着執行一次。

所有的操作都被提交到內核隊列後,如果用戶設置了 IORING_ENTER_GETEVENTS flag,io_uring_enter 在返回用戶態前會等待指定個數的操作完成。

之後,Linux 隨時會調度 io-wq 的內核線程執行。此時,io_wq_submit_work 函數會不斷用阻塞模式執行用戶指定的操作。某個操作完整執行後,它的返回值就會被寫入 CQ 中。用戶通過 io_uring 上下文中的 CQ 隊尾位置就能知道內核處理好了哪些操作,無需再次調用 io_uring_enter。

通過火焰圖可以觀察到,在關閉 IOPOLL 時,內核會花大量時間處理讀取操作。

創建 io_uring 時指定 IORING_SETUP_IOPOLL 選項即可開啟 I/O 輪詢模式。通常來說,用 O_DIRECT 模式打開的文件支持使用輪詢模式讀寫內容,執行 read / write 操作。

在輪詢模式下,io_uring_enter 只負責把操作提交到內核的文件讀寫隊列中。之後,用戶需要多次調用 io_uring_enter 來輪詢操作是否完成。

在輪詢模式下,io-wq 不會被使用。提交任務時,io_read 直接調用內核的 Direct I/O 接口向設備隊列提交任務。

如果用戶設置了 IORING_ENTER_GETEVENTS flag,在返回用戶態前,io_uring_enter 會通過 io_iopoll_check 調用內核接口輪詢任務是否完成。

通過火焰圖可以看到,io_uring_enter 在提交任務這一塊只花了一小部分時間。大部分時間都在輪詢 I/O 操作是否完成。

在實際生產環境中,我們往往會有這樣的需求:往文件中寫入 n 次,然後用 fsync 落盤。在使用 io_uring時,SQ 中的任務不一定會按順序執行。給操作設定 IO_SQE_LINK 選項,就可以建立任務之間的先後關係。IO_SQE_LINK 之後的第一個任務一定在當前任務完成後執行。7

io_uring 內部使用鏈表來管理任務的依賴關係。每一個操作在經過 io_submit_sqe 的處理後,都會變成一個 io_kiocb 對象。這個對象有可能會被放入鏈表中。io_submit_sqe 8 會對含有 IO_SQE_LINK 的 SQE 作特殊處理,處理過程如下:

由此看來,SQ 中連續的 IO_SQE_LINK 記錄會按先後關係依次處理。在 io_submit_sqes 結束前,所有的任務都會被提交。因此,如果任務有先後關係,它們必須在同一個 io_uring_enter syscall 中批量提交。

其他用於控制 io_uring 任務依賴的選項包括 IOSQE_IO_DRAIN 和 IOSQE_IO_HARDLINK,這裡不再展開。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/227236.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-09 16:28
下一篇 2024-12-09 16:28

相關推薦

  • 使用Golang調用Python

    在現代軟件開發中,多種編程語言的協作是相當普遍的。其中一種使用場景是Golang調用Python,這使得在使用Python庫的同時,可以利用Golang的高性能和強大並發能力。這篇…

    編程 2025-04-29
  • Python線程等待指南

    本文將從多個方面詳細講解Python線程等待的相關知識。 一、等待線程結束 在多線程編程中,經常需要等待線程執行完畢再進行下一步操作。可以使用join()方法實現等待線程執行完畢再…

    編程 2025-04-29
  • 使用Golang創建黑色背景圖片的方法

    本文將從多個方面介紹使用Golang創建黑色背景圖片的方法。 一、安裝必要的代碼庫和工具 在開始創建黑色背景圖片之前,我們需要先安裝必要的代碼庫和工具: go get -u git…

    編程 2025-04-29
  • Python兩個線程交替打印1到100

    這篇文章的主題是關於Python多線程的應用。我們將會通過實際的代碼,學習如何使用Python兩個線程交替打印1到100。 一、創建線程 在Python中,我們可以使用Thread…

    編程 2025-04-28
  • ROS線程發佈消息異常解決方法

    針對ROS線程發佈消息異常問題,我們可以從以下幾個方面進行分析和解決。 一、檢查ROS代碼是否正確 首先,我們需要檢查ROS代碼是否正確。可能會出現的問題包括: 是否正確初始化RO…

    編程 2025-04-28
  • Python線程池並發爬蟲

    Python線程池並發爬蟲是實現多線程爬取數據的常用技術之一,可以在一定程度上提高爬取效率和數據處理能力。本文將從多個方面對Python線程池並發爬蟲做詳細的闡述,包括線程池的實現…

    編程 2025-04-27
  • 線程池中的一個線程異常了會被怎麼處理

    本文將從以下幾個方面對線程池中的一個線程異常了會被怎麼處理進行詳細闡述:異常的類型、如何捕獲異常、異常的處理方式。 一、異常的類型 在線程池中,可以出現多種類型的異常,例如線程執行…

    編程 2025-04-27
  • 線程池的七個參數

    在多線程編程中,線程池是一種非常重要的編程模型,可以解決線程創建銷毀的開銷問題,提高程序的效率。在使用線程池時,需要對其七個參數進行配置,以達到最佳性能。下面將從多個方面詳細闡述線…

    編程 2025-04-25
  • Golang中使用strings.Split函數進行字符串分割的方法

    一、Split函數的基本用法 字符串是編程中常見的數據類型,它們可以在程序中被處理、存儲和傳輸。在Go語言中,字符串也是一個基本的數據類型,而strings包提供了一些操作字符串的…

    編程 2025-04-23
  • Java DelayQueue:實現延遲任務的線程安全隊列

    一、DelayQueue的概述 Java的DelayQueue 是一個阻塞隊列隊列,主要用來實現對延遲任務的調度,也就是在指定的時間之後才能夠取出任務來執行。該隊列中保存的元素都必…

    編程 2025-04-23

發表回復

登錄後才能評論