一、Kafka概述
Kafka是一個分散式、可橫向擴展的消息隊列,是一種高吞吐量的分散式發布訂閱消息系統。Kafka的設計目標是:將發布者與訂閱者解耦,同時提高消息處理速度。Kafka是用Scala編寫的,但是它支持多種編程語言。
二、Kafka架構
Kafka的架構中有四個角色:生產者,消費者,Kafka Broker和ZooKeeper。
1. 生產者
Kafka生產者將消息發布到Kafka Broker。生產者將消息發送到指定的Topic。Topic是寫入消息的主題,「生產者」發布的每條消息都屬於某個主題。生產者必須知道應該將消息發送到哪個Topic中。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
String topicName = "my-topic";
String key = "my-key";
String value = "my-value";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
ProducerRecord record = new ProducerRecord(topicName, key, value);
producer.send(record);
producer.close();
}
}
2. 消費者
消費者可以訂閱一個或多個Topic,並消費其中的消息。Kafka消費者將分區中的消息讀取出來,按照順序消費。Kafka消費者在處理消息時是有狀態的,需要記錄已消費消息的偏移量。消費者可以設置偏移量的提交方式,是同步式還是非同步式。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception{
String topicName = "my-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
3. Kafka Broker
Kafka Broker是Kafka集群中的一個,是一個Kafka Server實例。Broker接收生產者發布的消息,並將消息寫入磁碟上的一個或多個分區中。Broker還提供了消費者可以從其訂閱的Partition中讀取消息的服務。
4. ZooKeeper
Kafka依賴ZooKeeper來完成元數據管理、Leader選舉、分區管理等任務。ZooKeeper還支持監視Kafka Broker和Consumer的狀態。
三、Kafka分區和副本
Kafka中的Topic分為多個Partition,每個Partition可以分配多個Replica(副本)。Partition和Replica的數量可以在創建Topic時進行配置。消息Producer將消息通過輪詢演算法發送到一個Partition中的某個Replica。
1. Partition
Partition用於較大的Topic。Kafka中所有消息都必須屬於某個Topic,但是如果一個Topic的消息量很大,需要很高的處理能力來處理每個Message,那麼就可以採用Partition的方式來劃分Message。Partition將一個Topic中的所有消息分為若干個區,每一個區中的消息互不干擾。可以將每個Partition存儲在一個文件中。
2. Replica
副本可以提高讀寫的性能,副本的作用就是讓數據更可靠。Kafka中的Replica(副本)是指能夠複製Partition內容的節點,同一個Topic中,不同Partition的Replica可以配置成不同的個數(即Replication Factor)。一個Partition的所有副本被稱為一個副本集。如果在一個Broker中存儲了多個Partition的多個副本,同一個Broker中相同Partition的副本不能保存在同一個磁碟路徑下。
四、Kafka的可靠性
Kafka的可靠性分為兩個方面:Producer的可靠性和Consumer的可靠性。
1. Producer的可靠性
Kafka保證消息不會丟失是通過副本策略實現的。Producer向Broker中發送消息,Broker接收到消息後將消息寫入磁碟,並同時往一個或多個其它Broker的Replica中也寫入消息。只有將消息發送給所有的Replica後才返回ACK,表示消息已經保存成功,這時Producer才能確定消息已經被保存下來。
2. Consumer的可靠性
Kafka提供了兩種Commit的方式:自動Commit和手動Commit。
- 自動Commit:Consumer在poll方法的過程中可以選擇自動提交偏移量,但是這種方式可能會出現宕機丟失數據的情況。
- 手動Commit:Consumer在處理消息後,可以手動提交偏移量。如果偏移量提交失敗的話,Consumer將會等待一段時間後再次提交,直到提交成功。
原創文章,作者:HCMBX,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/331938.html