Kafka幂等性详解

一、Kafka幂等性原理

Kafka幂等性保证了消息在发送和消费的过程中不会重复或丢失,其核心原理是基于唯一消息标识符以及多个产生者实例或消费者实例之间的协调。

Kafka中的每一个消息都有一个全局唯一的消息标识符,称之为消息的唯一标识符(Message UUID)或者消息的Sequence ID。这个标识符是Kafka Broker自动创建的,可以保证消息的顺序性和唯一性。

当开启Kafka幂等性时,每个生产者都会带上自己的唯一ID并向Kafka Broker发送消息,Broker会根据唯一标识符判断是否重复;当消费者消费消息时,会自动提交消息的位移偏移量,如果出现重复消费,消费者会自动回滚到上一次提交的位移。

二、Kafka幂等性开启

Kafka幂等性默认是关闭的,可以通过在Producer配置中添加enable.idempotence=true来开启,如下所示:

    
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true"); // 开启幂等性
        props.put("retries", "3");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
    

Kafka幂等性只在acks=all的情况下生效,因为只有在确认所有副本均已写入消息后,才会返回成功响应。

三、Kafka幂等性参数

Kafka幂等性需要额外的参数支持,如下所示:

max.in.flight.requests.per.connection:每个连接的最大并发请求数,默认是5,如果设置为1,则每次只发送一条消息。

retries:重试次数,默认为Integer.MAX_VALUE。

delivery.timeout.ms:允许消息传输的最长时间,默认为2分钟。

四、Kafka幂等性消费

当消费者接收消息时,需要正确地处理幂等性,不仅仅需要考虑消费端的幂等性,还需要考虑生产端的幂等性,如下所示:

    
        public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
            Map offsetsToCommit = new HashMap();
            for (ConsumerRecord record : records) {
                if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判断是否是第一次消费消息
                    processRecord(record);
                    offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)); // 提交消费位移
                } else {
                    logger.info("Duplicated message received: {}", record.value());
                }
            }
            acknowledge.acknowledge(); // 手动提交位移
            if (!offsetsToCommit.isEmpty()) {
                kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
            }
        }
    

在消费端,通过判断消息的headers中是否有惟一id,来判断该消息是否已经被消费,如果已经被消费,则不进行消费操作;否则,处理该消息并提交消费位移。

五、Kafka幂等性配置

除了上述用于开启幂等性的配置参数,Kafka还有一些其他的配置参数可以帮助我们更好地控制和管理Kafka幂等性,如下所示:

unclean.leader.election.enable:是否允许脏的Leader选举,默认为false。

min.insync.replicas:多少个副本需要写入消息才算成功,默认是1。

transactional.id:若要开启Kafka事务,则需要用到这个参数。

六、Kafka幂等性作用

Kafka幂等性通过去除重复消息,从而降低了系统中数据被重复消费或发生重复操作的概率,避免了一些潜在的并发问题。

此外,Kafka幂等性还可以确保数据的顺序性和完整性,从而保证数据一致性。

七、Kafka幂等性写入

在进行写入操作时,需要注意如下几点:

1、为了确保唯一标识符的可用性,可以通过实现自定义序列化器或者使用字符串作为Key。

2、为了实现幂等性,对于重复消息需要进行忽略以及失败的重新尝试。

3、为了防止消息丢失,消费者应该尽量快的完成消费并提交位移。

八、Kafka幂等性面试题

1、什么是Kafka幂等性?

2、Kafka幂等性的原理是什么?

3、如何在Kafka中开启幂等性?

4、Kafka幂等性可以保证什么?有什么作用?

5、Kafka幂等性有哪些应用场景?

6、Kafka幂等性可以通过哪些参数进行配置?

九、Kafka幂等性和事务

Kafka幂等性与事务是可以一起使用的。通过事务的支持,可以确保原子性、隔离性和持久性。

在使用事务时,需要在Producer的配置中添加transactional.id参数,并将Kafka幂等性的配置参数与事务相关参数进行配合使用,如下所示:

    
        props.put("transaction.id", "transaction-id");
        props.put("max.in.flight.requests.per.connection", 1); // 和幂等性相关的配置
        props.put("retries", 3); // 和幂等性相关的配置
        props.put("enable.idempotence", true); // 开启幂等性
        KafkaProducer producer = new KafkaProducer(props);
    

十、Kafka幂等性跨分区选取

