Kafka冪等性詳解

一、Kafka冪等性原理

Kafka冪等性保證了消息在發送和消費的過程中不會重複或丟失,其核心原理是基於唯一消息標識符以及多個產生者實例或消費者實例之間的協調。

Kafka中的每一個消息都有一個全局唯一的消息標識符,稱之為消息的唯一標識符(Message UUID)或者消息的Sequence ID。這個標識符是Kafka Broker自動創建的,可以保證消息的順序性和唯一性。

當開啟Kafka冪等性時,每個生產者都會帶上自己的唯一ID並向Kafka Broker發送消息,Broker會根據唯一標識符判斷是否重複;當消費者消費消息時,會自動提交消息的位移偏移量,如果出現重複消費,消費者會自動回滾到上一次提交的位移。

二、Kafka冪等性開啟

Kafka冪等性默認是關閉的,可以通過在Producer配置中添加enable.idempotence=true來開啟,如下所示:

    
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true"); // 開啟冪等性
        props.put("retries", "3");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
    

Kafka冪等性只在acks=all的情況下生效,因為只有在確認所有副本均已寫入消息後,才會返回成功響應。

三、Kafka冪等性參數

Kafka冪等性需要額外的參數支持,如下所示:

max.in.flight.requests.per.connection:每個連接的最大並發請求數,默認是5,如果設置為1,則每次只發送一條消息。

retries:重試次數,默認為Integer.MAX_VALUE。

delivery.timeout.ms:允許消息傳輸的最長時間,默認為2分鐘。

四、Kafka冪等性消費

當消費者接收消息時,需要正確地處理冪等性,不僅僅需要考慮消費端的冪等性,還需要考慮生產端的冪等性,如下所示:

    
        public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
            Map offsetsToCommit = new HashMap();
            for (ConsumerRecord record : records) {
                if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判斷是否是第一次消費消息
                    processRecord(record);
                    offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)); // 提交消費位移
                } else {
                    logger.info("Duplicated message received: {}", record.value());
                }
            }
            acknowledge.acknowledge(); // 手動提交位移
            if (!offsetsToCommit.isEmpty()) {
                kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
            }
        }
    

在消費端,通過判斷消息的headers中是否有惟一id,來判斷該消息是否已經被消費,如果已經被消費,則不進行消費操作;否則,處理該消息並提交消費位移。

五、Kafka冪等性配置

除了上述用於開啟冪等性的配置參數,Kafka還有一些其他的配置參數可以幫助我們更好地控制和管理Kafka冪等性,如下所示:

unclean.leader.election.enable:是否允許髒的Leader選舉,默認為false。

min.insync.replicas:多少個副本需要寫入消息才算成功,默認是1。

transactional.id:若要開啟Kafka事務,則需要用到這個參數。

六、Kafka冪等性作用

Kafka冪等性通過去除重複消息,從而降低了系統中數據被重複消費或發生重複操作的概率,避免了一些潛在的並發問題。

此外,Kafka冪等性還可以確保數據的順序性和完整性,從而保證數據一致性。

七、Kafka冪等性寫入

在進行寫入操作時,需要注意如下幾點:

1、為了確保唯一標識符的可用性,可以通過實現自定義序列化器或者使用字符串作為Key。

2、為了實現冪等性,對於重複消息需要進行忽略以及失敗的重新嘗試。

3、為了防止消息丟失,消費者應該盡量快的完成消費並提交位移。

八、Kafka冪等性面試題

1、什麼是Kafka冪等性?

2、Kafka冪等性的原理是什麼?

3、如何在Kafka中開啟冪等性?

4、Kafka冪等性可以保證什麼?有什麼作用?

5、Kafka冪等性有哪些應用場景?

6、Kafka冪等性可以通過哪些參數進行配置?

九、Kafka冪等性和事務

Kafka冪等性與事務是可以一起使用的。通過事務的支持,可以確保原子性、隔離性和持久性。

