RocketMQ死信隊列詳解

一、RocketMQ死信隊列概述

RocketMQ是一個高性能、可靠性、可擴展性 broker 致力於處理大量數據,包括流式數據和批量數據,並且能夠實現在線擴容。RocketMQ支持死信隊列(Dead Letter Queue,DLQ),可以將無法被使用者消費的消息轉發到指定的 topic 或者 queue 中去,死信隊列就是為了解決這些問題而生的。

對於一些無法被用戶正常消費的消息,RocketMQ會根據一些設定規則,最終將這些問題消息發送到RocketMQ死信隊列中,從而引導一系列的流程操作。

二、RocketMQ死信隊列要自己配

RocketMQ死信隊列需要手動配製,這個最好在消費者端配製,如果在消費者配置消費者類為順序消息消費者,需要在組消費者中增加一個成員來消費死信消息。以下是使用Java SDK 實現創建消費者及相關配製的示例:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic_name", "*");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setInstanceName("consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeTimeout(60);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
  public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    // business logic
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
});
consumer.start();

// 死信隊列配製
String groupName = "your_group_name";
String topic = "your_topic_name";
String origTopic = "your_topic_name";
String subExpression = "your_sub_expression";
int maxRetryTimes = 5;
long retryInterval = 60L;

String dlqTopic = MixAll.getRetryTopic(groupName);
String dlqRealTopic = String.format("%s%s", dlqTopic, System.currentTimeMillis());
String dlqSubExpression = String.format("%s&&%s", subExpression, MessageConst.PROPERTY_RETRY_TOPIC + "==" + dlqTopic);
CreateTopicKey topicKey = new CreateTopicKey(dlqTopic, org.apache.rocketmq.common.protocol.RequestCode.UPDATE_AND_CREATE_TOPIC);
TopicConfig topicConfig = new TopicConfig(dlqTopic);
groupConfig.getTopicConfigTable().put(dlqTopic, topicConfig);
table.put(topicKey, topicConfig.buildTopicSetting());

producer.send(new Message(origTopic, dlqSubExpression, "", new byte[] {1, 2, 3}), new SendCallback() {
  public void onSuccess(SendResult sendResult) {
    // do something
  }
  public void onException(Throwable e) {
    // do something
  }
});

三、RocketMQ死信隊列保證消息被消費

RocketMQ生產者將消息發送到 broker 後,broker會將消息持久化到磁盤,保證消息不會丟失。RocketMQ消費者一旦收到消息後,就會自動提交 offset,表明將此條消息消費完整,所以在 broker處於重啟等一些情況下,已經消費成功但未來得及提交 offset 的消息會再次被 broker 發送給消費者消費。

RocketMQ的重複刪除機制有兩種,第一種是使用默認的消費重試次數機制,重試這個機制在一定時間內會不重不漏地把消息發送給消費者,保障消息不丟失。如果運行消費者的服務器出現宕機,這個時間間隔可能會被限制。第二種是使用死信隊列。

四、RocketMQ死信隊列消費

RocketMQ的死信隊列機製為保障消息不丟失提供了一種方式,分別從定義、實現、應用場景幾個方面介紹死信隊列的消費機制。

定義:

public enum ConsumeStatus {
    SUCCESS,
    FAIL,
    EXCEPTION,
    ;
}

實現:

public interface MessageListener {
    ConsumeStatus consumeMessage(MessageExt messageExt);
}

應用場景:

比如一個新用戶提交了訂單,但是由於某些原因,該訂單的數據中缺少了一些必須的字段。這部分訂單消息就可以通過 RocketMQ發往死信隊列,等待相關的人員來處理。又比如當 RocketMQ 消費端出現異常,RocketMQ的死信隊列可以幫助消費者重新消費這些過期或異常的消息,確保消息不丟失。

五、RocketMQ死信隊列處理

RocketMQ消費者接收到消息後,可以判斷一下消費是否成功。如果消費失敗,則可以根據消息中的關聯信息嘗試進行重試、或者將消息發送到死信隊列。

業務處理時如果出現異常,可以在消費端將消息發送到RocketMQ死信隊列中,業務處理時,調用方法:reconsumeLater();後的傳入為設置該消息再次消費的時間,此時消息會被放入到 RocketMQ 的 「重試隊列」,若多次發生了消費失敗,則最終會被發送到 DLQ 中。

以下是使用Java SDK在消費者端將消息發送到RocketMQ死信隊列的示例:

public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
  // 處理業務邏輯
  return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 如果消費失敗,將該消息告知 broker,等待 broker 根據系統設置進行後續操作,如重新消費或進入死信隊列
}

六、RocketMQ死信隊列的使用場景

RocketMQ死信隊列的具體使用場景如下:

1.消息延遲消費

我們可以將一些帶有延遲消費的消息通過 RocketMQ定時儲存在 broker 中,等到指定時間後再由 RocketMQ 發送給消費方進行消費。如果在指定時間內消費成功,則 broker 會將消息標記為已消費。否則,就會自動發送到死信隊列。

2.異常消息重新消費

考慮到消費者在消息處理過程中,可能會出現臟數據、數據異常或者處理失敗的情況,在消息重試了幾次之後,依然不能被正常地消費,那麼就可以將這些消費失敗、過期數據等通過設置重試機制和死信隊列機制來優雅地處理。RocketMQ死信隊列就是可以讓這些消費無法完成的消息正常被消費的解決方案。

