Kafka如何保證不重複消費又不丟失數據

Kafka是一個高吞吐量分散式消息系統,被廣泛應用於數據傳輸中。在Kafka中,一些場景下需要保證消息不丟失也不被重複消費,本文將從多個方面對這個問題進行闡述。

一、消息持久化

Kafka將所有消息進行持久化存儲,每個partition有相應的replication factor,即副本數量。當Producer發送消息到Kafka,它會被複制到該Partition所有副本機器上,只有在副本寫入到磁碟時才會通知Producer寫入成功。這意味著即使一個broker宕機,也會有其他副本來保證消息的可用性和一致性。

對於消費者而言,它只需簡單地向Kafka請求下一批消息。Kafka僅在Broker端記錄每個Consumer最後讀取的消息Offset,這個Offset是Consumer自己記錄的,這樣即使Consumer下線或者重啟後,可以在Offset的位置繼續讀取消息。

實例:

“`
ProducerConfig props = new ProducerConfig(getProducerProperties());
Producer producer = new Producer(props);
KeyedMessage message = new KeyedMessage(“topic”,”key”, “value”);
producer.send(message);
producer.close();

ConsumerConfig props = new ConsumerConfig(getConsumerProperties());
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(props);
String topic = “topic”;
Map topicCount = new HashMap();
topicCount.put(topic, 1);
Map<String, List<KafkaStream>> consumerStreams =
consumer.createMessageStreams(topicCount);
List<KafkaStream> streams = consumerStreams.get(topic);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
System.out.println(new String(it.next().message()));
}
}
“`

二、冪等性

Kafka 0.11後加入了冪等性保證,此功能可被Producer使用,以確保相同的消息能夠被重複生產,但在消息分配時,每個消息只會被處理一次。這將確保即使重複發送消息,也不會導致數據損壞或消息重複,從而達到不丟失數據和不重複消費的目標。

實例:

“`java
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.ACKS_CONFIG, “all”);
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768);
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, “snappy”);

Producer producer = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions();

try {
producer.beginTransaction();
for (String data : dataList) {
producer.send(new ProducerRecord(“topic”, key, data));
}
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
producer.flush();
producer.close();
“`

三、使用Consumer Group

Kafka提供了Consumer Group概念,一個Consumer Group由一組Consumer組成,共同消費一個或者一組Partition。當一個Consumer Group進行消息消費時,Partition中的每個消息只能被組中的一個Consumer消費,其他Consumer則不能再消費該Partition中的任何消息。當Consumer Group中的任何一個Consumer宕機或加入時,該Consumer Group都不會影響另外Consumer Group中的消費情況。

實例:

“`java
Properties props = new Properties();
props.put(“group.id”, “test”);
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);

KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(“foo”, “bar”));
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf(“offset = %d, key = %s, value = %s”, record.offset(), record.key(), record.value());
}
}
“`

四、使用Batch Consumer

Batch Consumer每次消費多個消息,以提高消息處理效率和優化網路IO。它可以一次消費多條消息,然後一次性提交Offset,從而能夠提高效率,減少IO操作次數。

實例:

“`java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BatchConsumerExampler {
public static void main(String[] args) {
final Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“group.id”, “test”);
props.put(“enable.auto.commit”, “false”);
props.put(“auto.offset.reset”, “earliest”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
final KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList(“foo”, “bar”));
try {
while (true) {
final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100));
if (consumerRecords.count() == 0) {
continue;
}
int count = 0;
for (ConsumerRecord consumerRecord : consumerRecords) {
System.out.printf(“ConsumerRecord:(%d, %s, %d, %d, %s)\n”,
consumerRecord.key(), consumerRecord.value(),
consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
count++;
}
System.out.println(“succeed to consume ” + count + ” records”);
consumer.commitAsync();
}
} finally {
consumer.close();
}
}
}
“`

五、總結

本文從消息持久化、冪等性、Consumer Group、Batch Consumer等方面對Kafka如何保證數據不重複消費又不丟失進行了詳細的闡述。相信在實際工作中,針對不同場景,讀者能夠合理地使用這些技術,保障數據的安全性和處理效率。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/309947.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2025-01-04 19:32
下一篇 2025-01-04 19:32

相關推薦

  • Python讀取CSV數據畫散點圖

    本文將從以下方面詳細闡述Python讀取CSV文件並畫出散點圖的方法: 一、CSV文件介紹 CSV(Comma-Separated Values)即逗號分隔值,是一種存儲表格數據的…

    編程 2025-04-29
  • Python中讀入csv文件數據的方法用法介紹

    csv是一種常見的數據格式,通常用於存儲小型數據集。Python作為一種廣泛流行的編程語言,內置了許多操作csv文件的庫。本文將從多個方面詳細介紹Python讀入csv文件的方法。…

    編程 2025-04-29
  • 如何用Python統計列表中各數據的方差和標準差

    本文將從多個方面闡述如何使用Python統計列表中各數據的方差和標準差, 並給出詳細的代碼示例。 一、什麼是方差和標準差 方差是衡量數據變異程度的統計指標,它是每個數據值和該數據值…

    編程 2025-04-29
  • Python多線程讀取數據

    本文將詳細介紹多線程讀取數據在Python中的實現方法以及相關知識點。 一、線程和多線程 線程是操作系統調度的最小單位。單線程程序只有一個線程,按照程序從上到下的順序逐行執行。而多…

    編程 2025-04-29
  • Python爬取公交數據

    本文將從以下幾個方面詳細闡述python爬取公交數據的方法: 一、準備工作 1、安裝相關庫 import requests from bs4 import BeautifulSou…

    編程 2025-04-29
  • Python兩張表數據匹配

    本篇文章將詳細闡述如何使用Python將兩張表格中的數據匹配。以下是具體的解決方法。 一、數據匹配的概念 在生活和工作中,我們常常需要對多組數據進行比對和匹配。在數據量較小的情況下…

    編程 2025-04-29
  • Python數據標準差標準化

    本文將為大家詳細講述Python中的數據標準差標準化,以及涉及到的相關知識。 一、什麼是數據標準差標準化 數據標準差標準化是數據處理中的一種方法,通過對數據進行標準差標準化可以將不…

    編程 2025-04-29
  • 如何使用Python讀取CSV數據

    在數據分析、數據挖掘和機器學習等領域,CSV文件是一種非常常見的文件格式。Python作為一種廣泛使用的編程語言,也提供了方便易用的CSV讀取庫。本文將介紹如何使用Python讀取…

    編程 2025-04-29
  • Python如何打亂數據集

    本文將從多個方面詳細闡述Python打亂數據集的方法。 一、shuffle函數原理 shuffle函數是Python中的一個內置函數,主要作用是將一個可迭代對象的元素隨機排序。 在…

    編程 2025-04-29
  • Python根據表格數據生成折線圖

    本文將介紹如何使用Python根據表格數據生成折線圖。折線圖是一種常見的數據可視化圖表形式,可以用來展示數據的趨勢和變化。Python是一種流行的編程語言,其強大的數據分析和可視化…

    編程 2025-04-29

發表回復

登錄後才能評論