Kafka并不保证同一组消费者中的每个消费者都可以消费到每个分区中的每个消息,这就需要我们手动为每个消费者分配特定的分区进行消费。在Kafka中,有两种方式可以对分区进行操作,分别是手动分配和自动分配,手动分配更加灵活,但自动分配更加简便易行。

对于Kafka幂等性来说,需要注意两点:

1、在使用手动分配方式时,需要注意确保同一组消费者中,至少一个消费者消费到每个分区。

2、在使用自动分配方式时,需要在启动消费者时设置partition.assignment.strategy参数为RoundRobinAssignor

代码示例

开启Kafka幂等性:

    
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("enable.idempotence", "true"); // 开启幂等性
        props.put("retries", "3");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer(props);
    

消费者监听Kafka消息,并实现幂等性处理:

    
        public void onMessage(List<ConsumerRecord> records, Acknowledgment acknowledgment) {
            Map offsetsToCommit = new HashMap();
            for (ConsumerRecord record : records) {
                if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判断是否是第一次消费消息
                    processRecord(record);
                    offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)); // 提交消费位移
                } else {
                    logger.info("Duplicated message received: {}", record.value());
                }
            }
            acknowledgment.acknowledge(); // 手动提交位移
            if (!offsetsToCommit.isEmpty()) {
                kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
            }
        }
    

参考资料

1、https://kafka.apache.org/documentation/#producerconfigs_idempotence

2、https://kafka.apache.org/documentation/#consumerconfigs_enable.idempotence

3、https://blog.csdn.net/qq_34707550/article/details/80205655

4、https://www.jianshu.com/p/66b10a39353f

原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/280796.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
小蓝小蓝
上一篇 2024-12-21 13:05
下一篇 2024-12-21 13:05

相关推荐

  • Python消费Kafka数据指南

    本文将为您详细介绍如何使用Python消费Kafka数据,旨在帮助读者快速掌握这一重要技能。 一、Kafka简介 Kafka是一种高性能和可伸缩的分布式消息队列,由Apache软件…

    编程 2025-04-28
  • Linux sync详解

    一、sync概述 sync是Linux中一个非常重要的命令,它可以将文件系统缓存中的内容,强制写入磁盘中。在执行sync之前,所有的文件系统更新将不会立即写入磁盘,而是先缓存在内存…

    编程 2025-04-25
  • 神经网络代码详解

    神经网络作为一种人工智能技术,被广泛应用于语音识别、图像识别、自然语言处理等领域。而神经网络的模型编写,离不开代码。本文将从多个方面详细阐述神经网络模型编写的代码技术。 一、神经网…

    编程 2025-04-25
  • Linux修改文件名命令详解

    在Linux系统中,修改文件名是一个很常见的操作。Linux提供了多种方式来修改文件名,这篇文章将介绍Linux修改文件名的详细操作。 一、mv命令 mv命令是Linux下的常用命…

    编程 2025-04-25
  • nginx与apache应用开发详解

    一、概述 nginx和apache都是常见的web服务器。nginx是一个高性能的反向代理web服务器,将负载均衡和缓存集成在了一起,可以动静分离。apache是一个可扩展的web…

    编程 2025-04-25
  • MPU6050工作原理详解

    一、什么是MPU6050 MPU6050是一种六轴惯性传感器,能够同时测量加速度和角速度。它由三个传感器组成:一个三轴加速度计和一个三轴陀螺仪。这个组合提供了非常精细的姿态解算,其…

    编程 2025-04-25
  • 详解eclipse设置

    一、安装与基础设置 1、下载eclipse并进行安装。 2、打开eclipse,选择对应的工作空间路径。 File -> Switch Workspace -> [选择…

    编程 2025-04-25
  • Python输入输出详解

    一、文件读写 Python中文件的读写操作是必不可少的基本技能之一。读写文件分别使用open()函数中的’r’和’w’参数,读取文件…

    编程 2025-04-25
  • Python安装OS库详解

    一、OS简介 OS库是Python标准库的一部分,它提供了跨平台的操作系统功能,使得Python可以进行文件操作、进程管理、环境变量读取等系统级操作。 OS库中包含了大量的文件和目…

    编程 2025-04-25
  • Java BigDecimal 精度详解

    一、基础概念 Java BigDecimal 是一个用于高精度计算的类。普通的 double 或 float 类型只能精确表示有限的数字,而对于需要高精度计算的场景,BigDeci…

    编程 2025-04-25

发表回复

登录后才能评论