一、Kafka消息傳輸保障機制概述
Kafka 是一個分散式發布/訂閱消息系統,其最大不同點是基於 pull 的消息傳輸機制,在保證高性能的同時實現了數據的可靠性。Kafka 的消息傳輸保障機制主要包括 3 種模式:
1、At most once:最多一次,消息發送者無論消息是否成功投遞,都不會對消息進行重試,卡夫卡集群最終不一定會收到該消息。該模式的優點在於消息的延遲最小,性能最高,但是會出現消息丟失的情況,適合那些對於消息的可靠性要求不高的業務場景。在實際中,最多一次這種模式會被用來傳送一些臨時的狀態消息,比如說心跳確認等。
2、At least once:最少一次,消息會被重試直到成功將其發送到 Kafka 集群。但是,在發送重試期間,同一條消息可能會被寫入多次,會產生數據冗餘,適合那些對於數據的一致性要求比較高,但是允許數據防止重複的業務場景,比如說電商平台中的訂單提交。
3、Exactly once:恰好一次,這是目前 Kafka 隊列的默認傳輸保障模式,它同時具備 At most once 和 At least once 兩種模式的優點,並且沒有它們的缺點。它保證了數據的一致性和可靠性,適用於金融、醫療、物流等對數據可靠性要求極高的行業。在實際中,使用此模式需要使得發送者以冪等(idempotent)的方式向 Kafka 進行數據發送,即同一條消息不會被重複投遞。
二、Kafka消息傳輸保障機制實現
實現Kafka的消息傳輸保障機制需要兩個關鍵組件:生產者 API 和消費者 API。
為了保證每一條消息都能按照想要的模式投遞到 Kafka 集群,生產者 API 具備了重試機制。在發送消息時,如果網路不穩定或者 Kafka 集群出現宕機等突髮狀況,生產者會在配置的時間間隔內進行重試。生產者 API 還允許配置冪等性(idempotent)保證消息不重複,減少對數據的冗餘寫入。
消費者 API 需要做到消費的消息是通過已經成功提交的 offset 來標識的。在 Kafka 中,offset 是用來標識每一條消息,在消費數據時,使用 offset 來記錄消費的位置。
三、Kafka消息傳輸保障機制代碼示例
以下為 Kafka 的 Java API 的代碼示例:
public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); for (int i = 0; i < 10; i++) producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); } } public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
四、結論
本文詳細分析了 Kafka 消息傳輸保障機制,從三個方面探討了 Kafka 的消息傳輸保障機制概述、實現和示例代碼。Kafka 採用 pull 模式的消息拉取機制,在保證高性能的同時也確保了消息的可靠性。闡述了 At most once、At least once 和 Exactly once 這三種不同的消息傳輸保障機制,並且從生產者 API 和消費者 API 這兩個關鍵組件分別進行了闡述。最後,通過示例代碼演示了如何在實際項目中使用 Kafka 的消息傳輸保障機制。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/289351.html