一、RocketMQ死信隊列概述
RocketMQ是一個高性能、可靠性、可擴展性 broker 致力於處理大量數據,包括流式數據和批量數據,並且能夠實現在線擴容。RocketMQ支持死信隊列(Dead Letter Queue,DLQ),可以將無法被使用者消費的消息轉發到指定的 topic 或者 queue 中去,死信隊列就是為了解決這些問題而生的。
對於一些無法被用戶正常消費的消息,RocketMQ會根據一些設定規則,最終將這些問題消息發送到RocketMQ死信隊列中,從而引導一系列的流程操作。
二、RocketMQ死信隊列要自己配
RocketMQ死信隊列需要手動配製,這個最好在消費者端配製,如果在消費者配置消費者類為順序消息消費者,需要在組消費者中增加一個成員來消費死信消息。以下是使用Java SDK 實現創建消費者及相關配製的示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("localhost:9876");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("topic_name", "*");
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setInstanceName("consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeTimeout(60);
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
consumer.setMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
// business logic
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
// 死信隊列配製
String groupName = "your_group_name";
String topic = "your_topic_name";
String origTopic = "your_topic_name";
String subExpression = "your_sub_expression";
int maxRetryTimes = 5;
long retryInterval = 60L;
String dlqTopic = MixAll.getRetryTopic(groupName);
String dlqRealTopic = String.format("%s%s", dlqTopic, System.currentTimeMillis());
String dlqSubExpression = String.format("%s&&%s", subExpression, MessageConst.PROPERTY_RETRY_TOPIC + "==" + dlqTopic);
CreateTopicKey topicKey = new CreateTopicKey(dlqTopic, org.apache.rocketmq.common.protocol.RequestCode.UPDATE_AND_CREATE_TOPIC);
TopicConfig topicConfig = new TopicConfig(dlqTopic);
groupConfig.getTopicConfigTable().put(dlqTopic, topicConfig);
table.put(topicKey, topicConfig.buildTopicSetting());
producer.send(new Message(origTopic, dlqSubExpression, "", new byte[] {1, 2, 3}), new SendCallback() {
public void onSuccess(SendResult sendResult) {
// do something
}
public void onException(Throwable e) {
// do something
}
});
三、RocketMQ死信隊列保證消息被消費
RocketMQ生產者將消息發送到 broker 後,broker會將消息持久化到磁碟,保證消息不會丟失。RocketMQ消費者一旦收到消息後,就會自動提交 offset,表明將此條消息消費完整,所以在 broker處於重啟等一些情況下,已經消費成功但未來得及提交 offset 的消息會再次被 broker 發送給消費者消費。
RocketMQ的重複刪除機制有兩種,第一種是使用默認的消費重試次數機制,重試這個機制在一定時間內會不重不漏地把消息發送給消費者,保障消息不丟失。如果運行消費者的伺服器出現宕機,這個時間間隔可能會被限制。第二種是使用死信隊列。
四、RocketMQ死信隊列消費
RocketMQ的死信隊列機製為保障消息不丟失提供了一種方式,分別從定義、實現、應用場景幾個方面介紹死信隊列的消費機制。
定義:
public enum ConsumeStatus {
SUCCESS,
FAIL,
EXCEPTION,
;
}
實現:
public interface MessageListener {
ConsumeStatus consumeMessage(MessageExt messageExt);
}
應用場景:
比如一個新用戶提交了訂單,但是由於某些原因,該訂單的數據中缺少了一些必須的欄位。這部分訂單消息就可以通過 RocketMQ發往死信隊列,等待相關的人員來處理。又比如當 RocketMQ 消費端出現異常,RocketMQ的死信隊列可以幫助消費者重新消費這些過期或異常的消息,確保消息不丟失。
五、RocketMQ死信隊列處理
RocketMQ消費者接收到消息後,可以判斷一下消費是否成功。如果消費失敗,則可以根據消息中的關聯信息嘗試進行重試、或者將消息發送到死信隊列。
業務處理時如果出現異常,可以在消費端將消息發送到RocketMQ死信隊列中,業務處理時,調用方法:reconsumeLater();
後的傳入為設置該消息再次消費的時間,此時消息會被放入到 RocketMQ 的 「重試隊列」,若多次發生了消費失敗,則最終會被發送到 DLQ 中。
以下是使用Java SDK在消費者端將消息發送到RocketMQ死信隊列的示例:
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
// 處理業務邏輯
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 如果消費失敗,將該消息告知 broker,等待 broker 根據系統設置進行後續操作,如重新消費或進入死信隊列
}
六、RocketMQ死信隊列的使用場景
RocketMQ死信隊列的具體使用場景如下:
1.消息延遲消費
我們可以將一些帶有延遲消費的消息通過 RocketMQ定時儲存在 broker 中,等到指定時間後再由 RocketMQ 發送給消費方進行消費。如果在指定時間內消費成功,則 broker 會將消息標記為已消費。否則,就會自動發送到死信隊列。
2.異常消息重新消費
考慮到消費者在消息處理過程中,可能會出現臟數據、數據異常或者處理失敗的情況,在消息重試了幾次之後,依然不能被正常地消費,那麼就可以將這些消費失敗、過期數據等通過設置重試機制和死信隊列機制來優雅地處理。RocketMQ死信隊列就是可以讓這些消費無法完成的消息正常被消費的解決方案。
3.流量負載與限流
可以通過死信隊列來實現簡單的負載均衡,將某些 topic 的消息發送到這個隊列,這樣的話就可以避免在消息消費處理失敗過程中對 consumer 造成過於嚴重的負載壓力,從而降低服務的風險,提高整個架構的穩定性。
七、RabbitMQ死信隊列使用場景
RabbitMQ同樣支持死信隊列,但它與RocketMQ實現方式並不相同。RabbitMQ的 DLQ 需要為每個需要死信隊列的 queue 配置一個單獨的 exchange,而配置與代碼示例如下:
//在生產端聲明死信exchange
channel.exchangeDeclare("dlx.exchange", "topic", true);
//聲明隊列,並設置隊列的 x-dead-letter-exchange 和 x-dead-letter-routing-key
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
arguments.put("x-dead-letter-routing-key", "dlqKey");
channel.queueDeclare("test.queue", true, false, false, arguments);
以上示例中death_exchange就是死信exchange,必須先綁定一個死信的exchange,然後在設置原隊列的死信隊列綁定到它上面,這樣就完成了死信的設置。
八、RocketMQ重試隊列
生產者在向 RocketMQ發送消息時,如果由於一些異常導致發送失敗的話,RocketMQ 會自動嘗試重試,RocketMQ 會將消息發送到「重試主題」(Retry Topic)中。重試主題是與原主題(Original Topic)對應的,在RocketMQ中指定了一次重試的時間一般在 15s 和 300s 之間。
RocketMQ的重試次數是可以自己控制的,可以根據實際情況自己設置重試次數。
九、RocketMQ消息隊列
RocketMQ是一個分散式、多線程、高吞吐量的消息隊列系統,擁有極高的性能以及魯棒性,因此可以適用於各種消息隊列相關的場景。消息隊列可以幫助我們快速的支持非同步消息通知、削峰填谷、訂單處理、用戶行為記錄、數據同步、大數據處理等複雜的業務場景。
消息隊列的用途主要有兩個方向:
1. 在進行業務邏輯的設計時,可以採用消息隊列將業務邏輯進行拆解和解耦,實現高內聚、低耦合的設計理念。這樣可以避免代碼量的過多臃腫,使邏輯處理可讀性更好。我們將業務邏輯拆解為一個個簡單的處理單元,每個處理單元之間通過消息隊列進行溝通交流,實現松耦合、高內聚的性質。
2. 在互聯網大數據時代,消息隊列可以進行商業化的運用,將消息隊列用於推送廣告、推送信息等,實現信息流向管控和推送。如果使用 RocketMQ 等高性能、可擴展性的系統,則可以更好的解決各種問題。
十、RocketMQ查看消息隊列
對於消息隊列我們也可以設置監控,這樣我們就可以知道消息隊列在生產和消費過程中的情況,並且可以對問題進行快速的排查。
RocketMQ提供了許多介面來監控各種運維指標,通過這些介面我們可以快速了解集群狀態,同時還可以進行橫向擴展和縱向擴展,來滿足我們不斷變換的業務需求。
我們可以通過 RocketMQ 官網提供的 Console 控制台工具實現消息隊列的查看、監控等等功能。
參考文獻
- 阿里巴巴 IT 技術圈。RocketMQ的死信隊列深入淺出。https://yq.aliyun.com/articles/718931
- 阿里巴巴 IT 技術圈。RocketMQ主題隊列深入淺出。https://yq.aliyun.com/articles/718892</
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/155356.html