什麼是Kafka
Kafka基於Scala和Java語言開發,設計中大量使用了批量處理和異步的思想,最高可以每秒處理百萬級別的消息,是用於構建實時數據管道和流的應用程序。

Kafka的應用場景
Kafka是一個分佈式流式處理平台。流平台具有三個關鍵功能:
- 消息隊列:發佈和訂閱消息流,這個功能類似於消息隊列,這也是Kafka被歸類為消息隊列的原因。
- 容錯的持久方式存儲記錄消息流:Kafka會把消息持久化到磁盤,有效避免消息丟失的風險。
- 流式處理平台:在消息發佈的時候進行處理,Kafka提供了一個完整的流式處理類庫。
Kafka主要有兩大應用場景:
- 消息隊列:建立實時流數據管道,可靠地在系統或應用程序之間獲取數據。
- 數據處理:構建實時的流數據處理程序來轉換或處理數據流。

註: Kafka在2.8預覽版中,採用Raft元數據模式,取消了對Zookeeper的依賴。
Kafka的版本里程碑
| 版本號 | 備註 |
| 0.8 | 引入了副本機制,成為了一個真正意義上完備的分佈式高可靠消息隊列解決方案 |
| 0.8.2 | 新版本 Producer API,即需要指定 Broker 地址的 Producer |
| 0.9 | 增加了基礎的安全認證 / 權限,Java 重寫了新版本消費者 API |
| 0.10 | 引入了 Kafka Streams |
| 0.11 | 提供冪等性 Producer API 以及事務(Transaction) API,對 Kafka 消息格式做了重構。 |
| 1.0 | Kafka Streams 的各種改進 |
| 2.0 | Kafka Streams 的各種改進 |
Kafka的優勢
- 高吞吐、低延時:這是 Kafka 顯著的特點,Kafka 能夠達到百萬級的消息吞吐量,延遲可達毫秒級。
- 持久化存儲:Kafka 的消息最終持久化保存在磁盤之上,提供了順序讀寫以保證性能,並且通過 Kafka 的副本機制提高了數據可靠性。
- 分佈式可擴展:Kafka的數據是分佈式存儲在不同broker節點的,以topic組織數據並且按Partition進行分佈式存儲,整體的擴展性都非常好。
- 高容錯性:集群中任意一個 broker 節點宕機,Kafka 仍能對外提供服務。
Kafka基本結構
Kafka具有四個核心API:
- Producer API: 發佈消息到1個或多個topic(主題)中。
- Consumer API:來訂閱一個或多個topic,並處理產生的消息。
- Streams API:充當一個流處理器,從1個或多個topic消費輸入流,並生產一個輸出流到1個或多個輸出topic,有效地將輸入流轉換到輸出流。
- Connector API:可構建或運行可重用的生產者或消費者,將topic連接到現有的應用程序或數據系統。例如,連接到關係數據庫的連接器可以捕獲表的每個變更。

Kafka的關鍵術語
- Producer:消息和數據的生產者,向Kafka的一個Topic發佈消息的進程/代碼/服務。
- Consumer:消息和數據的消費者,訂閱數據(Topic)並且處理發佈的消息的進程/代碼/服務。
- Consumer Group:對於同一個Topic,會廣播給不同的Group。在一個Group中,一條消息只能被消費組中一個的Consumer消費。
Consumer Group中不能有比Partition數量更多的消費者,否則多出的消費者一直處於空等待,不會收到消息。

- Topic:每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為Topic。作用是對數據進行區分、隔離。
- Broker:Kafka集群中的每個Kafka節點。保存Topic的一個或多個Partition。
- Partition:物理概念,Kafka下數據儲存的基本單元。一個Topic數據,會被分散存儲到多個Partition,每一個Partition都是一個順序的、不可變的消息隊列,並且可以持續的添加消息。

註:
每一個Topic的信息被切分為多個Partitions。若Partition數量設置成1個,則可以保證消息消費的順序性。如果某Topic有N個Partition,集群有N個Broker,那麼每個Broker存儲該topic的一個Partition。如果某Topic有N個Partition,集群有(N+M)個Broker,那麼其中有N個Broker存儲該Topic的一個Partition,剩下的M個Broker不存儲該Topic的Partition數據。如果某Topic有N個Partition,集群中Broker數目少於N個,那麼一個Broker存儲該Topic的一個或多個Partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。當Broker收到消息,根據分區算法選擇將其存儲到哪一個 Partition。其路由機製為優先按照指定Partition來路由;若未指定patition但指定key,則通過對key的value進行hash選出一個patition;如果patition和key都未指定,則輪詢選出一個patition。
- Offset:偏移量,分區中的消息位置,由Kafka自身維護,Consumer消費時也要保存一份Offset以維護消費過的消息位置。
- Replication:同一個Partition可能會有多個副本,多個副本之間數據是一樣的,增加容錯性與可擴展性。
註:
當集群中的有Broker掛掉的情況,系統可以主動的使用Replication提供服務。系統默認設置每一個Topic的Replication係數為1,可以在創建Topic時單獨設置。Replication的基本單位是Topic的Partition。所有的讀和寫都由Leader進,Followers只是做為數據的備份。Follower必須能夠及時複製Leader的數據。
- Replication Leader:一個Partition的多個副本上,需要一個Leader負責該Partition上與Producer和Consumer交互。一個Partition只對應一個Replication Leader。
- Replication Follower:Follower跟隨Leader,所有寫請求都會廣播給所有Follower,Follower與Leader保持數據同步。
- ReplicaManager:負責管理當前Broker所有分區和副本的信息,處理KafkaController發起的一些請求,副本狀態的切換、添加/讀取消息等。
- Rebalance。消費者組內某個消費者實例掛掉後,其他消費者實例自動重新分配訂閱主題分區的過程。Rebalance是Kafka消費者端實現高可用的重要手段。

