Kafka is a high-throughput distributed messaging system that is widely used for data transport. In some scenarios, messages must be neither lost nor consumed more than once. This article examines how Kafka can meet that requirement from several angles.
1. Message Persistence
Kafka persists every message it stores, and each partition has a configurable replication factor, i.e. the number of replicas. When a producer sends a message to Kafka, the message is replicated to all replicas of the target partition; with acks=all, the leader acknowledges the write only after all in-sync replicas have received it. This means that even if one broker goes down, the remaining replicas keep the message available and consistent.
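To make this guarantee concrete, the sketch below, which is an addition of mine rather than part of the original article (the topic name, partition count, and broker address are placeholders), creates a topic with a replication factor of 3 and min.insync.replicas=2 via the AdminClient API, so that a producer using acks=all is only acknowledged once at least two replicas hold the message:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateReliableTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: each message is stored on 3 brokers.
            NewTopic topic = new NewTopic("reliable-topic", 3, (short) 3);
            // With acks=all on the producer, require at least 2 in-sync replicas
            // to have the message before a write is acknowledged.
            topic.configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```

With such a topic configuration, losing a single broker neither loses acknowledged messages nor blocks writes.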
On the consumer side, a consumer simply asks Kafka for the next batch of messages. The consumer commits the offset of the last message it has processed, and Kafka stores these committed offsets on the broker (in the internal __consumer_offsets topic). If the consumer goes offline or restarts, it can resume reading from the last committed offset.
Example:
```java
// This example uses the legacy (pre-0.9) Scala-based client API; newer
// applications should prefer org.apache.kafka.clients.producer/consumer.
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Produce a single message, then close the producer.
// getProducerProperties()/getConsumerProperties() return the connection settings.
ProducerConfig producerConfig = new ProducerConfig(getProducerProperties());
Producer<String, String> producer = new Producer<>(producerConfig);
KeyedMessage<String, String> message = new KeyedMessage<>("topic", "key", "value");
producer.send(message);
producer.close();

// Consume messages from the same topic with the high-level consumer.
ConsumerConfig consumerConfig = new ConsumerConfig(getConsumerProperties());
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
String topic = "topic";
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
        consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (KafkaStream<byte[], byte[]> stream : streams) {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        System.out.println(new String(it.next().message()));
    }
}
```
2. Idempotence
Kafka 0.11 introduced an idempotent producer. With idempotence enabled, the broker deduplicates retried sends, so even if the producer resends a message after a network error or timeout, the message is written to the partition exactly once. Retries therefore no longer cause duplicate records or corrupted data, which is a prerequisite for neither losing data nor consuming it twice. Building on idempotence, Kafka also offers transactions, which let a producer write a batch of messages atomically, as the example below shows.
Example:
```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // required for idempotence
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768);
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
// A transactional.id is required before initTransactions() may be called.
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-transactional-id");

Producer<String, String> producer =
        new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
    producer.beginTransaction();
    for (String data : dataList) { // dataList: the application's records to send
        producer.send(new ProducerRecord<>("topic", key, data)); // key: the record key
    }
    // Either every record in the batch becomes visible, or none does.
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
} finally {
    producer.close();
}
```
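The transactional producer above only achieves end-to-end exactly-once if consumers ignore aborted data. As a minimal sketch (my addition; the group id is a placeholder), a consumer can be configured with isolation.level=read_committed so that poll() returns only messages from committed transactions:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only deliver messages from committed transactions; aborted batches are filtered out.
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("topic"));
```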
3. Using Consumer Groups
Kafka provides the concept of a consumer group: a set of consumers that jointly consume one or more topics. Within a group, each partition is assigned to exactly one consumer, so every message in a partition is processed by only one member of the group. When a consumer in the group crashes or a new one joins, the group rebalances its partition assignments, and this has no effect on consumption in other consumer groups.
Example:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

Properties props = new Properties();
props.put("group.id", "test");
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// All consumers started with group.id "test" split the partitions of these topics.
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
}
```
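One caveat with groups: when a rebalance hands a partition to another member, any records that were processed but whose offsets were not yet committed will be reprocessed by the new owner. A minimal sketch of guarding against this, assuming auto-commit is disabled (the listener is my addition, not part of the original example):

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;

// 'consumer' is the KafkaConsumer from the example above, configured with
// enable.auto.commit=false; this subscribe call replaces the plain one.
consumer.subscribe(Arrays.asList("foo", "bar"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit processed offsets synchronously before losing the partitions,
        // so the next owner does not re-read messages this consumer already handled.
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do; consumption resumes from the committed offsets.
    }
});
```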
4. Using a Batch Consumer
A batch consumer processes multiple messages per poll to improve throughput and reduce network I/O. It consumes a batch of records, processes them, and then commits the offset once for the whole batch, which cuts down the number of commit round trips. Committing only after the batch has been processed gives at-least-once semantics: if the consumer crashes mid-batch, the uncommitted records are redelivered rather than lost.
Example:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class BatchConsumerExample {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        // Disable auto-commit so offsets are committed only after a batch is processed.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        try {
            while (true) {
                final ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));
                if (consumerRecords.count() == 0) {
                    continue;
                }
                int count = 0;
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.printf("ConsumerRecord:(%s, %s, %d, %d, %d)%n",
                            consumerRecord.key(), consumerRecord.value(),
                            consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
                    count++;
                }
                System.out.println("succeeded in consuming " + count + " records");
                // Commit the whole batch at once; async keeps the poll loop responsive.
                consumer.commitAsync();
            }
        } finally {
            consumer.close();
        }
    }
}
```
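For finer-grained control than committing the whole poll result, offsets can also be committed per partition, following the pattern from the KafkaConsumer documentation. A sketch (my addition; the process() step stands in for the application's handling logic):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;

// 'consumer' and 'consumerRecords' are the objects from the batch example above.
for (TopicPartition partition : consumerRecords.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        // process(record): application-specific handling of each record.
    }
    // The committed offset must be the offset of the *next* message to read,
    // hence the +1 on the last processed record's offset.
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
```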
5. Summary
This article has covered how Kafka can guarantee that data is neither lost nor consumed twice, looking at message persistence, idempotence, consumer groups, and batch consumers. In practice, readers should be able to apply these techniques appropriately to different scenarios to safeguard both data integrity and processing efficiency.