Kafka is a high-throughput distributed messaging system widely used for data transport. In some scenarios, messages must be neither lost nor consumed more than once; this article examines how Kafka meets that requirement from several angles.
1. Message Persistence
Kafka persists every message it stores. Each partition has a replication factor, i.e., a number of replicas. When a producer sends a message to Kafka, it is replicated to all of the partition's replicas; with acks=all, the producer is told the write succeeded only after every in-sync replica has written the message to its log. As a result, even if one broker goes down, the remaining replicas keep the message available and consistent.
On the consumer side, a consumer simply asks Kafka for the next batch of messages. Each consumer tracks its own read position as an offset and commits it back to Kafka, which stores it on the broker side (in the internal __consumer_offsets topic). Even if the consumer goes offline or restarts, it can resume reading from the committed offset.
Example:
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// getProducerProperties()/getConsumerProperties() are helpers assumed to
// return the client configuration, as in the original snippet.
Properties producerProps = getProducerProperties();
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("topic", "key", "value"));
producer.close(); // close() flushes any pending sends

// Consume messages from the same topic.
Properties consumerProps = getConsumerProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}
consumer.close();
```
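The durability just described depends on how the producer is configured. Below is a minimal sketch of a reliability-oriented producer setup; the broker address and topic name are placeholders, and min.insync.replicas is a broker/topic-level setting that appears only as a comment:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
// Wait until all in-sync replicas have written the record before acknowledging.
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Keep retrying transient failures rather than dropping the record.
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Note: pair acks=all with the topic/broker setting min.insync.replicas >= 2
// so that an acknowledgment implies the record exists on more than one broker.

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("topic", "key", "value"), (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace(); // send ultimately failed; handle or re-queue
        }
    });
}
```

With acks=all and min.insync.replicas of at least 2, an acknowledged message survives the loss of any single broker.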
2. Idempotence
Kafka 0.11 introduced producer idempotence. With enable.idempotence=true, the broker assigns the producer an ID and tracks per-partition sequence numbers, so a message that is resent due to an internal retry is written to the log exactly once. Combined with transactions, this allows a batch of messages to be committed atomically, achieving the goal of neither losing data nor consuming it twice.
Example:
```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768);
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// initTransactions() below additionally requires a transactional.id.
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

Producer<String, String> producer =
        new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions();

String key = "key";
List<String> dataList = Arrays.asList("value1", "value2");
try {
    producer.beginTransaction();
    for (String data : dataList) {
        producer.send(new ProducerRecord<>("topic", key, data));
    }
    // commitTransaction() flushes all pending sends before committing.
    producer.commitTransaction();
} catch (Exception e) {
    // None of the messages in an aborted transaction become visible
    // to read_committed consumers.
    producer.abortTransaction();
}
producer.close();
```
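One point worth separating out: the example above uses transactions, which require a transactional.id on top of idempotence. If atomic multi-message writes are not needed, idempotent retries alone take a single setting. A minimal sketch, again with a placeholder broker address and topic:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// The broker assigns this producer an ID and checks per-partition sequence
// numbers, discarding duplicates created by internal retries.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Even if the network forces a retry, the broker stores this record once.
    producer.send(new ProducerRecord<>("topic", "key", "value"));
}
```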
3. Using Consumer Groups
Kafka provides the concept of a consumer group: a set of consumers that jointly consume the partitions of one or more topics. Within a group, each partition is assigned to exactly one consumer, so every message in a partition is processed by only one member of the group. When a consumer in the group fails or a new one joins, the group rebalances its partition assignments among the remaining members; consumers in other groups are entirely unaffected.
Example:
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("group.id", "test");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Consumers started with the same group.id split these topics' partitions.
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```
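When group membership changes, Kafka reassigns partitions among the surviving consumers. To keep a rebalance from re-delivering messages the old owner already processed, one option is to commit offsets at the moment partitions are revoked. The sketch below assumes the props from the example above with enable.auto.commit set to false:

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// props: same settings as above, plus enable.auto.commit=false.
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit processed offsets before the partitions move to another
        // consumer, so the new owner does not re-read those messages.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Consumption resumes from each partition's last committed offset.
    }
});
```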
4. Using a Batch Consumer
A batch consumer processes multiple messages per poll and commits their offsets in one call, improving throughput and reducing network I/O: instead of committing after every message, it handles a whole batch and commits once. Because the commit happens only after the batch has been processed, a crash mid-batch causes reprocessing rather than loss, i.e., at-least-once delivery.
Example:
```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchConsumerExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Commit manually so a batch is acknowledged only after it is processed.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        try {
            while (true) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (consumerRecords.count() == 0) {
                    continue;
                }
                int count = 0;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.printf("ConsumerRecord:(%s, %s, %d, %d, %d)%n",
                            consumerRecord.key(), consumerRecord.value(),
                            consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
                    count++;
                }
                System.out.println("succeeded in consuming " + count + " records");
                // One offset commit covers the entire batch.
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
    }
}
```
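For finer control than the blanket commitAsync() above, offsets can also be committed per partition once that partition's slice of the batch has been processed; the value committed is the last processed offset plus one. A sketch of an alternative loop body, where process() is a hypothetical stand-in for application logic:

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

for (TopicPartition partition : consumerRecords.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        process(record); // hypothetical application-level handler
    }
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // Commit the position of the NEXT record to read (hence + 1);
    // committing lastOffset itself would replay one record after a restart.
    consumer.commitSync(Collections.singletonMap(
            partition, new OffsetAndMetadata(lastOffset + 1)));
}
```

If the consumer crashes before a partition's commit, only that partition's uncommitted records are re-delivered on restart, preserving at-least-once delivery without loss.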
5. Summary
This article has examined how Kafka avoids both losing data and consuming it twice, covering message persistence, idempotence, consumer groups, and batch consumption. Applied sensibly to the scenario at hand, these techniques let readers safeguard both data integrity and processing efficiency in real-world work.