Redis 6引入多線程IO,下面我們來和 Netty 的多線程模型進行對比
分析思路:
- 初始化線程?
- 如何分配client給thread?
- 如何處理讀寫事件,在什麼線程處理?
- 如何處理命令的邏輯,在什麼線程處理?
Netty的多線程模型

用戶代碼
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//..}});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
複製代碼
初始化線程( ServerBootsrap.bind())
netty初始化線程,創建一個 boss線程池,一個work線程池,並且給new了一個channel處理註冊線程的連接,並且為這個channel 添加了一個 ServerBootstrapAcceptor的channel。
- 操作線程: 主線程執行
- 執行時機: 初始化線程
- 執行代碼: ServerBootsrap.bind()
如何分配 client給thread?
- 操作線程 : 主線程執行
- 執行時機 : 新連接接入
新連接的建立可以分為三個步驟 1.檢測到有新的連接 2.將新連接註冊到work線程組 3.註冊新連接的讀事件
BOSS線程組的 NioEventLoop.run() 不斷檢查所有的管道,當管道狀態為可讀或者連接的時候就會讀取管道。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
複製代碼
然後就按 Channel 的責任鏈傳遞下去
- unsafe.read()
- —->
- pipeline.fireChannelRead(byteBuf);
- —->
- ServerBootstrapAcceptor.channelRead()
- —->
- MultithreadEventLoopGroup.register(child) 分配一個線程給這個channel,一個線程可能擁有多個channel
如何匹配線程
DefaultEventExecutorChooserFactory.java
用線程個數取余來分配
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 重點!!! 這個線程就永遠被掛靠在channel上面了
AbstractChannel.this.eventLoop = eventLoop;
// 監聽讀事件 NIO底層的註冊
register0(promise);
}
}
複製代碼
如何處理讀寫事件,在什麼線程處理?
ChannelInboundHandler.channelRead
如何處理命令的邏輯,在什麼線程處理?
ChannelInboundHandler.channelRead
總結:Netty通過開始註冊一個Boss線程池(通常情況都是一個),來監聽(NioEventLoop,run)連接的channel,如果有channel來進行連接,就通過責任鏈找到
ServerBootstrapAcceptor.channelRead() 分配給channel一個線程(NioEventLoop),這個線程(NioEventLoop)就會通過run()去不斷的 去讀Channel裡面的,處理命令。
Redis的多線程模型

