RocketMQ事務消息原理

一、RocketMQ概述

RocketMQ是一個高可用、高吞吐量、高性能的分散式消息隊列系統,消息隊列的分散式部署使得其可以滿足一些非同步處理的需求,如大數據量的日誌分析,海量的數據傳輸等。

RocketMQ的特點:

  1. 高可用性:支持主從切換,防止單點故障
  2. 高吞吐量:通過橫向擴展,保證業務高速運轉
  3. 高性能:通過搭建物理網路,保證了傳輸速度
  4. 一致性:支持多種消息類型,保證消息的不同階段能夠保持一致

二、RocketMQ的事務消息原理

RocketMQ的事務消息是保證消息可靠傳輸的一種機制。當我們需要一個消息在多個階段的整個流程中,保證消息可靠性時,利用RocketMQ的事務消息機制會是一種不錯的選擇。

在RocketMQ發送一個事務消息時,會將消息狀態保存在Half Message中。在Half Message發送給消息消費端時,消費端會進行確認,確認之後,消息會從Half Message中刪除,同時添加進消息存儲中。如果消費端沒有確認,那麼消息的狀態將一直處於Half Message狀態,不會被其他消費端接收。

與普通消息不同,事務消息還需要實現兩個關鍵介面:事務半消息發送和事務半消息確認。

三、RocketMQ事務消息的實現

1. 事務半消息發送

    public abstract class TransactionMQProducer extends DefaultMQProducer {
    
        public TransactionSendResult sendMessageInTransaction(Message message, LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
            
            // 發送半消息
            SendResult sendResult = this.send(message, new LocalTransactionMessageChecker(localTransactionExecuter), arg);
            
            // 返回結果
            return new TransactionSendResult(sendResult.getSendStatus(), sendResult.getMessageQueue(), sendResult.getMsgId(), sendResult.getMessage());
        }
    }

以上是一個事務半消息發送的實現,藉助RocketMQ的事務消息介面,我們可以通過LocalTransactionMessageChecker來進行事務消息的半消息發送。半消息發送完成之後,我們需要等待消息消費端確認。事務消息在發送時,會帶上業務系統自定義的參數 arg,這個參數用於在 commit 或 rollback 後,讓業務系統通知 RocketMQ 相應的提交或回滾操作。

