Apache Kafka 原理
分區副本機制
Kafka 有三層結構:Kafka 有多個主題,每個主題有多個分區,每個分區又有多條消息。
分區機制:主要解決了單台服務器存儲容量有限和單台服務器並發數限制的問題。一個分片的不同副本不能放到同一個 broker 上。
當主題數據量非常大的時候,一個服務器存放不了,就將數據分成兩個或者多個部分,存放在多台服務器上。每個服務器上的數據,叫做一個分片。
分區對於 Kafka 集群的好處是:實現負載均衡,高存儲能力、高伸縮性。分區對於消費者來說,可以提高並發度,提高效率。
副本:副本備份機制解決了數據存儲的高可用問題。
當數據只保存一份的時候,有丟失的風險。為了更好的容錯和容災,將數據拷貝幾份,保存到不同的機器上。
多個 follower 副本通常存放在和 leader 副本不同的 broker 中。通過這樣的機制實現了高可用,當某台機器掛掉後,其他 follower 副本也能迅速」轉正「,開始對外提供服務。
Kafka 副本的作用:在 kafka 中,實現副本的目的就是冗餘備份,且僅僅是冗餘備份,所有的讀寫請求都是由 leader 副本進行處理的。Follower 副本僅有一個功能,那就是從 leader 副本拉取消息,盡量讓自己跟 leader 副本的內容一致。
Follower 副本不對外提供服務,這樣可以防止出現一些類似於數據庫事務的幻讀臟讀的問題。為了提高一些性能而導致出現數據不一致問題,顯然是不值得的。
Kafka 保證數據不丟失機制
從 Kafka 的大體角度上可以分為數據生產者,Kafka 集群,還有就是消費者,而要保證數據的不丟失也要從這三個角度去考慮。
消息生產者
消息生產者保證數據不丟失 – 消息確認機制(ACK 機制),參考值有三個:0,1,-1。
// producer 無需等待來自 broker 的確認而繼續發送下一批消息。
// 這種情況下數據傳輸效率最高,但是數據可靠性確是最低的。
properties.put(ProducerConfig.ACKS_CONFIG,"0");
// producer 只要收到一個分區副本成功寫入的通知就認為推送消息成功了。
// 這裡有一個地方需要注意,這個副本必須是 leader 副本。
// 只有 leader 副本成功寫入了,producer 才會認為消息發送成功。
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// ack=-1,簡單來說就是,producer 只有收到分區內所有副本的成功寫入的通知才認為推送消息成功了。
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
消息消費者
Kafka 消費消息的模型:
消息隊列:0 1 2 3 4 5 6 7 8 9 10 11 12
Producers wirtes
Consumer A (offset=9) reads
Consumer B (offset=11) reads
消費者丟失數據:由於 Kafka consumer 默認是自動提交位移的(先更新位移,再消費消息),如果消費程序出現故障,沒消費完畢,則丟失了消息,此時,broker 並不知道。
解決方案:
enable.auto.commit=false 關閉自動提交位移。
在消息被完整處理之後再手動提交位移。
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
消息存儲及查詢機制
Kafka 使用日誌文件的方式來保存生產者消息,每條消息都有一個 offset 值來表示它在分區中的偏移量。
Kafka 中存儲的一般都是海量的消息數據,為了避免日誌文件過大,一個分片並不是直接對應在一個磁盤上的日誌文件,而是對應磁盤上的一個目錄,這個目錄的命名規則是 <topic_name>_<partition_id>。
Kafka 容器數據目錄:/kafka/kafka-logs-kafka1
消息存儲機制
Kafka 作為消息中間件,只負責消息的臨時存儲,並不是永久存儲,所以需要刪除過期的數據。如果將所有的數據都存儲在一個文件中,要刪除過期的數據的時候,就變得非常的麻煩。如果將其進行切分成多個文件後,如果要刪除過期數據,就可以根據文件的日期屬性刪除即可。默認只保留 168 小時,即七天之內的數據。因此 Kafka 的數據存儲方案是多文件存儲。
Log 分段:
每個分片目錄中,kafka 通過分段的方式將數據分為多個 LogSegment。
一個 LogSegment 對應磁盤上的一個日誌文件(00000000000000000000.log)和一個索引文件 (00000000000000000000.index)。
其中日誌文件是用來記錄消息的,索引文件是用來保存消息的索引。
每個 LogSegment 的大小可以在 server.properties 中 log.segment.bytes=107370(設置分段大小,默認是 1 GB)選項進行設置。
當 log 文件等於 1 G 時,新的會寫入到下一個 segment 中。
timeindex 文件,是 kafka 的具體時間日誌。
通過 offset 查找 message
存儲的結構:
一個主題 –> 多個分區 –> 多個日誌段(多個文件)。
第一步 – 查詢 segment file:
segment file 命名規則跟 offset 有關,根據 segment file 可以知道它的起始偏移量,因為 Segment file 的命名規則是上一個 segment 文件最後一條消息的 offset 值。所以只要根據 offset 二分查找文件列表,就可以快速定位到具體文件。
比如,第一個 segment file 是 00000000000000000000.index 表示最開始的文件,起始偏移量 (offset) 為 0。第二個是 00000000000000091932.index – 代表消息量起始偏移量為 91933 = 91932 + 1。那麼 offset=5000 時應該定位 00000000000000000000.index。
第二步 – 通過 segment file 查找 message:
通過第一步定位到 segment file,當 offset=5000 時,依次定位到 00000000000000000000.index 的元數據物理位置和 00000000000000000000.log 的物理偏移地址,然後再通過 00000000000000000000.log 順序查找直到 offset=5000 為止。
生產者消息分發策略
Kafka 在數據生產的時候,有一個數據分發策略。默認的情況使用 DefaultPartitioner.class 類。
這個類中就定義數據分發的策略:
public interface Partitioner extends Configurable, Closeable {
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes The serialized key to partition on( or null if no key)
* @param value The value to partition on or null
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* This is called when partitioner is closed.
*/
public void close();
}
默認實現類:org.apache.kafka.clients.producer.internals.DefaultPartitioner
1) 如果是用戶指定了 partition,生產就不會調用 DefaultPartitioner.partition() 方法。
數據分發策略的時候,可以指定數據發往哪個partition。
當 ProducerRecord 的構造參數中有 partition 的時候,就可以發送到對應 partition 上。
/**
* Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
* @param value The record contents
* @param headers The headers that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
}
2) DefaultPartitioner 源碼
如果指定 key,是取決於 key 的 hash 值。
如果不指定 key,輪詢分發。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取該 topic 的分區列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 獲得分區的個數
int numPartitions = partitions.size();
// 如果 key 值為 null; 如果沒有指定 key,那麼就是輪詢
if (keyBytes == null) {
// 維護一個 key 為 topic 的 ConcurrentHashMap,並通過 CAS 操作的方式對 value 值執行遞增 +1 操作
int nextValue = nextValue(topic);
// 獲取該 topic 的可用分區列表
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// 如果可用分區大於 0
if (availablePartitions.size() > 0) {
// 執行求余操作,保證消息落在可用分區上
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 指定了 key,key 肯定就不為 null
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
消費者負載均衡機制
同一個分區中的數據,只能被一個消費者組中的一個消費者所消費。例如 P0 分區中的數據不能被 Consumer Group A 中 C1 與 C2 同時消費。
消費組:一個消費組中可以包含多個消費者,properties.put(ConsumerConfig.GROUP_ID_CONFIG, “groupName”);
如果該消費組有四個消費者,主題有四個分區,那麼每人一個。多個消費組可以重複消費消息。
- 如果有 3 個 Partition,p0/p1/p2,同一個消費組有 3 個消費者,c0/c1/c2,則為一一對應關係。
- 如果有 3 個 Partition,p0/p1/p2,同一個消費組有 2 個消費者,c0/c1,則其中一個消費者消費 2 個分區的數據,另一個消費者消費一個分區的數據。
- 如果有 2 個 Partition, p0/p1,同一個消費組有 3 個消費者,c0/c1/c3,則其中有一個消費者空閑,另外 2 個消費者消費分別各自消費一個分區的數據。
Kakfa 配置文件說明
server.properties
1、broker.id = 0
Kafka 集群是由多個節點組成的,每個節點稱為一個 broker,中文翻譯是代理。每個 broker 都有一個不同的 brokerId,由 broker.id 指定,是一個不小於 0 的整數,各 brokerId 必須不同,但不必連續。如果想擴展 kafka 集群,只需引入新節點,分配一個不同的 broker.id 即可。
啟動 kafka 集群時,每一個 broker 都會實例化並啟動一個 kafkaController,並將該 broker 的 brokerId 註冊到 zooKeeper 的相應節點中。集群各 broker 會根據選舉機制選出其中一個 broker 作為 leader,即 leader kafkaController。Leader kafkaController 負責主題的創建與刪除、分區和副本的管理等。當 leader kafkaController 宕機後,其他 broker 會再次選舉出新的 leader kafkaController。
2、log.dir = /export/data/kafka/
Broker 持久化消息到哪裡,數據目錄。
3、log.retention.hours = 168
Log 文件最小存活時間,默認是 168h,即 7 天。相同作用的還有 log.retention.minutes、log.retention.ms。
數據存儲的最大時間超過這個時間會根據 log.cleanup.policy 設置的策略處理數據,也就是消費端能夠多久去消費數據。
log.retention.bytes 和 log.retention.hours 任意一個達到要求,都會執行刪除,會被 topic 創建時的指定參數覆蓋。
4、log.retention.check.interval.ms
多長時間檢查一次是否有 log 文件要刪除。默認是 300000ms,即 5 分鐘。
5、log.retention.bytes
限制單個分區的 log 文件的最大值,超過這個值,將刪除舊的 log,以滿足 log 文件不超過這個值。默認是 -1,即不限制。
6、log.roll.hours
多少時間會生成一個新的 log segment,默認是 168h,即 7 天。相同作用的還有 log.roll.ms、segment.ms。
7、log.segment.bytes
Log segment 多大之後會生成一個新的 log segment,默認是 1073741824,即 1G。
8、log.flush.interval.messages
指定 broker 每收到幾個消息就把消息從內存刷到硬盤(刷盤)。默認是 9223372036854775807。
Kafka 官方不建議使用這個配置,建議使用副本機制和操作系統的後台刷新功能,因為這更高效。這個配置可以根據不同的 topic 設置不同的值,即在創建 topic 的時候設置值。
在 Linux 操作系統中,把數據寫入到文件系統之後,數據其實在操作系統的 page cache 裏面,並沒有刷到磁盤上去。如果此時操作系統掛了,其實數據就丟了。
1、kafka 是多副本的,當配置了同步複製之後。多個副本的數據都在 page cache 裏面,出現多個副本同時掛掉的概率比 1 個副本掛掉,概率就小很多了。
2、操作系統有後台線程,定期刷盤。如果應用程序每寫入 1 次數據,都調用一次 fsync,那性能損耗就很大,所以一般都會在性能和可靠性之間進行權衡。因為對應一個應用來說,雖然應用掛了,只要操作系統不掛,數據就不會丟。
9、log.flush.interval.ms
指定 broker 每隔多少毫秒就把消息從內存刷到硬盤。默認值同 log.flush.interval.messages 一樣, 9223372036854775807。
同 log.flush.interval.messages 一樣,kafka 官方不建議使用這個配置。
10、delete.topic.enable=true
是否允許從物理上刪除 topic。
Kafka 監控與運維
kafka-eagle 概述
在生產環境下,在 Kafka 集群中,消息數據變化是被關注的問題,當業務前提不複雜時,可以使用 Kafka 命令提供帶有 Zookeeper 客戶端工具的工具,可以輕鬆完成工作。隨着業務的複雜性,增加 Group 和 Topic,那麼使用 Kafka 提供命令工具,已經感到無能為力,那麼 Kafka 監控系統目前尤為重要,需要觀察消費者應用的細節。
為了簡化開發者和服務工程師維護 Kafka 集群的工作有一個監控管理工具,叫做 Kafka-eagle。這個管理工具可以很容易地發現分佈在集群中的哪些 topic 分佈不均勻,或者是分區在整個集群分佈不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創建 topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具。
搭建安裝 kafka-eagle
Kafka-eagle 在 Docker 中沒有鏡像。
環境要求:需要安裝 jdk,啟動 zk 以及 kafka 的服務。
# 啟動 Zookeeper
zkServer.sh start
# 啟動 Kafka
cd /export/servers/kafka/bin
nohup ./kafka-server-start.sh /export/servers/kafka/config/server.properties 2>&1 &
Windows host 文件:
192.168.186.20 kafka1
192.168.186.20 kafka2
192.168.186.20 kafka3
192.168.186.11 node1
192.168.186.12 node2
192.168.186.13 node3
搭建步驟:
1) 下載 kafka-eagle 的源碼包:
Kafka-eagle 官網 – http://download.kafka-eagle.org/
可以從官網上面直接下載最新的安裝包即可 kafka-eagle-bin-1.3.2.tar.gz 這個版本即可。
代碼託管地址 – https://github.com/smartloli/kafka-eagle/releases
2) 上傳安裝包並解壓:
這裡選擇將 kafak-eagle 安裝在 node3 服務器。
如果要解壓的是 zip 格式,需要先安裝命令支持。
yum install unzip
unzip xxxx.zip
# 將安裝包上傳至 node01 服務器的 /export/softwares 路徑下, 然後解壓
cd /export/software/
unzip kafka-eagle.zip
cd kafka-eagle/kafka-eagle-web/target/
tar -zxf kafka-eagle-web-2.0.1-bin.tar.gz -C /export/servers
3) 準備數據庫:
Kafka-eagle 需要使用一個數據庫來保存一些元數據信息,這裡直接使用 MySQL 數據庫來保存即可,在本地執行以下命令創建一個 MySQL 數據庫即可。
可以使用 SQLite 或者 MySQL 數據庫。
-- 進入 mysql 客戶端
create database if not exists eagle character set utf8mb4;
4) 修改 kafka-eagle 配置文件:
cd /export/servers/kafka-eagle-web-1.3.2/conf
vi system-config.properties
# 修改內容如下
######################################
# multi zookeeper & kafka cluster list
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=node1:2181,node2:2181,node3:2181
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
# cluster2.kafka.eagle.offset.storage=zk
######################################
# kafka sqlite jdbc driver address
######################################
# kafka.eagle.driver=org.sqlite.JDBC
# kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# kafka.eagle.username=root
# kafka.eagle.password=www.kafka-eagle.org
######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.1.116:3306/eagle?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=password
默認情況下 MySQL 只允許本機連接到 MySQL 實例中,所以如果要遠程訪問,必須開放權限。
# 修改權限
update user set host = '%' where user ='root';
# 刷新配置
flush privileges;
5) 配置環境變量:
Kafka-eagle 必須配置環境變量,node03 服務器執行以下命令來進行配置環境變量。
vi /etc/profile
# 內容如下:
export KE_HOME=/export/servers/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH
# 讓修改立即生效,執行
source /etc/profile
6) 啟動 kakfa-eagle:
cd /export/servers/kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/224930.html