初始化線程( initThreadedIO() 函數)
- 操作線程 :主線程執行
- 執行時機 :初始化線程
首先,如果用戶沒有開啟多線程IO,也就是 io_thread_num ==1,按照單線程處理; 如果超過線程數IO_THREADS_MAX_NUM上限則異常退出。
創建io_threads_num個線程(listCreate),並且對除主線程(id==0)以外的線程進行處理: (listCreate 創建一個線程)
- 初始化線程的等待任務為0
- 獲取鎖,使得線程不能進行操作
- 將線程tid與Redis中的線程id進行映射
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
// 如果用戶沒有開啟多線程IO直接返回 使用主線程處理
if (server.io_threads_num == 1) return;
// 線程數設置超過上限
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
// 初始化io_threads_num個對應線程
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; // Index 0為主線程,跳過
/* Things we do only for the additional threads. */
// 非主線程則需要以下處理
pthread_t tid;
// 為線程初始化生成對應的鎖
pthread_mutex_init(&io_threads_mutex[i],NULL);
// 線程等待狀態初始化為0
io_threads_pending[i] = 0;
// 初始化後將線程鎖住
pthread_mutex_lock(&io_threads_mutex[i]);
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
// 將index和對應線程ID加以映射
io_threads[i] = tid;
}
}
複製代碼
讀事件到來(readQueryFromClient)
- 操作線程 :主線程執行
- 機制時機 :讀事件到來
Redis需要判斷是否滿足 Thread IO 條件,執行 postponeClientRead,執行後會將 Client放到等待讀取的隊列,並將Client設置為等待讀取的狀態。
// 讀取到一個客戶端的請求
int postponeClientRead(client *c) {
if (io_threads_active && // 線程是否在不斷(spining)等待IO
server.io_threads_do_reads && // 是否多線程IO讀取
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{//client不能是主從,且未處於等待讀取的狀態
// 將Client設置為等待讀取的狀態Flag
c->flags |= CLIENT_PENDING_READ;
// 把client加入到等待讀列表
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
複製代碼
這時server維護了一個 clients_pending_read,包含所有的讀事件 pending的客戶端列表。
如何分配client給thread(線程) (handleClientsWithPendingReadsUsingThreads)
- 操作線程 :主線程執行
- 執行時機 :執行處理事件之後
首先,Redis檢查有等待的讀Client listLength(
server.clients_pending_read)
如果是長度不為0,進行while循環,將每個等待的client分配給線程,當等待長度超過線程時, 每個線程分配給到的client可能超過1個;
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 在線程組取余
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
並且修改每個線程需要完成的數量(初始化為0):
// 所有線程
for (int j = 1; j < server.io_threads_num; j++) {
// 拿出當前線程需要處理多少個客戶端
int count = listLength(io_threads_list[j]);
// 設置當前線程需要多少客戶端
io_threads_pending[j] = count;
}
等待處理直到沒有剩餘任務:
while(1) {
unsigned long pending = 0;
// 拿出所有線程,查看線程是否還有需要的客戶端
// 這裡主要是監聽子線程是否完全處理好任務
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
當本次IO的所有(讀/寫)線程處理完畢之後,清空client_pending_read:
主線程會在這裡處理命令
listRewind(server.clients_pending_read,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~ CLIENT_PENDING_COMMAND;
processCommandAndResetClient(c);
}
processInputBufferAndReplicate(c);
}
listEmpty(server.clients_pending_read);
複製代碼
如何處理讀請求(IOThreadMain)
- 操作線程 :子線程
- 執行時機 : 子線程啟動時 while執行
在上面過程中,當任務分發完畢後,每個線程按照正常流程將自己負責的Client的讀取緩衝區的內容進行處理,和原來的單線程沒有太大差異。
Redis為每個客戶端分配了輸入緩衝區,它的作用是將客戶端發送的命令臨時保存,同時Redis從會輸入緩衝區拉取命令並執行,輸入緩衝區為客戶端發送命令到Redis執行命令提供了緩衝功能。
Redis的 Thread IO 模型中,每次所有的線程都只能進行或者 寫/讀 操作,通過 io_threads_op控制。 同時每個線程負責的client一次執行:
// io thread主函數,在各個子線程執行
void *IOThreadMain(void *myid) {
// 線程 ID,跟普通線程池的操作方式一樣,都是通過 線程ID 進行操作
long id = (unsigned long)myid;
while(1) {
/*
*這裡的等待操作比較特殊,沒有使用簡單的 sleep,
*避免了 sleep 時間設置不當可能導致糟糕的性能,
*但是也有個問題就是頻繁 loop 可能一定程度上造成 cpu 佔用較長
*/
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
// 根據線程 id 以及待分配列表進行 任務分配
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
// 有可能分配了兩個客戶端連接
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
// 當前全局處於寫事件時,向輸出緩衝區寫入響應內容
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 當前全局處於讀事件時,從輸入緩衝區讀取請求內容
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Donen", id);
}
複製代碼
readQeuryFromClient()->processInputBuffer(c)->processCommand() 進行command的分發和處理。
這裡的readQueueFromClient只做寫入客戶端的輸入緩存區:
// 複製到 Client 緩存區
else if (c->flags & CLIENT_MASTER) {
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
// 如果我們在 IO線程(子線程)的時候
// 我們不能直接執行命令,flags設置為CLIENT_PENDING_COMMAND
// 然後讓主線程執行
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
}
}
複製代碼
每個線程執行readQueryFromClient,將對應的請求放入一個隊列,單線程執行(從輸入緩存區讀取內容),線程將結果寫入客戶端的buff中。
每輪處理中,需要將各個線程的鎖開啟,並且將相關標誌位:
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---n");
serverAssert(io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
// 解開線程的鎖定狀態
pthread_mutex_unlock(&io_threads_mutex[j]);
// 現在可以開始多線程IO執行對應讀/寫任務
io_threads_active = 1;
}
複製代碼
結束時,首先檢查是否有是否有待讀的IO,如果沒有,將線程說的,標誌關閉:
void stopThreadedIO(void) {
// 需要停止的時候可能還有等待讀的Client 在停止前進行處理
handleClientsWithPendingReadsUsingThreads();
if (tio_debug) { printf("E"); fflush(stdout); }
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
// 本輪IO結束 將所有線程上鎖
pthread_mutex_lock(&io_threads_mutex[j]);
// IO狀態設置為關閉
io_threads_active = 0;
}
複製代碼
總結:Threaded IO將服務讀Client的輸入緩存區和將執行結果寫入輸出緩衝區的過程改為了多線程模型, 同時保持同一時間全部線程均處於讀或寫的狀態。但是命令的具體執行以單線程(隊列)的形式。 因為Redis希望保持堅定結果避免鎖和競爭問題,並且讀寫緩存佔用命令執行聲明周期的比重比大 ,處理這部分IO模型給性能帶來來顯著的提升。
Netty 和 Redis6 區別:
如何分配client給thread?
netty:當Boss監聽到連接事件,netty會給一個channel分配一個線程。這個線程專門負責這條channel的讀寫事件,可以是解析也可以是執行命令
redis6:每當接收到一個讀事件,Client放到等待讀取的隊列。在執行處理事件之後,主線程會 統一給線程池的線程分配client。線程把client要讀buffer都放到client的緩存。主線程等待所有 io線程執行完畢,主線程再執行client的緩存變成命令
如何處理讀寫事件,在什麼線程處理?
netty:在子線程執行,讀寫完直接執行邏輯 redis6:在子線程執行,讀寫完放在client的緩衝區
如何處理命令的邏輯,在什麼線程處理?
netty:在子線程執行,直接執行邏輯 redis6:在主線程執行,主線程遍歷等待隊列讀取緩衝區編譯成命令再執行
為什麼redis選擇使用這個模式?

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/234199.html