3.流量負載與限流

可以通過死信隊列來實現簡單的負載均衡,將某些 topic 的消息發送到這個隊列,這樣的話就可以避免在消息消費處理失敗過程中對 consumer 造成過於嚴重的負載壓力,從而降低服務的風險,提高整個架構的穩定性。

七、RabbitMQ死信隊列使用場景

RabbitMQ同樣支持死信隊列,但它與RocketMQ實現方式並不相同。RabbitMQ的 DLQ 需要為每個需要死信隊列的 queue 配置一個單獨的 exchange,而配置與代碼示例如下:

//在生產端聲明死信exchange
channel.exchangeDeclare("dlx.exchange", "topic", true);

//聲明隊列,並設置隊列的 x-dead-letter-exchange 和 x-dead-letter-routing-key
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlqKey");
channel.queueDeclare("test.queue", true, false, false, arguments);

以上示例中death_exchange就是死信exchange,必須先綁定一個死信的exchange,然後在設置原隊列的死信隊列綁定到它上面,這樣就完成了死信的設置。

八、RocketMQ重試隊列

生產者在向 RocketMQ發送消息時,如果由於一些異常導致發送失敗的話,RocketMQ 會自動嘗試重試,RocketMQ 會將消息發送到「重試主題」(Retry Topic)中。重試主題是與原主題(Original Topic)對應的,在RocketMQ中指定了一次重試的時間一般在 15s 和 300s 之間。

RocketMQ的重試次數是可以自己控制的,可以根據實際情況自己設置重試次數。

九、RocketMQ消息隊列

RocketMQ是一個分佈式、多線程、高吞吐量的消息隊列系統,擁有極高的性能以及魯棒性,因此可以適用於各種消息隊列相關的場景。消息隊列可以幫助我們快速的支持異步消息通知、削峰填谷、訂單處理、用戶行為記錄、數據同步、大數據處理等複雜的業務場景。

消息隊列的用途主要有兩個方向:

1. 在進行業務邏輯的設計時,可以採用消息隊列將業務邏輯進行拆解和解耦,實現高內聚、低耦合的設計理念。這樣可以避免代碼量的過多臃腫,使邏輯處理可讀性更好。我們將業務邏輯拆解為一個個簡單的處理單元,每個處理單元之間通過消息隊列進行溝通交流,實現松耦合、高內聚的性質。

2. 在互聯網大數據時代,消息隊列可以進行商業化的運用,將消息隊列用於推送廣告、推送信息等,實現信息流向管控和推送。如果使用 RocketMQ 等高性能、可擴展性的系統,則可以更好的解決各種問題。

十、RocketMQ查看消息隊列

對於消息隊列我們也可以設置監控,這樣我們就可以知道消息隊列在生產和消費過程中的情況,並且可以對問題進行快速的排查。

RocketMQ提供了許多接口來監控各種運維指標,通過這些接口我們可以快速了解集群狀態,同時還可以進行橫向擴展和縱向擴展,來滿足我們不斷變換的業務需求。

我們可以通過 RocketMQ 官網提供的 Console 控制台工具實現消息隊列的查看、監控等等功能。

參考文獻

  1. 阿里巴巴 IT 技術圈。RocketMQ的死信隊列深入淺出。https://yq.aliyun.com/articles/718931
  2. 阿里巴巴 IT 技術圈。RocketMQ主題隊列深入淺出。https://yq.aliyun.com/articles/718892</

    原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/155356.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-11-17 02:40
下一篇 2024-11-17 02:40

相關推薦

  • Python中的隊列定義

    本篇文章旨在深入闡述Python中隊列的定義及其應用,包括隊列的定義、隊列的類型、隊列的操作以及隊列的應用。同時,我們也會為您提供Python代碼示例。 一、隊列的定義 隊列是一種…

    編程 2025-04-29
  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • 神經網絡代碼詳解

    神經網絡作為一種人工智能技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網絡的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網絡模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁盤中。在執行sync之前,所有的文件系統更新將不會立即寫入磁盤,而是先緩存在內存…

    編程 2025-04-25
  • C語言貪吃蛇詳解

    一、數據結構和算法 C語言貪吃蛇主要運用了以下數據結構和算法: 1. 鏈表 typedef struct body { int x; int y; struct body *nex…

    編程 2025-04-25
  • MPU6050工作原理詳解

    一、什麼是MPU6050 MPU6050是一種六軸慣性傳感器,能夠同時測量加速度和角速度。它由三個傳感器組成:一個三軸加速度計和一個三軸陀螺儀。這個組合提供了非常精細的姿態解算,其…

    編程 2025-04-25
  • git config user.name的詳解

    一、為什麼要使用git config user.name? git是一個非常流行的分佈式版本控制系統,很多程序員都會用到它。在使用git commit提交代碼時,需要記錄commi…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25
  • Java BigDecimal 精度詳解

    一、基礎概念 Java BigDecimal 是一個用於高精度計算的類。普通的 double 或 float 類型只能精確表示有限的數字,而對於需要高精度計算的場景,BigDeci…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25

發表回復

登錄後才能評論