在使用事務時,需要在Producer的配置中添加transactional.id參數,並將Kafka冪等性的配置參數與事務相關參數進行配合使用,如下所示:

    
        props.put("transaction.id", "transaction-id");
        props.put("max.in.flight.requests.per.connection", 1); // 和冪等性相關的配置
        props.put("retries", 3); // 和冪等性相關的配置
        props.put("enable.idempotence", true); // 開啟冪等性
        KafkaProducer producer = new KafkaProducer(props);
    

十、Kafka冪等性跨分區選取

Kafka並不保證同一組消費者中的每個消費者都可以消費到每個分區中的每個消息,這就需要我們手動為每個消費者分配特定的分區進行消費。在Kafka中,有兩種方式可以對分區進行操作,分別是手動分配和自動分配,手動分配更加靈活,但自動分配更加簡便易行。

對於Kafka冪等性來說,需要注意兩點:

1、在使用手動分配方式時,需要注意確保同一組消費者中,至少一個消費者消費到每個分區。

2、在使用自動分配方式時,需要在啟動消費者時設置partition.assignment.strategy參數為RoundRobinAssignor

代碼示例

開啟Kafka冪等性:

    
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true"); // 開啟冪等性
        props.put("retries", "3");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
    

消費者監聽Kafka消息,並實現冪等性處理:

    
        public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
            Map offsetsToCommit = new HashMap();
            for (ConsumerRecord record : records) {
                if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判斷是否是第一次消費消息
                    processRecord(record);
                    offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)); // 提交消費位移
                } else {
                    logger.info("Duplicated message received: {}", record.value());
                }
            }
            acknowledgment.acknowledge(); // 手動提交位移
            if (!offsetsToCommit.isEmpty()) {
                kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
            }
        }
    

參考資料

1、https://kafka.apache.org/documentation/#producerconfigs_idempotence

2、https://kafka.apache.org/documentation/#consumerconfigs_enable.idempotence

3、https://blog.csdn.net/qq_34707550/article/details/80205655

4、https://www.jianshu.com/p/66b10a39353f

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/280796.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-21 13:05
下一篇 2024-12-21 13:05

相關推薦

  • Python消費Kafka數據指南

    本文將為您詳細介紹如何使用Python消費Kafka數據,旨在幫助讀者快速掌握這一重要技能。 一、Kafka簡介 Kafka是一種高性能和可伸縮的分布式消息隊列,由Apache軟件…

    編程 2025-04-28
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁盤中。在執行sync之前,所有的文件系統更新將不會立即寫入磁盤,而是先緩存在內存…

    編程 2025-04-25
  • 神經網絡代碼詳解

    神經網絡作為一種人工智能技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網絡的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網絡模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25
  • nginx與apache應用開發詳解

    一、概述 nginx和apache都是常見的web服務器。nginx是一個高性能的反向代理web服務器,將負載均衡和緩存集成在了一起,可以動靜分離。apache是一個可擴展的web…

    編程 2025-04-25
  • MPU6050工作原理詳解

    一、什麼是MPU6050 MPU6050是一種六軸慣性傳感器,能夠同時測量加速度和角速度。它由三個傳感器組成:一個三軸加速度計和一個三軸陀螺儀。這個組合提供了非常精細的姿態解算,其…

    編程 2025-04-25
  • 詳解eclipse設置

    一、安裝與基礎設置 1、下載eclipse並進行安裝。 2、打開eclipse,選擇對應的工作空間路徑。 File -> Switch Workspace -> [選擇…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25
  • Python安裝OS庫詳解

    一、OS簡介 OS庫是Python標準庫的一部分,它提供了跨平台的操作系統功能,使得Python可以進行文件操作、進程管理、環境變量讀取等系統級操作。 OS庫中包含了大量的文件和目…

    編程 2025-04-25
  • Java BigDecimal 精度詳解

    一、基礎概念 Java BigDecimal 是一個用於高精度計算的類。普通的 double 或 float 類型只能精確表示有限的數字,而對於需要高精度計算的場景,BigDeci…

    編程 2025-04-25

發表回復

登錄後才能評論