本文目錄一覽:
Golang-基於TimeingWheel定時器
在linux下實現定時器主要有如下方式
在這當中 基於時間輪方式實現的定時器 時間複雜度最小,效率最高,然而我們可以通過 優先隊列 實現時間輪定時器。
優先隊列的實現可以使用最大堆和最小堆,因此在隊列中所有的數據都可以定義排序規則自動排序。我們直接通過隊列中 pop 函數獲取數據,就是我們按照自定義排序規則想要的數據。
在 Golang 中實現一個優先隊列異常簡單,在 container/head 包中已經幫我們封裝了,實現的細節,我們只需要實現特定的介面就可以。
下面是官方提供的例子
因為優先隊列底層數據結構是由二叉樹構建的,所以我們可以通過數組來保存二叉樹上的每一個節點。
改數組需要實現 Go 預先定義的介面 Len , Less , Swap , Push , Pop 和 update 。
timerType結構是定時任務抽象結構
首先的 start 函數,當創建一個 TimeingWheel 時,通過一個 goroutine 來執行 start ,在start中for循環和select來監控不同的channel的狀態
通過for循環從隊列中取數據,直到該隊列為空或者是遇見第一個當前時間比任務開始時間大的任務, append 到 expired 中。因為優先隊列中是根據 expiration 來排序的,
所以當取到第一個定時任務未到的任務時,表示該定時任務以後的任務都未到時間。
當 getExpired 函數取出隊列中要執行的任務時,當有的定時任務需要不斷執行,所以就需要判斷是否該定時任務需要重新放回優先隊列中。 isRepeat 是通過判斷任務中 interval 是否大於 0 判斷,
如果大於0 則,表示永久就生效。
防止外部濫用,阻塞定時器協程,框架又一次封裝了timer這個包,名為 timer_wapper 這個包,它提供了兩種調用方式。
參數和上面的參數一樣,只是在第三個參數中使用了任務池,將定時任務放入了任務池中。定時任務的本身執行就是一個 put 操作。
至於put以後,那就是 workers 這個包管理的了。在 worker 包中, 也就是維護了一個任務池,任務池中的任務會有序的執行,方便管理。
C語言等待一定時間輸入自動結束?
準備好linux編程環境,現場手撕定時器實現【linux伺服器開發】
工程師的聖地—Linux內核, 談談內核的架構
c/c++ linux伺服器開發學習地址:C/C++Linux伺服器開發/後台架構師【零聲教育】-學習視頻教程-騰訊課堂
上圖是5個時間輪級聯的效果圖。中間的大輪是工作輪,只有在它上的任務才會被執行;其他輪上的任務時間到後遷移到下一級輪上,他們最終都會遷移到工作輪上而被調度執行。
多級時間輪的原理也容易理解:就拿時鐘做說明,秒針轉動一圈分針轉動一格;分針轉動一圈時針轉動一格;同理時間輪也是如此:當低級輪轉動一圈時,高一級輪轉動一格,同時會將高一級輪上的任務重新分配到低級輪上。從而實現了多級輪級聯的效果。
1.1 多級時間輪對象
多級時間輪應該至少包括以下內容:
每一級時間輪對象
輪子上指針的位置
關於輪子上指針的位置有一個比較巧妙的辦法:那就是位運算。比如定義一個無符號整型的數:
通過獲取當前的系統時間便可以通過位操作轉換為時間輪上的時間,通過與實際時間輪上的時間作比較,從而確定時間輪要前進調度的時間,進而操作對應時間輪槽位對應的任務。
為什麼至少需要這兩個成員呢?
定義多級時間輪,首先需要明確的便是級聯的層數,也就是說需要確定有幾個時間輪。
輪子上指針位置,就是當前時間輪運行到的位置,它與真實時間的差便是後續時間輪需要調度執行,它們的差值是時間輪運作起來的驅動力。
多級時間輪對象的定義
//實現5級時間輪 範圍為0~ (2^8 * 2^6 * 2^6 * 2^6 *2^6)=2^32struct tvec_base{ unsigned long current_index; pthread_t thincrejiffies; pthread_t threadID; struct tvec_root tv1; /*第一個輪*/ struct tvec tv2; /*第二個輪*/ struct tvec tv3; /*第三個輪*/ struct tvec tv4; /*第四個輪*/ struct tvec tv5; /*第五個輪*/};
1.2 時間輪對象
我們知道每一個輪子實際上都是一個哈希表,上面我們只是實例化了五個輪子的對象,但是五個輪子具體包含什麼,有幾個槽位等等沒有明確(即struct tvec和struct tvec_root)。
#define TVN_BITS 6#define TVR_BITS 8#define TVN_SIZE (1
此外,每一個時間輪都是哈希表,因此它的類型應該至少包含兩個指針域來實現雙向鏈表的功能。這裡我們為了方便使用通用的struct list_head的雙向鏈表結構。
1.3 定時任務對象
定時器的主要工作是為了在未來的特定時間完成某項任務,而這個任務經常包含以下內容:
任務的處理邏輯(回調函數)
任務的參數
雙向鏈表節點
到時時間
定時任務對象的定義
typedef void (*timeouthandle)(unsigned long ); struct timer_list{ struct list_head entry; //將時間連接成鏈表 unsigned long expires; //超時時間 void (*function)(unsigned long); //超時後的處理函數 unsigned long data; //處理函數的參數 struct tvec_base *base; //指向時間輪};
在時間輪上的效果圖:
【文章福利】需要C/C++ Linux伺服器架構師學習資料加群812855908(資料包括C/C++,Linux,golang技術,內核,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等)
1.4 雙向鏈表
在時間輪上我們採用雙向鏈表的數據類型。採用雙向鏈表的除了操作上比單鏈表複雜,多佔一個指針域外沒有其他不可接收的問題。而多佔一個指針域在今天大內存的時代明顯不是什麼問題。至於雙向鏈表操作的複雜性,我們可以通過使用通用的struct list結構來解決,因為雙向鏈表有眾多的標準操作函數,我們可以通過直接引用list.h頭文件來使用他們提供的介面。
struct list可以說是一個萬能的雙向鏈表操作框架,我們只需要在自定義的結構中定義一個struct list對象即可使用它的標準操作介面。同時它還提供了一個類似container_of的介面,在應用層一般叫做list_entry,因此我們可以很方便的通過struct list成員找到自定義的結構體的起始地址。
關於應用層的log.h, 我將在下面的代碼中附上該文件。如果需要內核層的實現,可以直接從linux源碼中獲取。
1.5 聯結方式
多級時間輪效果圖:
二. 多級時間輪C語言實現
2.1 雙向鏈表頭文件: list.h
提到雙向鏈表,很多的源碼工程中都會實現一系列的統一的雙向鏈表操作函數。它們為雙向鏈表封裝了統計的介面,使用者只需要在自定義的結構中添加一個struct list_head結構,然後調用它們提供的介面,便可以完成雙向鏈表的所有操作。這些操作一般都在list.h的頭文件中實現。Linux源碼中也有實現(內核態的實現)。他們實現的方式基本完全一樣,只是實現的介面數量和功能上稍有差別。可以說這個list.h文件是學習操作雙向鏈表的不二選擇,它幾乎實現了所有的操作:增、刪、改、查、遍歷、替換、清空等等。這裡我拼湊了一個源碼中的log.h函數,終於湊夠了多級時間輪中使用到的介面。
#if !defined(_BLKID_LIST_H) !defined(LIST_HEAD)#define _BLKID_LIST_H#ifdef __cplusplus extern “C” {#endif/* * Simple doubly linked list implementation. * * Some of the internal functions (“__xxx”) are useful when * manipulating whole lists rather than single entries, as * sometimes we already know the next/prev entries and we can * generate better code by using them directly rather than * using the generic single-entry routines. */struct list_head { struct list_head *next, *prev;};#define LIST_HEAD_INIT(name) { (name), (name) }#define LIST_HEAD(name) \ struct list_head name = LIST_HEAD_INIT(name)#define INIT_LIST_HEAD(ptr) do { \ (ptr)-next = (ptr); (ptr)-prev = (ptr); \} while (0)static inline void__list_add(struct list_head *entry, struct list_head *prev, struct list_head *next){ next-prev = entry; entry-next = next; entry-prev = prev; prev-next = entry;}/** * Insert a new element after the given list head. The new element does not * need to be initialised as empty list. * The list changes from: * head → some element → … * to * head → new element → older element → … * * Example: * struct foo *newfoo = malloc(…); * list_add(newfoo-entry, bar-list_of_foos); * * @param entry The new element to prepend to the list. * @param head The existing list. */static inline voidlist_add(struct list_head *entry, struct list_head *head){ __list_add(entry, head, head-next);}/** * Append a new element to the end of the list given with this list head. * * The list changes from: * head → some element → … → lastelement * to * head → some element → … → lastelement → new element * * Example: * struct foo *newfoo = malloc(…); * list_add_tail(newfoo-entry, bar-list_of_foos); * * @param entry The new element to prepend to the list. * @param head The existing list. */static inline voidlist_add_tail(struct list_head *entry, struct list_head *head){ __list_add(entry, head-prev, head);}static inline void__list_del(struct list_head *prev, struct list_head *next){ next-prev = prev; prev-next = next;}/** * Remove the element from the list it is in. Using this function will reset * the pointers to/from this element so it is removed from the list. It does * NOT free the element itself or manipulate it otherwise. * * Using list_del on a pure list head (like in the example at the top of * this file) will NOT remove the first element from * the list but rather reset the list as empty list. * * Example: * list_del(foo-entry); * * @param entry The element to remove. */static inline voidlist_del(struct list_head *entry){ __list_del(entry-prev, entry-next);}static inline voidlist_del_init(struct list_head *entry){ __list_del(entry-prev, entry-next); INIT_LIST_HEAD(entry);}static inline void list_move_tail(struct list_head *list, struct list_head *head){ __list_del(list-prev, list-next); list_add_tail(list, head);}/** * Check if the list is empty. * * Example: * list_empty(bar-list_of_foos); * * @return True if the list contains one or more elements or False otherwise. */static inline intlist_empty(struct list_head *head){ return head-next == head;}/** * list_replace – replace old entry by new one * @old : the element to be replaced * @new : the new element to insert * * If @old was empty, it will be overwritten. */static inline void list_replace(struct list_head *old, struct list_head *new){ new-next = old-next; new-next-prev = new; new-prev = old-prev; new-prev-next = new;}/** * Retrieve the first list entry for the given list pointer. * * Example: * struct foo *first; * first = list_first_entry(bar-list_of_foos, struct foo, list_of_foos); * * @param ptr The list head * @param type Data type of the list element to retrieve * @param member Member name of the struct list_head field in the list element. * @return A pointer to the first list element. */#define list_first_entry(ptr, type, member) \ list_entry((ptr)-next, type, member)static inline void list_replace_init(struct list_head *old, struct list_head *new){ list_replace(old, new); INIT_LIST_HEAD(old);}/** * list_entry – get the struct for this entry * @ptr: the struct list_head pointer. * @type: the type of the struct this is embedded in. * @member: the name of the list_struct within the struct. */#define list_entry(ptr, type, member) \ ((type *)((char *)(ptr)-(unsigned long)(((type *)0)-member)))/** * list_for_each – iterate over elements in a list * @pos: the struct list_head to use as a loop counter. * @head: the head for your list. */#define list_for_each(pos, head) \ for (pos = (head)-next; pos != (head); pos = pos-next)/** * list_for_each_safe – iterate over elements in a list, but don’t dereference * pos after the body is done (in case it is freed) * @pos: the struct list_head to use as a loop counter. * @pnext: the struct list_head to use as a pointer to the next item. * @head: the head for your list (not included in iteration). */#define list_for_each_safe(pos, pnext, head) \ for (pos = (head)-next, pnext = pos-next; pos != (head); \ pos = pnext, pnext = pos-next)#ifdef __cplusplus}#endif#endif /* _BLKID_LIST_H */
這裡面一般會用到一個重要實現:container_of, 它的原理這裡不敘述
2.2 調試信息頭文件: log.h
這個頭文件實際上不是必須的,我只是用它來添加調試信息(代碼中的errlog(), log()都是log.h中的宏函數)。它的效果是給列印的信息加上顏色,效果如下:
log.h的代碼如下:
#ifndef _LOG_h_#define _LOG_h_#include #define COL(x) “\033[;” #x “m”#define RED COL(31)#define GREEN COL(32)#define YELLOW COL(33)#define BLUE COL(34)#define MAGENTA COL(35)#define CYAN COL(36)#define WHITE COL(0)#define GRAY “\033[0m”#define errlog(fmt, arg…) do{ \ printf(RED”[#ERROR: Toeny Sun:”GRAY YELLOW” %s:%d]:”GRAY WHITE fmt GRAY, __func__, __LINE__, ##arg);\}while(0)#define log(fmt, arg…) do{ \ printf(WHITE”[#DEBUG: Toeny Sun: “GRAY YELLOW”%s:%d]:”GRAY WHITE fmt GRAY, __func__, __LINE__, ##arg);\}while(0)#endif
2.3 時間輪代碼: timewheel.c
/* *毫秒定時器 採用多級時間輪方式 借鑒linux內核中的實現 *支持的範圍為1 ~ 2^32 毫秒(大約有49天) *若設置的定時器超過最大值 則按最大值設置定時器 **/#include #include #include #include #include #include #include “list.h”#include “log.h” #define TVN_BITS 6#define TVR_BITS 8#define TVN_SIZE (1current_index (TVR_BITS + (N) * TVN_BITS)) TVN_MASK) typedef void (*timeouthandle)(unsigned long ); struct timer_list{ struct list_head entry; //將時間連接成鏈表 unsigned long expires; //超時時間 void (*function)(unsigned long); //超時後的處理函數 unsigned long data; //處理函數的參數 struct tvec_base *base; //指向時間輪}; struct tvec { struct list_head vec[TVN_SIZE];}; struct tvec_root{ struct list_head vec[TVR_SIZE];}; //實現5級時間輪 範圍為0~ (2^8 * 2^6 * 2^6 * 2^6 *2^6)=2^32struct tvec_base{ unsigned long current_index; pthread_t thincrejiffies; pthread_t threadID; struct tvec_root tv1; /*第一個輪*/ struct tvec tv2; /*第二個輪*/ struct tvec tv3; /*第三個輪*/ struct tvec tv4; /*第四個輪*/ struct tvec tv5; /*第五個輪*/}; static void internal_add_timer(struct tvec_base *base, struct timer_list *timer){ struct list_head *vec; unsigned long expires = timer-expires; unsigned long idx = expires – base-current_index;#if 1 if( (signed long)idx 0 ) /*這裡是沒有辦法區分出是過時還是超長定時的吧?*/ { vec = base-tv1.vec + (base-current_index TVR_MASK);/*放到第一個輪的當前槽*/ } else if ( idx TVR_SIZE ) /*第一個輪*/ { int i = expires TVR_MASK; vec = base-tv1.vec + i; } else if( idx 1 (TVR_BITS + TVN_BITS) )/*第二個輪*/ { int i = (expires TVR_BITS) TVN_MASK; vec = base-tv2.vec + i; } else if( idx 1 (TVR_BITS + 2 * TVN_BITS) )/*第三個輪*/ { int i = (expires (TVR_BITS + TVN_BITS)) TVN_MASK; vec = base-tv3.vec + i; } else if( idx 1 (TVR_BITS + 3 * TVN_BITS) )/*第四個輪*/ { int i = (expires (TVR_BITS + 2 * TVN_BITS)) TVN_MASK; vec = base-tv4.vec + i; } else /*第五個輪*/ { int i; if (idx 0xffffffffUL) { idx = 0xffffffffUL; expires = idx + base-current_index; } i = (expires (TVR_BITS + 3 * TVN_BITS)) TVN_MASK; vec = base-tv5.vec + i; }#else /*上面可以優化吧*/;#endif list_add_tail(timer-entry, vec);} static inline void detach_timer(struct timer_list *timer){ struct list_head *entry = timer-entry; __list_del(entry-prev, entry-next); entry-next = NULL; entry-prev = NULL;} static int __mod_timer(struct timer_list *timer, unsigned long expires){ if(NULL != timer-entry.next) detach_timer(timer); internal_add_timer(timer-base, timer); return 0;} //修改定時器的超時時間外部介面int mod_timer(void *ptimer, unsigned long expires){ struct timer_list *timer = (struct timer_list *)ptimer; struct tvec_base *base; base = timer-base; if(NULL == base) return -1; expires = expires + base-current_index; if(timer-entry.next != NULL timer-expires == expires) return 0; if( NULL == timer-function ) { errlog(“timer’s timeout function is null\n”); return -1; } timer-expires = expires; return __mod_timer(timer,expires);} //添加一個定時器static void __ti_add_timer(struct timer_list *timer){ if( NULL != timer-entry.next ) { errlog(“timer is already exist\n”); return; } mod_timer(timer, timer-expires); } /*添加一個定時器 外部介面 *返回定時器 */void* ti_add_timer(void *ptimewheel, unsigned long expires,timeouthandle phandle, unsigned long arg){ struct timer_list *ptimer; ptimer = (struct timer_list *)malloc( sizeof(struct timer_list) ); if(NULL == ptimer) return NULL; bzero( ptimer,sizeof(struct timer_list) ); ptimer-entry.next = NULL; ptimer-base = (struct tvec_base *)ptimewheel; ptimer-expires = expires; ptimer-function = phandle; ptimer-data = arg; __ti_add_timer(ptimer); return ptimer;} /* *刪除一個定時器 外部介面 * * */void ti_del_timer(void *p){ struct timer_list *ptimer =(struct timer_list*)p; if(NULL == ptimer) return; if(NULL != ptimer-entry.next) detach_timer(ptimer); free(ptimer);}/*時間輪級聯*/ static int cascade(struct tvec_base *base, struct tvec *tv, int index){ struct list_head *pos,*tmp; struct timer_list *timer; struct list_head tv_list; /*將tv[index]槽位上的所有任務轉移給tv_list,然後清空tv[index]*/ list_replace_init(tv-vec + index, tv_list);/*用tv_list替換tv-vec + index*/ list_for_each_safe(pos, tmp, tv_list)/*遍歷tv_list雙向鏈表,將任務重新添加到時間輪*/ { timer = list_entry(pos,struct timer_list,entry);/*struct timer_list中成員entry的地址是pos, 獲取struct timer_list的首地址*/ internal_add_timer(base, timer); } return index;} static void *deal_function_timeout(void *base){ struct timer_list *timer; int ret; struct timeval tv; struct tvec_base *ba = (struct tvec_base *)base; for(;;) { gettimeofday(tv, NULL); while( ba-current_index = (tv.tv_sec*1000 + tv.tv_usec/1000) )/*單位:ms*/ { struct list_head work_list; int index = ba-current_index TVR_MASK;/*獲取第一個輪上的指針位置*/ struct list_head *head = work_list; /*指針指向0槽時,級聯輪需要更新任務列表*/ if(!index (!cascade(ba, ba-tv2, INDEX(0))) ( !cascade(ba, ba-tv3, INDEX(1))) (!cascade(ba, ba-tv4, INDEX(2))) ) cascade(ba, ba-tv5, INDEX(3)); ba-current_index ++; list_replace_init(ba-tv1.vec + index, work_list); while(!list_empty(head)) { void (*fn)(unsigned long); unsigned long data; timer = list_first_entry(head, struct timer_list, entry); fn = timer-function; data = timer-data; detach_timer(timer); (*fn)(data); } } }} static void init_tvr_list(struct tvec_root * tvr){ int i; for( i = 0; ivec[i]);} static void init_tvn_list(struct tvec * tvn){ int i; for( i = 0; ivec[i]);} //創建時間輪 外部介面void *ti_timewheel_create(void ){ struct tvec_base *base; int ret = 0; struct timeval tv; base = (struct tvec_base *) malloc( sizeof(struct tvec_base) ); if( NULL==base ) return NULL; bzero( base,sizeof(struct tvec_base) ); init_tvr_list(base-tv1); init_tvn_list(base-tv2); init_tvn_list(base-tv3); init_tvn_list(base-tv4); init_tvn_list(base-tv5); gettimeofday(tv, NULL); base-current_index = tv.tv_sec*1000 + tv.tv_usec/1000;/*當前時間毫秒數*/ if( 0 != pthread_create(base-threadID,NULL,deal_function_timeout,base) ) { free(base); return NULL; } return base;} static void ti_release_tvr(struct tvec_root *pvr){ int i; struct list_head *pos,*tmp; struct timer_list *pen; for(i = 0; i TVR_SIZE; i++) { list_for_each_safe(pos,tmp,pvr-vec[i]) { pen = list_entry(pos,struct timer_list, entry); list_del(pos); free(pen); } }} static void ti_release_tvn(struct tvec *pvn){ int i; struct list_head *pos,*tmp; struct timer_list *pen; for(i = 0; i TVN_SIZE; i++) { list_for_each_safe(pos,tmp,pvn-vec[i]) { pen = list_entry(pos,struct timer_list, entry); list_del(pos); free(pen); } }} /* *釋放時間輪 外部介面 * */void ti_timewheel_release(void * pwheel){ struct tvec_base *base = (struct tvec_base *)pwheel; if(NULL == base) return; ti_release_tvr(base-tv1); ti_release_tvn(base-tv2); ti_release_tvn(base-tv3); ti_release_tvn(base-tv4); ti_release_tvn(base-tv5); free(pwheel);} /************demo****************/struct request_para{ void *timer; int val;}; void mytimer(unsigned long arg){ struct request_para *para = (struct request_para *)arg; log(“%d\n”,para-val); mod_timer(para-timer,3000); //進行再次啟動定時器 sleep(10);/*定時器依然被阻塞*/ //定時器資源的釋放是在這裡完成的 //ti_del_timer(para-timer);} int main(int argc,char *argv[]){ void *pwheel = NULL; void *timer = NULL; struct request_para *para; para = (struct request_para *)malloc( sizeof(struct request_para) ); if(NULL == para) return 0; bzero(para,sizeof(struct request_para)); //創建一個時間輪 pwheel = ti_timewheel_create(); if(NULL == pwheel) return -1; //添加一個定時器 para-val = 100; para-timer = ti_add_timer(pwheel, 3000, mytimer, (unsigned long)para); while(1) { sleep(2); } //釋放時間輪 ti_timewheel_release(pwheel); return 0;}
2.4 編譯運行
toney@ubantu:/mnt/hgfs/em嵌入式學習記錄/4. timerwheel/2. 多級時間輪$ lsa.out list.h log.h mutiTimeWheel.ctoney@ubantu:/mnt/hgfs/em嵌入式學習記錄/4. timerwheel/2. 多級時間輪$ gcc mutiTimeWheel.c -lpthreadtoney@ubantu:/mnt/hgfs/em嵌入式學習記錄/4. timerwheel/2. 多級時間輪$ ./a.out [#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100[#DEBUG: Toeny Sun: mytimer:370]:100
從結果可以看出:如果添加的定時任務是比較耗時的操作,那麼後續的任務也會被阻塞,可能一直到超時,甚至一直阻塞下去,這個取決於當前任務是否耗時。這個理論上是絕不能接受的:一個任務不應該也不能去影響其他的任務吧。但是目前沒有對此問題進行改進和完善,以後有機會再繼續完善吧。
【golang詳解】go語言GMP(GPM)原理和調度
Goroutine調度是一個很複雜的機制,下面嘗試用簡單的語言描述一下Goroutine調度機制,想要對其有更深入的了解可以去研讀一下源碼。
首先介紹一下GMP什麼意思:
G ———– goroutine: 即Go協程,每個go關鍵字都會創建一個協程。
M ———- thread內核級線程,所有的G都要放在M上才能運行。
P ———– processor處理器,調度G到M上,其維護了一個隊列,存儲了所有需要它來調度的G。
Goroutine 調度器P和 OS 調度器是通過 M 結合起來的,每個 M 都代表了 1 個內核線程,OS 調度器負責把內核線程分配到 CPU 的核上執行
模型圖:
避免頻繁的創建、銷毀線程,而是對線程的復用。
1)work stealing機制
當本線程無可運行的G時,嘗試從其他線程綁定的P偷取G,而不是銷毀線程。
2)hand off機制
當本線程M0因為G0進行系統調用阻塞時,線程釋放綁定的P,把P轉移給其他空閑的線程執行。進而某個空閑的M1獲取P,繼續執行P隊列中剩下的G。而M0由於陷入系統調用而進被阻塞,M1接替M0的工作,只要P不空閑,就可以保證充分利用CPU。M1的來源有可能是M的緩存池,也可能是新建的。當G0系統調用結束後,根據M0是否能獲取到P,將會將G0做不同的處理:
如果有空閑的P,則獲取一個P,繼續執行G0。
如果沒有空閑的P,則將G0放入全局隊列,等待被其他的P調度。然後M0將進入緩存池睡眠。
如下圖
GOMAXPROCS設置P的數量,最多有GOMAXPROCS個線程分布在多個CPU上同時運行
在Go中一個goroutine最多佔用CPU 10ms,防止其他goroutine被餓死。
具體可以去看另一篇文章
【Golang詳解】go語言調度機制 搶佔式調度
當創建一個新的G之後優先加入本地隊列,如果本地隊列滿了,會將本地隊列的G移動到全局隊列裡面,當M執行work stealing從其他P偷不到G時,它可以從全局G隊列獲取G。
協程經歷過程
我們創建一個協程 go func()經歷過程如下圖:
說明:
這裡有兩個存儲G的隊列,一個是局部調度器P的本地隊列、一個是全局G隊列。新創建的G會先保存在P的本地隊列中,如果P的本地隊列已經滿了就會保存在全局的隊列中;處理器本地隊列是一個使用數組構成的環形鏈表,它最多可以存儲 256 個待執行任務。
G只能運行在M中,一個M必須持有一個P,M與P是1:1的關係。M會從P的本地隊列彈出一個可執行狀態的G來執行,如果P的本地隊列為空,就會想其他的MP組合偷取一個可執行的G來執行;
一個M調度G執行的過程是一個循環機制;會一直從本地隊列或全局隊列中獲取G
上面說到P的個數默認等於CPU核數,每個M必須持有一個P才可以執行G,一般情況下M的個數會略大於P的個數,這多出來的M將會在G產生系統調用時發揮作用。類似線程池,Go也提供一個M的池子,需要時從池子中獲取,用完放回池子,不夠用時就再創建一個。
work-stealing調度演算法:當M執行完了當前P的本地隊列隊列里的所有G後,P也不會就這麼在那躺屍啥都不幹,它會先嘗試從全局隊列隊列尋找G來執行,如果全局隊列為空,它會隨機挑選另外一個P,從它的隊列里中拿走一半的G到自己的隊列中執行。
如果一切正常,調度器會以上述的那種方式順暢地運行,但這個世界沒這麼美好,總有意外發生,以下分析goroutine在兩種例外情況下的行為。
Go runtime會在下面的goroutine被阻塞的情況下運行另外一個goroutine:
用戶態阻塞/喚醒
當goroutine因為channel操作或者network I/O而阻塞時(實際上golang已經用netpoller實現了goroutine網路I/O阻塞不會導致M被阻塞,僅阻塞G,這裡僅僅是舉個栗子),對應的G會被放置到某個wait隊列(如channel的waitq),該G的狀態由_Gruning變為_Gwaitting,而M會跳過該G嘗試獲取並執行下一個G,如果此時沒有可運行的G供M運行,那麼M將解綁P,並進入sleep狀態;當阻塞的G被另一端的G2喚醒時(比如channel的可讀/寫通知),G被標記為,嘗試加入G2所在P的runnext(runnext是線程下一個需要執行的 Goroutine。), 然後再是P的本地隊列和全局隊列。
系統調用阻塞
當M執行某一個G時候如果發生了阻塞操作,M會阻塞,如果當前有一些G在執行,調度器會把這個線程M從P中摘除,然後再創建一個新的操作系統的線程(如果有空閑的線程可用就復用空閑線程)來服務於這個P。當M系統調用結束時候,這個G會嘗試獲取一個空閑的P執行,並放入到這個P的本地隊列。如果獲取不到P,那麼這個線程M變成休眠狀態, 加入到空閑線程中,然後這個G會被放入全局隊列中。
隊列輪轉
可見每個P維護著一個包含G的隊列,不考慮G進入系統調用或IO操作的情況下,P周期性的將G調度到M中執行,執行一小段時間,將上下文保存下來,然後將G放到隊列尾部,然後從隊列中重新取出一個G進行調度。
除了每個P維護的G隊列以外,還有一個全局的隊列,每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中G的來源,主要有從系統調用中恢復的G。之所以P會周期性地查看全局隊列,也是為了防止全局隊列中的G被餓死。
除了每個P維護的G隊列以外,還有一個全局的隊列,每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中G的來源,主要有從系統調用中恢復的G。之所以P會周期性地查看全局隊列,也是為了防止全局隊列中的G被餓死。
M0
M0是啟動程序後的編號為0的主線程,這個M對應的實例會在全局變數rutime.m0中,不需要在heap上分配,M0負責執行初始化操作和啟動第一個G,在之後M0就和其他的M一樣了
G0
G0是每次啟動一個M都會第一個創建的goroutine,G0僅用於負責調度G,G0不指向任何可執行的函數,每個M都會有一個自己的G0,在調度或系統調用時會使用G0的棧空間,全局變數的G0是M0的G0
一個G由於調度被中斷,此後如何恢復?
中斷的時候將寄存器里的棧信息,保存到自己的G對象裡面。當再次輪到自己執行時,將自己保存的棧信息複製到寄存器裡面,這樣就接著上次之後運行了。
我這裡只是根據自己的理解進行了簡單的介紹,想要詳細了解有關GMP的底層原理可以去看Go調度器 G-P-M 模型的設計者的文檔或直接看源碼
參考: ()
()
golang協程調度模式解密
golang學習筆記
頻繁創建線程會造成不必要的開銷,所以才有了線程池。在線程池中預先保存一定數量的線程,新任務發布到任務隊列,線程池中的線程不斷地從任務隊列中取出任務並執行,可以有效的減少創建和銷毀帶來的開銷。
過多的線程會導致爭搶cpu資源,且上下文的切換的開銷變大。而工作在用戶態的協程能大大減少上下文切換的開銷。協程調度器把可運行的協程逐個調度到線程中執行,同時即時把阻塞的協程調度出協程,從而有效地避免了線程的頻繁切換,達到了少量線程實現高並發的效果。
多個協程分享操作系統分給線程的時間片,從而達到充分利用CPU的目的,協程調度器決定了則決定了協程運行的順序。每個線程同一時刻只能運行一個協程。
go調度模型包含三個實體:
每個處理器維護者一個協程G的隊列,處理器依次將協程G調度到M中執行。
每個P會周期性地查看全局隊列中是否有G待運行並將其調度到M中執行,全局隊列中的G主要來自系統調用中恢復的G.
如果協程發起系統調用,則整個工作線程M被阻塞,協程隊列中的其他協程都會阻塞。
一般情況下M的個數會略大於P個數,多出來的M將會在G產生系統調用時發揮作用。與線程池類似,Go也提供M池子。當協程G1發起系統掉用時,M1會釋放P,由 M1-P-G1 G2 … 轉變成 M1-G1 , M2會接管P的其他協程 M2-P-G2 G3 G4… 。
冗餘的M可能來源於緩存池,也可能是新建的。
當G1結束系統調用後,根據M1是否獲取到P,進行不用的處理。
多個處理P維護隊列可能不均衡,導致部分處理器非常繁忙,而其餘相對空閑。產生原因是有些協程自身不斷地派生協程。
為此Go調度器提供了工作量竊取策略,當某個處理器P沒有需要調度的協程時,將從其他處理中偷取協程,每次偷取一半。
搶佔式調度,是指避免某個協程長時間執行,而阻礙其他協程被調度的機制。
調度器監控每個協程執行時間,一旦執行時間過長且有其他協程等待,會把協程暫停,轉而調度等待的協程,以達到類似時間片輪轉的效果。比如for循環會一直佔用執行權。
在IO密集型應用,GOMAXPROCS大小設置大一些,獲取性能會更好。
IO密集型會經常發生系統調用,會有一個新的M啟用或創建,但由於Go調度器檢測M到被阻塞有一定延遲。如果P數量多,則P管理協程隊列會變小。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/198097.html