Kafka如何保证不重复消费又不丢失数据

Kafka是一个高吞吐量分布式消息系统,被广泛应用于数据传输中。在Kafka中,一些场景下需要保证消息不丢失也不被重复消费,本文将从多个方面对这个问题进行阐述。

一、消息持久化

Kafka将所有消息进行持久化存储,每个partition有相应的replication factor,即副本数量。当Producer发送消息到Kafka,它会被复制到该Partition所有副本机器上,只有在副本写入到磁盘时才会通知Producer写入成功。这意味着即使一个broker宕机,也会有其他副本来保证消息的可用性和一致性。

对于消费者而言,它只需简单地向Kafka请求下一批消息。Kafka仅在Broker端记录每个Consumer最后读取的消息Offset,这个Offset是Consumer自己记录的,这样即使Consumer下线或者重启后,可以在Offset的位置继续读取消息。

实例:

“`
ProducerConfig props = new ProducerConfig(getProducerProperties());
Producer producer = new Producer(props);
KeyedMessage message = new KeyedMessage(“topic”,”key”, “value”);
producer.send(message);
producer.close();

ConsumerConfig props = new ConsumerConfig(getConsumerProperties());
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(props);
String topic = “topic”;
Map topicCount = new HashMap();
topicCount.put(topic, 1);
Map<String, List<KafkaStream>> consumerStreams =
consumer.createMessageStreams(topicCount);
List<KafkaStream> streams = consumerStreams.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
“`

二、幂等性

Kafka 0.11后加入了幂等性保证,此功能可被Producer使用,以确保相同的消息能够被重复生产,但在消息分配时,每个消息只会被处理一次。这将确保即使重复发送消息,也不会导致数据损坏或消息重复,从而达到不丢失数据和不重复消费的目标。

实例:

“`java
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.ACKS_CONFIG, “all”);
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768);
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, “snappy”);

Producer producer = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions();

try {
producer.beginTransaction();
for (String data : dataList) {
producer.send(new ProducerRecord(“topic”, key, data));
}
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
producer.flush();
producer.close();
“`

三、使用Consumer Group

Kafka提供了Consumer Group概念,一个Consumer Group由一组Consumer组成,共同消费一个或者一组Partition。当一个Consumer Group进行消息消费时,Partition中的每个消息只能被组中的一个Consumer消费,其他Consumer则不能再消费该Partition中的任何消息。当Consumer Group中的任何一个Consumer宕机或加入时,该Consumer Group都不会影响另外Consumer Group中的消费情况。

实例:

“`java
Properties props = new Properties();
props.put(“group.id”, “test”);
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(“foo”, “bar”));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf(“offset = %d, key = %s, value = %s”, record.offset(), record.key(), record.value());
}
}
“`

四、使用Batch Consumer

Batch Consumer每次消费多个消息,以提高消息处理效率和优化网络IO。它可以一次消费多条消息,然后一次性提交Offset,从而能够提高效率,减少IO操作次数。

实例:

“`java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BatchConsumerExampler {
public static void main(String[] args) {
final Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“group.id”, “test”);
props.put(“enable.auto.commit”, “false”);
props.put(“auto.offset.reset”, “earliest”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
final KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(“foo”, “bar”));
try {
while (true) {
final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100));
if (consumerRecords.count() == 0) {
continue;
}
int count = 0;
for (ConsumerRecord consumerRecord : consumerRecords) {
System.out.printf(“ConsumerRecord:(%d, %s, %d, %d, %s)\n”,
consumerRecord.key(), consumerRecord.value(),
consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
count++;
}
System.out.println(“succeed to consume ” + count + ” records”);
consumer.commitAsync();
}
} finally {
consumer.close();
}
}
}
“`

五、总结

本文从消息持久化、幂等性、Consumer Group、Batch Consumer等方面对Kafka如何保证数据不重复消费又不丢失进行了详细的阐述。相信在实际工作中,针对不同场景,读者能够合理地使用这些技术,保障数据的安全性和处理效率。

原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/309947.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
小蓝小蓝
上一篇 2025-01-04 19:32
下一篇 2025-01-04 19:32

相关推荐

  • Python读取CSV数据画散点图

    本文将从以下方面详细阐述Python读取CSV文件并画出散点图的方法: 一、CSV文件介绍 CSV(Comma-Separated Values)即逗号分隔值,是一种存储表格数据的…

    编程 2025-04-29
  • Python中读入csv文件数据的方法用法介绍

    csv是一种常见的数据格式,通常用于存储小型数据集。Python作为一种广泛流行的编程语言,内置了许多操作csv文件的库。本文将从多个方面详细介绍Python读入csv文件的方法。…

    编程 2025-04-29
  • 如何用Python统计列表中各数据的方差和标准差

    本文将从多个方面阐述如何使用Python统计列表中各数据的方差和标准差, 并给出详细的代码示例。 一、什么是方差和标准差 方差是衡量数据变异程度的统计指标,它是每个数据值和该数据值…

    编程 2025-04-29
  • Python多线程读取数据

    本文将详细介绍多线程读取数据在Python中的实现方法以及相关知识点。 一、线程和多线程 线程是操作系统调度的最小单位。单线程程序只有一个线程,按照程序从上到下的顺序逐行执行。而多…

    编程 2025-04-29
  • Python爬取公交数据

    本文将从以下几个方面详细阐述python爬取公交数据的方法: 一、准备工作 1、安装相关库 import requests from bs4 import BeautifulSou…

    编程 2025-04-29
  • Python两张表数据匹配

    本篇文章将详细阐述如何使用Python将两张表格中的数据匹配。以下是具体的解决方法。 一、数据匹配的概念 在生活和工作中,我们常常需要对多组数据进行比对和匹配。在数据量较小的情况下…

    编程 2025-04-29
  • Python数据标准差标准化

    本文将为大家详细讲述Python中的数据标准差标准化,以及涉及到的相关知识。 一、什么是数据标准差标准化 数据标准差标准化是数据处理中的一种方法,通过对数据进行标准差标准化可以将不…

    编程 2025-04-29
  • 如何使用Python读取CSV数据

    在数据分析、数据挖掘和机器学习等领域,CSV文件是一种非常常见的文件格式。Python作为一种广泛使用的编程语言,也提供了方便易用的CSV读取库。本文将介绍如何使用Python读取…

    编程 2025-04-29
  • Python如何打乱数据集

    本文将从多个方面详细阐述Python打乱数据集的方法。 一、shuffle函数原理 shuffle函数是Python中的一个内置函数,主要作用是将一个可迭代对象的元素随机排序。 在…

    编程 2025-04-29
  • Python根据表格数据生成折线图

    本文将介绍如何使用Python根据表格数据生成折线图。折线图是一种常见的数据可视化图表形式,可以用来展示数据的趋势和变化。Python是一种流行的编程语言,其强大的数据分析和可视化…

    编程 2025-04-29

发表回复

登录后才能评论