RocketMQ事务消息原理

一、RocketMQ概述

RocketMQ是一个高可用、高吞吐量、高性能的分布式消息队列系统,消息队列的分布式部署使得其可以满足一些异步处理的需求,如大数据量的日志分析,海量的数据传输等。

RocketMQ的特点:

  1. 高可用性:支持主从切换,防止单点故障
  2. 高吞吐量:通过横向扩展,保证业务高速运转
  3. 高性能:通过搭建物理网络,保证了传输速度
  4. 一致性:支持多种消息类型,保证消息的不同阶段能够保持一致

二、RocketMQ的事务消息原理

RocketMQ的事务消息是保证消息可靠传输的一种机制。当我们需要一个消息在多个阶段的整个流程中,保证消息可靠性时,利用RocketMQ的事务消息机制会是一种不错的选择。

在RocketMQ发送一个事务消息时,会将消息状态保存在Half Message中。在Half Message发送给消息消费端时,消费端会进行确认,确认之后,消息会从Half Message中删除,同时添加进消息存储中。如果消费端没有确认,那么消息的状态将一直处于Half Message状态,不会被其他消费端接收。

与普通消息不同,事务消息还需要实现两个关键接口:事务半消息发送和事务半消息确认。

三、RocketMQ事务消息的实现

1. 事务半消息发送

    public abstract class TransactionMQProducer extends DefaultMQProducer {
    
        public TransactionSendResult sendMessageInTransaction(Message message, LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
            
            // 发送半消息
            SendResult sendResult = this.send(message, new LocalTransactionMessageChecker(localTransactionExecuter), arg);
            
            // 返回结果
            return new TransactionSendResult(sendResult.getSendStatus(), sendResult.getMessageQueue(), sendResult.getMsgId(), sendResult.getMessage());
        }
    }

以上是一个事务半消息发送的实现,借助RocketMQ的事务消息接口,我们可以通过LocalTransactionMessageChecker来进行事务消息的半消息发送。半消息发送完成之后,我们需要等待消息消费端确认。事务消息在发送时,会带上业务系统自定义的参数 arg,这个参数用于在 commit 或 rollback 后,让业务系统通知 RocketMQ 相应的提交或回滚操作。