Kafka通過Zookeeper管理集群配置,選舉Leader,以及在Consumer Group發生變化時進行Rebalance。
Kafka的複製機制
如何將所有Replication均勻分佈到整個集群
為了更好的做負載均衡,Kafka盡量將所有的Partition均勻分配到整個集群上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時為了提高Kafka的容錯能力,也需要將同一個Partition的Replication盡量分散到不同的機器。如果所有的Replication都在同一個Broker上,那一旦該Broker宕機,該Partition的所有Replication都無法工作,也就達不到HA的效果。同時,如果某個Broker宕機了,需要保證它上面的負載可以被均勻的分配到其它倖存的所有Broker上。
Kafka分配Replication的算法如下:
- 將所有Broker(假設共n個Broker)和待分配的Partition排序。
- 將第i個Partition分配到第(i % n)個Broker上。
- 將第i個Partition的第j個Replication分配到第((i + j) % n)個Broker上。
HW高水位與LEO
HW是High Watermark的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(Offset),消費者只能拉取到這個Offset之前的消息。

如圖所示,它代表一個日誌文件,這個日誌文件中有 9 條消息,第一條消息的Offset(LogStartOffset)為0,最後一條消息的Offset為8,Offset為9的消息用虛線框表示,代表下一條待寫入的消息。日誌文件的HW為6,表示消費者只能拉取到Offset在0至5之間的消息,而Offset為6的消息對消費者而言是不可見的。
LEO是Log End Offset的縮寫,它標識當前日誌文件中下一條待寫入消息的Offset,圖中Offset為9的位置即為當前日誌文件的LEO,LEO的大小相當於當前日誌分區中最後一條消息的Offset值加1。分區ISR集合中的每個副本都會維護自身的LEO,而ISR集合中最小的LEO即為分區的HW,對消費者而言只能消費HW之前的消息。
ISR副本集合
ISR全稱是「In-Sync Replicas」,是分區中正在與Leader副本進行同步的Replication列表。正常情況下ISR必定包含Leader副本。
ISR列表是持久化在Zookeeper中的,任何在ISR列表中的副本都有資格參與Leader選舉。
ISR列表是動態變化的,副本被包含在ISR列表中的條件是由參數replica.lag.time.max.ms控制的,參數含義是副本同步落後於Leader的最大時間間隔,默認10s,意思就是如果說某個Follower所在的Broker因為JVM FullGC之類的問題,卡頓相對Leader延時超過10s,就會被從 ISR 中排除。Kafka之所以這樣設計,主要是為了減少消息丟失,只有與Leader副本進行實時同步的Follower副本才有資格參與Leader選舉,這裡指相對實時。

註:
分區中的所有副本統稱為AR(Assigned Replicas)。ISR集合是AR集合中的一個子集。與Leader副本同步滯後過多的副本(不包括Leader副本)組成OSR(Out-of-Sync Replicas)
複製機制
如圖所示,假設某個分區的ISR集合中有3個副本,即一個Leader副本和2個Follower副本,此時分區的LEO和HW都為3。消息3和消息4從生產者發出之後會被先存入Leader副本。




在消息寫入Leader副本之後,Follower副本會發送拉取請求來拉取消息3和消息4以進行消息同步。
在同步過程中,不同的Follower副本的同步效率也不盡相同。在某一時刻Follower1完全跟上了Leader副本而Follower2隻同步了消息3,如此Leader副本的LEO為5,Follower1的LEO為5,Follower2的LEO為4,那麼當前分區的HW取最小值4,此時消費者可以消費到offset為0至3之間的消息。
當所有的副本都成功寫入了消息3和消息4,整個分區的HW和LEO都變為5,因此消費者可以消費到offset為4的消息了。
關於讀寫分離
Kafka並不支持讀寫分區,生產消費端所有的讀寫請求都是由Replication Leader副本處理的,Replication Follower副本的主要工作就是從Leader副本處異步拉取消息,進行消息數據的同步,並不對外提供讀寫服務。
Kafka之所以這樣設計,主要是為了保證讀寫一致性,因為副本同步是一個異步的過程,如果當Follower副本還沒完全和Leader同步時,從Follower副本讀取數據可能會讀不到最新的消息。
Kafka的消息發送機制
Producer採用push模式將消息發佈到Broker,每條消息都被append到patition中,屬於順序寫磁盤(順序寫磁盤效率比隨機寫內存要高,保障kafka吞吐率)。
Producer寫入消息序列圖如下所示:

流程說明:
- Producer先從Zookeeper的”/brokers/…/state”節點找到該Partition的Leader。
- Producer將消息發送給該Leader。
- Leader將消息寫入本地log。
- followers從Leader pull消息,寫入本地log後Leader發送ACK。
- Leader收到所有ISR中的replica的ACK後,增加HW並向Producer發送ACK。
Broker保存消息
每個patition物理上對應一個文件夾(該文件夾存儲該patition的所有消息和索引文件)
無論消息是否被消費,Kafka都會保留所有消息。有兩種策略可以刪除舊數據:
基於時間:log.retention.hours=168基於大小:log.retention.bytes=1073741824
Consumer消費消息
Kafka集群保持所有的消息,直到它們過期(無論消息是否被消費)。實際上消費者所持有的僅有的元數據就是這個offset(偏移量),也就是說offset由消費者來控制:正常情況當消費者消費消息的時候,偏移量也線性的的增加。但是實際偏移量由消費者控制,消費者可以將偏移量重置為更早的位置,重新讀取消息。可以看到這種設計對消費者來說操作自如,一個消費者的操作不會影響其它消費者對此log的處理。

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/281205.html
微信掃一掃
支付寶掃一掃