一、kafka概述
Kafka是一個開源的消息系統,最初由LinkedIn公司開發並貢獻給Apache基金會。Kafka的設計目標是使得能夠處理大規模的實時數據流,以及向多個客戶端提供高吞吐量的數據流。Kafka除了作為消息系統使用,還可以用作存儲系統,即將Kafka作為數據存儲工具使用,這是由於Kafka的持久性、可擴展性和快速的讀寫性能。
Kafka的核心是由一個或多個broker和ZooKeeper協調器組成的集群。生產者將消息發送到Kafka的topic,消費者從topic消費消息。每個broker在本地存儲消息,同時通過複製和分區機制實現高可靠性、擴展性和吞吐量。
二、kafka的消息模型
Kafka的消息模型是基於發布/訂閱模式的,在Kafka中被稱為topic。生產者將消息發送到topic,消費者從topic消費消息。一個topic可以有多個訂閱者,而生產者在將消息發送到topic時不必知道哪些訂閱者將會接收到這些消息。Kafka還可以支持動態創建topic。
Kafka的topic可以由一個或多個partition組成,每個partition在存儲上是一個獨立的分片,也就是說一個topic可以跨越多個broker。每個partition內的消息是有序的,並且在單個broker上具有高吞吐量。同時,由於partition的數量可以隨意增加,因此Kafka可以輕鬆地擴展存儲和吞吐量。
對於一個topic,Kafka提供了多個消費者group(消費者組),一個消費者組內可以有多個消費者。對於同一個消費者組內的多個消費者來說,它們會從不同的partition中消費消息,消費進度是獨立的。
三、kafka的核心設計
1、分布式的存儲與分區機制
public class Partition {
private int partitionId;
private Broker leader;
private List replicas;
public Partition(int partitionId, Broker leader, List replicas) {
this.partitionId = partitionId;
this.leader = leader;
this.replicas = replicas;
}
}
Kafka的消息通過partition進行分區,每個partition可以由多個broker進行複製,對於每個partition而言,只有其中一個broker是leader,其餘的broker都是follower。leader負責接收生產者的消息,並將消息寫入到本地存儲中,同時將消息發送給follower以進行備份。如果leader宕機了,Kafka會自動將其中一個follower升級為leader。
2、高吞吐量的消息讀寫
public class Producer {
private KafkaClient kafkaClient;
public Producer(KafkaClient kafkaClient) {
this.kafkaClient = kafkaClient;
}
public void send(String topic, String message) {
Message messageToSend = new Message(message);
kafkaClient.send(messageToSend, topic);
}
}
Kafka的生產者將消息發送到topic,消費者從topic消費消息。Kafka的重要設計之一是零拷貝(Zero Copy),在網絡傳輸和磁盤IO中儘可能地避免數據的複製,從而提升IO效率。
3、多副本與可擴展性
public class Broker {
private int brokerId;
private boolean isLeader;
public Broker(int brokerId, boolean isLeader) {
this.brokerId = brokerId;
this.isLeader = isLeader;
}
public boolean isLeader() {
return isLeader;
}
}
Kafka的吞吐量可以通過添加更多的broker來進行水平擴展。每個broker存儲一個或多個topic的partition數據。Kafka的副本機制保證了消息的可靠性,分布式系統中,單個broker的故障不會影響整個系統的可用性。
四、kafka實戰
1、安裝和配置Kafka
# 安裝Kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.13-2.6.2.tgz
tar -xzvf kafka_2.13-2.6.2.tgz
cd kafka_2.13-2.6.2
# 配置Kafka
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
# 修改配置
vi config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
vi config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
2、創建Topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
3、啟動Kafka
# 以後台方式啟動Kafka
./bin/kafka-server-start.sh -daemon config/server.properties
# 啟動server-1
./bin/kafka-server-start.sh -daemon config/server-1.properties
# 啟動server-2
./bin/kafka-server-start.sh -daemon config/server-2.properties
4、生產者和消費者
# 啟動控制台生產者
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 啟動控制台消費者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
五、總結
通過本文對Kafka的深入解析,我們了解了Kafka的概念、消息模型、核心設計和實戰操作。Kafka具有高可靠性、高擴展性和高吞吐量等優點,可以幫助我們處理實時數據流,從而在數據處理和存儲領域發揮重要作用。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/294126.html