一、RocketMQ概述
RocketMQ是一個高可用、高吞吐量、高性能的分散式消息隊列系統,消息隊列的分散式部署使得其可以滿足一些非同步處理的需求,如大數據量的日誌分析,海量的數據傳輸等。
RocketMQ的特點:
- 高可用性:支持主從切換,防止單點故障
- 高吞吐量:通過橫向擴展,保證業務高速運轉
- 高性能:通過搭建物理網路,保證了傳輸速度
- 一致性:支持多種消息類型,保證消息的不同階段能夠保持一致
二、RocketMQ的事務消息原理
RocketMQ的事務消息是保證消息可靠傳輸的一種機制。當我們需要一個消息在多個階段的整個流程中,保證消息可靠性時,利用RocketMQ的事務消息機制會是一種不錯的選擇。
在RocketMQ發送一個事務消息時,會將消息狀態保存在Half Message中。在Half Message發送給消息消費端時,消費端會進行確認,確認之後,消息會從Half Message中刪除,同時添加進消息存儲中。如果消費端沒有確認,那麼消息的狀態將一直處於Half Message狀態,不會被其他消費端接收。
與普通消息不同,事務消息還需要實現兩個關鍵介面:事務半消息發送和事務半消息確認。
三、RocketMQ事務消息的實現
1. 事務半消息發送
public abstract class TransactionMQProducer extends DefaultMQProducer { public TransactionSendResult sendMessageInTransaction(Message message, LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { // 發送半消息 SendResult sendResult = this.send(message, new LocalTransactionMessageChecker(localTransactionExecuter), arg); // 返回結果 return new TransactionSendResult(sendResult.getSendStatus(), sendResult.getMessageQueue(), sendResult.getMsgId(), sendResult.getMessage()); } }
以上是一個事務半消息發送的實現,藉助RocketMQ的事務消息介面,我們可以通過LocalTransactionMessageChecker來進行事務消息的半消息發送。半消息發送完成之後,我們需要等待消息消費端確認。事務消息在發送時,會帶上業務系統自定義的參數 arg,這個參數用於在 commit 或 rollback 後,讓業務系統通知 RocketMQ 相應的提交或回滾操作。
2.事務半消息確認
public interface TransactionListener { LocalTransactionState executeLocalTransaction(Message msg, Object arg); LocalTransactionState checkLocalTransaction(MessageExt msg); } public abstract class TransactionMQListener implements MessageListenerOrderly { public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { MessageExt msg = msgs.get(0); try { TransactionListener transactionListener = getTransactionListener(); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, null); if (LocalTransactionState.COMMIT_MESSAGE.equals(localTransactionState)) { // 如果是提交狀態,則調用 commitTransaction 方法 commitTransaction(msg); } else if (LocalTransactionState.ROLLBACK_MESSAGE.equals(localTransactionState)) { // 如果是回滾狀態,則調用 rollbackTransaction 方法 rollbackTransaction(msg); } else if (LocalTransactionState.UNKNOW.equals(localTransactionState)) { log.warn("unknown local transaction status, message:{}", msg); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } catch (Throwable e) {: log.warn("executeLocalTransaction Exception", e); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }
以上是一個事務半消息確認的實現。事務消息被認為是提交的消息,需要調用LocalTransactionListener的executeLocalTransaction方法進行事務確認。如果事務消息未被確認,則需要調用 rollbackTransaction方法進行事務回滾。
四、RocketMQ事務消息的應用場景
步驟:
- 生產者發送prepare消息到RocketMQ,RocketMQ會將消息狀態保存在Half Message中,返回Producer本地事務狀態
- 生產者執行本地Transaction,也就是開始執行正式的業務操作,比如向資料庫中插入數據
- 如果本地Transaction執行成功,則向RocketMQ發送COMMIT消息,這裡需要注意,在返回COMMIT前,RocketMQ不會將消息提交到消費端
- 如果本地Transaction出現異常,則向RocketMQ發送ROLLBACK消息,這裡需要注意,在返回ROLLBACK前,RocketMQ不會將消息提交到消費端
- 消費者正常消費消息,完成消息消費
事務型消息廣泛應用於分散式事務場景中,可以解決原先分散式系統中不可避免的一系列事務問題。由於事務型消息具有較高的可靠性和數據一致性,因此在一些對數據準確性要求高的應用場景中得到了廣泛的應用。
五、總結
RocketMQ的事務消息機制在分散式事務的場景下能夠發揮出它的優越性。它通過事務半消息發送和事務半消息確認兩個關鍵介面實現消息的事務性處理,保證了消息的可靠性和數據一致性,使其在一些對數據準確性要求較高的應用場景中得到了廣泛的應用。
原創文章,作者:ROXEW,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/333384.html