一、Kafka冪等性原理
Kafka冪等性保證了消息在發送和消費的過程中不會重複或丟失,其核心原理是基於唯一消息標識符以及多個產生者實例或消費者實例之間的協調。
Kafka中的每一個消息都有一個全局唯一的消息標識符,稱之為消息的唯一標識符(Message UUID)或者消息的Sequence ID。這個標識符是Kafka Broker自動創建的,可以保證消息的順序性和唯一性。
當開啟Kafka冪等性時,每個生產者都會帶上自己的唯一ID並向Kafka Broker發送消息,Broker會根據唯一標識符判斷是否重複;當消費者消費消息時,會自動提交消息的位移偏移量,如果出現重複消費,消費者會自動回滾到上一次提交的位移。
二、Kafka冪等性開啟
Kafka冪等性默認是關閉的,可以通過在Producer配置中添加enable.idempotence=true來開啟,如下所示:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true"); // 開啟冪等性
props.put("retries", "3");
props.put("batch.size", "16384");
props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
Kafka冪等性只在acks=all的情況下生效,因為只有在確認所有副本均已寫入消息後,才會返回成功響應。
三、Kafka冪等性參數
Kafka冪等性需要額外的參數支持,如下所示:
max.in.flight.requests.per.connection:每個連接的最大並發請求數,默認是5,如果設置為1,則每次只發送一條消息。
retries:重試次數,默認為Integer.MAX_VALUE。
delivery.timeout.ms:允許消息傳輸的最長時間,默認為2分鐘。
四、Kafka冪等性消費
當消費者接收消息時,需要正確地處理冪等性,不僅僅需要考慮消費端的冪等性,還需要考慮生產端的冪等性,如下所示:
public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
Map offsetsToCommit = new HashMap();
for (ConsumerRecord record : records) {
if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判斷是否是第一次消費消息
processRecord(record);
offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)); // 提交消費位移
} else {
logger.info("Duplicated message received: {}", record.value());
}
}
acknowledge.acknowledge(); // 手動提交位移
if (!offsetsToCommit.isEmpty()) {
kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
}
}
在消費端,通過判斷消息的headers中是否有惟一id,來判斷該消息是否已經被消費,如果已經被消費,則不進行消費操作;否則,處理該消息並提交消費位移。
五、Kafka冪等性配置
除了上述用於開啟冪等性的配置參數,Kafka還有一些其他的配置參數可以幫助我們更好地控制和管理Kafka冪等性,如下所示:
unclean.leader.election.enable:是否允許髒的Leader選舉,默認為false。
min.insync.replicas:多少個副本需要寫入消息才算成功,默認是1。
transactional.id:若要開啟Kafka事務,則需要用到這個參數。
六、Kafka冪等性作用
Kafka冪等性通過去除重複消息,從而降低了系統中數據被重複消費或發生重複操作的概率,避免了一些潛在的並發問題。
此外,Kafka冪等性還可以確保數據的順序性和完整性,從而保證數據一致性。
七、Kafka冪等性寫入
在進行寫入操作時,需要注意如下幾點:
1、為了確保唯一標識符的可用性,可以通過實現自定義序列化器或者使用字元串作為Key。
2、為了實現冪等性,對於重複消息需要進行忽略以及失敗的重新嘗試。
3、為了防止消息丟失,消費者應該盡量快的完成消費並提交位移。
八、Kafka冪等性面試題
1、什麼是Kafka冪等性?
2、Kafka冪等性的原理是什麼?
3、如何在Kafka中開啟冪等性?
4、Kafka冪等性可以保證什麼?有什麼作用?
5、Kafka冪等性有哪些應用場景?
6、Kafka冪等性可以通過哪些參數進行配置?
九、Kafka冪等性和事務
Kafka冪等性與事務是可以一起使用的。通過事務的支持,可以確保原子性、隔離性和持久性。
在使用事務時,需要在Producer的配置中添加transactional.id參數,並將Kafka冪等性的配置參數與事務相關參數進行配合使用,如下所示:
props.put("transaction.id", "transaction-id");
props.put("max.in.flight.requests.per.connection", 1); // 和冪等性相關的配置
props.put("retries", 3); // 和冪等性相關的配置
props.put("enable.idempotence", true); // 開啟冪等性
KafkaProducer producer = new KafkaProducer(props);
十、Kafka冪等性跨分區選取
Kafka並不保證同一組消費者中的每個消費者都可以消費到每個分區中的每個消息,這就需要我們手動為每個消費者分配特定的分區進行消費。在Kafka中,有兩種方式可以對分區進行操作,分別是手動分配和自動分配,手動分配更加靈活,但自動分配更加簡便易行。
對於Kafka冪等性來說,需要注意兩點:
1、在使用手動分配方式時,需要注意確保同一組消費者中,至少一個消費者消費到每個分區。
2、在使用自動分配方式時,需要在啟動消費者時設置partition.assignment.strategy參數為RoundRobinAssignor。
代碼示例
開啟Kafka冪等性:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true"); // 開啟冪等性
props.put("retries", "3");
props.put("batch.size", "16384");
props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
消費者監聽Kafka消息,並實現冪等性處理:
public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
Map offsetsToCommit = new HashMap();
for (ConsumerRecord record : records) {
if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判斷是否是第一次消費消息
processRecord(record);
offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)); // 提交消費位移
} else {
logger.info("Duplicated message received: {}", record.value());
}
}
acknowledgment.acknowledge(); // 手動提交位移
if (!offsetsToCommit.isEmpty()) {
kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
}
}
參考資料
1、https://kafka.apache.org/documentation/#producerconfigs_idempotence
2、https://kafka.apache.org/documentation/#consumerconfigs_enable.idempotence
3、https://blog.csdn.net/qq_34707550/article/details/80205655
4、https://www.jianshu.com/p/66b10a39353f
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/280796.html