一、什么是RocketMQ事务消息
RocketMQ事务消息是指在消息发送方发送消息时,延迟将消息状态提交给broker,由broker进行二次确认,以确保消息不会因发送失败而丢失,同时可以保证消息的一致性。
具体操作流程为:在事务消息发送完毕后,transactionProducer会等待确认回调中返回TransactionStatus的状态,根据回调的状态执行send或者rollback操作。
实现RocketMQ事务消息至少需要三步操作:执行本地事务 – 提交half消息 – 提交事务。
TransactionMQProducer transactionProducer = new TransactionMQProducer("group_name");
transactionProducer.setTransactionListener(new TransactionListenerImpl());
transactionProducer.start();
Message message = new Message("topic_name", "message_body".getBytes(Charset.defaultCharset()));
transactionProducer.sendMessageInTransaction(message, null);
二、如何实现RocketMQ事务消息
1. 实现本地事务
实现RocketMQ事务消息首先需要实现本地事务,即执行SQL或者其他操作,将需要发送的消息与该事务绑定在一起。
实现本地事务需要保证事务的原子性、一致性、隔离性和持久性。
public class TransactionListenerImpl implements TransactionListener {
private final ConcurrentHashMap localTrans = new ConcurrentHashMap();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = "transaction_id";
// 执行本地事务操作,返回事务状态
int status = executeLocalTransaction(transactionId);
localTrans.put(transactionId, status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = "transaction_id";
Integer status = localTrans.get(transactionId);
if (status != null) {
switch (status) {
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.UNKNOW;
}
}
return LocalTransactionState.UNKNOW;
}
}
2. 提交Half消息
Half消息是指在交易“半成品”,是指事务消息发送成功之后,但是transactionProducer等待确认回调的过程中,半成品的消息实体。
提交Half消息需要先构建消息体,然后使用transactionProducer的sendMessageInTransaction方法发送半成品消息。
Message message = new Message("topic_name", "message_body".getBytes(Charset.defaultCharset()));
transactionProducer.sendMessageInTransaction(message, null);
3. 提交事务消息
当本地事务提交成功之后,即可调用sendMessage方法将半成品消息实体提交为事务消息。
producer.sendMessage(message);
三、RocketMQ事务消息的优缺点
1. 优点
事务消息保证了消息发送的可靠性,满足了一些数据一致性比较高的业务场景。
对于分布式事务,RocketMQ使用可靠消息最终一致性方案,以避免分布式锁的使用。
使用RocketMQ事务消息可以在业务代码中实现一些基于消息的分布式事务。
2. 缺点
使用事务消息会增加数据的处理延迟,因为必须等待严格的维护。
事务消息的处理还需要保证SQL的执行过程是幂等的,以避免由于transactionProducer重试而引起数据不一致问题。
事务消息在提高数据可靠性的同时,会带来一些额外的系统复杂度,需要谨慎使用。
四、总结
顾名思义,事务消息通过事务的方式保证消息的可靠性,从而解决了消息发送可靠性的问题,尤其是在某些业务场景下数据一致性非常重要的应用场景。
在具体实践中,需要注意消息的幂等性以及其他一些额外复杂度,谨慎使用。
原创文章,作者:UBYYU,如若转载,请注明出处:https://www.506064.com/n/372070.html
微信扫一扫
支付宝扫一扫