2.事務半消息確認

    public interface TransactionListener {
        LocalTransactionState executeLocalTransaction(Message msg, Object arg);
        LocalTransactionState checkLocalTransaction(MessageExt msg);
    }
    
    public abstract class TransactionMQListener implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            MessageExt msg = msgs.get(0);
            try {
                TransactionListener transactionListener = getTransactionListener();
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
                LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, null);
                if (LocalTransactionState.COMMIT_MESSAGE.equals(localTransactionState)) {
                    // 如果是提交狀態,則調用 commitTransaction 方法
                    commitTransaction(msg);
                } else if (LocalTransactionState.ROLLBACK_MESSAGE.equals(localTransactionState)) {
                    // 如果是回滾狀態,則調用 rollbackTransaction 方法
                    rollbackTransaction(msg);
                } else if (LocalTransactionState.UNKNOW.equals(localTransactionState)) {
                    log.warn("unknown local transaction status, message:{}", msg);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            } catch (Throwable e) {: 
                log.warn("executeLocalTransaction Exception", e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

以上是一個事務半消息確認的實現。事務消息被認為是提交的消息,需要調用LocalTransactionListener的executeLocalTransaction方法進行事務確認。如果事務消息未被確認,則需要調用 rollbackTransaction方法進行事務回滾。

四、RocketMQ事務消息的應用場景

步驟:

  1. 生產者發送prepare消息到RocketMQ,RocketMQ會將消息狀態保存在Half Message中,返回Producer本地事務狀態
  2. 生產者執行本地Transaction,也就是開始執行正式的業務操作,比如向資料庫中插入數據
  3. 如果本地Transaction執行成功,則向RocketMQ發送COMMIT消息,這裡需要注意,在返回COMMIT前,RocketMQ不會將消息提交到消費端
  4. 如果本地Transaction出現異常,則向RocketMQ發送ROLLBACK消息,這裡需要注意,在返回ROLLBACK前,RocketMQ不會將消息提交到消費端
  5. 消費者正常消費消息,完成消息消費

事務型消息廣泛應用於分散式事務場景中,可以解決原先分散式系統中不可避免的一系列事務問題。由於事務型消息具有較高的可靠性和數據一致性,因此在一些對數據準確性要求高的應用場景中得到了廣泛的應用。

五、總結

RocketMQ的事務消息機制在分散式事務的場景下能夠發揮出它的優越性。它通過事務半消息發送和事務半消息確認兩個關鍵介面實現消息的事務性處理,保證了消息的可靠性和數據一致性,使其在一些對數據準確性要求較高的應用場景中得到了廣泛的應用。

原創文章,作者:ROXEW,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/333384.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
ROXEW的頭像ROXEW
上一篇 2025-02-01 13:34
下一篇 2025-02-01 13:34

相關推薦

  • Harris角點檢測演算法原理與實現

    本文將從多個方面對Harris角點檢測演算法進行詳細的闡述,包括演算法原理、實現步驟、代碼實現等。 一、Harris角點檢測演算法原理 Harris角點檢測演算法是一種經典的計算機視覺演算法…

    編程 2025-04-29
  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • 瘦臉演算法 Python 原理與實現

    本文將從多個方面詳細闡述瘦臉演算法 Python 實現的原理和方法,包括該演算法的意義、流程、代碼實現、優化等內容。 一、演算法意義 隨著科技的發展,瘦臉演算法已經成為了人們修圖中不可缺少…

    編程 2025-04-29
  • 神經網路BP演算法原理

    本文將從多個方面對神經網路BP演算法原理進行詳細闡述,並給出完整的代碼示例。 一、BP演算法簡介 BP演算法是一種常用的神經網路訓練演算法,其全稱為反向傳播演算法。BP演算法的基本思想是通過正…

    編程 2025-04-29
  • Java Hmily分散式事務解決方案

    分散式系統是現在互聯網公司架構中的必備項,但隨著業務的不斷擴展,分散式事務的問題也日益凸顯。為了解決分散式事務問題,Java Hmily分散式事務解決方案應運而生。本文將對Java…

    編程 2025-04-28
  • ROS線程發布消息異常解決方法

    針對ROS線程發布消息異常問題,我們可以從以下幾個方面進行分析和解決。 一、檢查ROS代碼是否正確 首先,我們需要檢查ROS代碼是否正確。可能會出現的問題包括: 是否正確初始化RO…

    編程 2025-04-28
  • 使用Python發送微信消息給別人

    問題:如何使用Python發送微信消息給別人? 一、配置微信開發者平台 首先,要想發送微信消息,需要在微信開發者平台中進行配置,來獲取對應的授權信息。具體步驟如下: 1、登錄微信公…

    編程 2025-04-28
  • GloVe詞向量:從原理到應用

    本文將從多個方面對GloVe詞向量進行詳細的闡述,包括其原理、優缺點、應用以及代碼實現。如果你對詞向量感興趣,那麼這篇文章將會是一次很好的學習體驗。 一、原理 GloVe(Glob…

    編程 2025-04-27
  • 編譯原理語法分析思維導圖

    本文將從以下幾個方面詳細闡述編譯原理語法分析思維導圖: 一、語法分析介紹 1.1 語法分析的定義 語法分析是編譯器中將輸入的字元流轉換成抽象語法樹的一個過程。該過程的目的是確保輸入…

    編程 2025-04-27
  • MariaDB XA事務的使用方法

    本文將從多個方面對MariaDB XA事務進行詳細的闡述,包括XA事務的定義、特點、使用方法以及示例代碼等。通過本文的閱讀,讀者將能夠更好地理解和應用MariaDB XA事務。 一…

    編程 2025-04-27

發表回復

登錄後才能評論