2.事务半消息确认

    public interface TransactionListener {
        LocalTransactionState executeLocalTransaction(Message msg, Object arg);
        LocalTransactionState checkLocalTransaction(MessageExt msg);
    }
    
    public abstract class TransactionMQListener implements MessageListenerOrderly {
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            MessageExt msg = msgs.get(0);
            try {
                TransactionListener transactionListener = getTransactionListener();
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
                LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, null);
                if (LocalTransactionState.COMMIT_MESSAGE.equals(localTransactionState)) {
                    // 如果是提交状态,则调用 commitTransaction 方法
                    commitTransaction(msg);
                } else if (LocalTransactionState.ROLLBACK_MESSAGE.equals(localTransactionState)) {
                    // 如果是回滚状态,则调用 rollbackTransaction 方法
                    rollbackTransaction(msg);
                } else if (LocalTransactionState.UNKNOW.equals(localTransactionState)) {
                    log.warn("unknown local transaction status, message:{}", msg);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            } catch (Throwable e) {: 
                log.warn("executeLocalTransaction Exception", e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

以上是一个事务半消息确认的实现。事务消息被认为是提交的消息,需要调用LocalTransactionListener的executeLocalTransaction方法进行事务确认。如果事务消息未被确认,则需要调用 rollbackTransaction方法进行事务回滚。

四、RocketMQ事务消息的应用场景

步骤:

  1. 生产者发送prepare消息到RocketMQ,RocketMQ会将消息状态保存在Half Message中,返回Producer本地事务状态
  2. 生产者执行本地Transaction,也就是开始执行正式的业务操作,比如向数据库中插入数据
  3. 如果本地Transaction执行成功,则向RocketMQ发送COMMIT消息,这里需要注意,在返回COMMIT前,RocketMQ不会将消息提交到消费端
  4. 如果本地Transaction出现异常,则向RocketMQ发送ROLLBACK消息,这里需要注意,在返回ROLLBACK前,RocketMQ不会将消息提交到消费端
  5. 消费者正常消费消息,完成消息消费

事务型消息广泛应用于分布式事务场景中,可以解决原先分布式系统中不可避免的一系列事务问题。由于事务型消息具有较高的可靠性和数据一致性,因此在一些对数据准确性要求高的应用场景中得到了广泛的应用。

五、总结

RocketMQ的事务消息机制在分布式事务的场景下能够发挥出它的优越性。它通过事务半消息发送和事务半消息确认两个关键接口实现消息的事务性处理,保证了消息的可靠性和数据一致性,使其在一些对数据准确性要求较高的应用场景中得到了广泛的应用。

原创文章,作者:ROXEW,如若转载,请注明出处:https://www.506064.com/n/333384.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
ROXEWROXEW
上一篇 2025-02-01 13:34
下一篇 2025-02-01 13:34

相关推荐

  • Harris角点检测算法原理与实现

    本文将从多个方面对Harris角点检测算法进行详细的阐述,包括算法原理、实现步骤、代码实现等。 一、Harris角点检测算法原理 Harris角点检测算法是一种经典的计算机视觉算法…

    编程 2025-04-29
  • RabbitMQ和Yii2的消息队列应用

    本文将探讨RabbitMQ和Yii2之间的消息队列应用。从概念、安装和配置、使用实例等多个方面详细讲解,帮助读者了解和掌握RabbitMQ和Yii2的消息队列应用。 一、Rabbi…

    编程 2025-04-29
  • 瘦脸算法 Python 原理与实现

    本文将从多个方面详细阐述瘦脸算法 Python 实现的原理和方法,包括该算法的意义、流程、代码实现、优化等内容。 一、算法意义 随着科技的发展,瘦脸算法已经成为了人们修图中不可缺少…

    编程 2025-04-29
  • 神经网络BP算法原理

    本文将从多个方面对神经网络BP算法原理进行详细阐述,并给出完整的代码示例。 一、BP算法简介 BP算法是一种常用的神经网络训练算法,其全称为反向传播算法。BP算法的基本思想是通过正…

    编程 2025-04-29
  • Java Hmily分布式事务解决方案

    分布式系统是现在互联网公司架构中的必备项,但随着业务的不断扩展,分布式事务的问题也日益凸显。为了解决分布式事务问题,Java Hmily分布式事务解决方案应运而生。本文将对Java…

    编程 2025-04-28
  • ROS线程发布消息异常解决方法

    针对ROS线程发布消息异常问题,我们可以从以下几个方面进行分析和解决。 一、检查ROS代码是否正确 首先,我们需要检查ROS代码是否正确。可能会出现的问题包括: 是否正确初始化RO…

    编程 2025-04-28
  • 使用Python发送微信消息给别人

    问题:如何使用Python发送微信消息给别人? 一、配置微信开发者平台 首先,要想发送微信消息,需要在微信开发者平台中进行配置,来获取对应的授权信息。具体步骤如下: 1、登录微信公…

    编程 2025-04-28
  • GloVe词向量:从原理到应用

    本文将从多个方面对GloVe词向量进行详细的阐述,包括其原理、优缺点、应用以及代码实现。如果你对词向量感兴趣,那么这篇文章将会是一次很好的学习体验。 一、原理 GloVe(Glob…

    编程 2025-04-27
  • 编译原理语法分析思维导图

    本文将从以下几个方面详细阐述编译原理语法分析思维导图: 一、语法分析介绍 1.1 语法分析的定义 语法分析是编译器中将输入的字符流转换成抽象语法树的一个过程。该过程的目的是确保输入…

    编程 2025-04-27
  • MariaDB XA事务的使用方法

    本文将从多个方面对MariaDB XA事务进行详细的阐述,包括XA事务的定义、特点、使用方法以及示例代码等。通过本文的阅读,读者将能够更好地理解和应用MariaDB XA事务。 一…

    编程 2025-04-27

发表回复

登录后才能评论