一、什麼是RocketMQ事務消息
RocketMQ事務消息是指在消息發送方發送消息時,延遲將消息狀態提交給broker,由broker進行二次確認,以確保消息不會因發送失敗而丟失,同時可以保證消息的一致性。
具體操作流程為:在事務消息發送完畢後,transactionProducer會等待確認回調中返回TransactionStatus的狀態,根據回調的狀態執行send或者rollback操作。
實現RocketMQ事務消息至少需要三步操作:執行本地事務 – 提交half消息 – 提交事務。
TransactionMQProducer transactionProducer = new TransactionMQProducer("group_name"); transactionProducer.setTransactionListener(new TransactionListenerImpl()); transactionProducer.start(); Message message = new Message("topic_name", "message_body".getBytes(Charset.defaultCharset())); transactionProducer.sendMessageInTransaction(message, null);
二、如何實現RocketMQ事務消息
1. 實現本地事務
實現RocketMQ事務消息首先需要實現本地事務,即執行SQL或者其他操作,將需要發送的消息與該事務綁定在一起。
實現本地事務需要保證事務的原子性、一致性、隔離性和持久性。
public class TransactionListenerImpl implements TransactionListener { private final ConcurrentHashMap localTrans = new ConcurrentHashMap(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transactionId = "transaction_id"; // 執行本地事務操作,返回事務狀態 int status = executeLocalTransaction(transactionId); localTrans.put(transactionId, status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String transactionId = "transaction_id"; Integer status = localTrans.get(transactionId); if (status != null) { switch (status) { case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.UNKNOW; } } return LocalTransactionState.UNKNOW; } }
2. 提交Half消息
Half消息是指在交易「半成品」,是指事務消息發送成功之後,但是transactionProducer等待確認回調的過程中,半成品的消息實體。
提交Half消息需要先構建消息體,然後使用transactionProducer的sendMessageInTransaction方法發送半成品消息。
Message message = new Message("topic_name", "message_body".getBytes(Charset.defaultCharset())); transactionProducer.sendMessageInTransaction(message, null);
3. 提交事務消息
當本地事務提交成功之後,即可調用sendMessage方法將半成品消息實體提交為事務消息。
producer.sendMessage(message);
三、RocketMQ事務消息的優缺點
1. 優點
事務消息保證了消息發送的可靠性,滿足了一些數據一致性比較高的業務場景。
對於分散式事務,RocketMQ使用可靠消息最終一致性方案,以避免分散式鎖的使用。
使用RocketMQ事務消息可以在業務代碼中實現一些基於消息的分散式事務。
2. 缺點
使用事務消息會增加數據的處理延遲,因為必須等待嚴格的維護。
事務消息的處理還需要保證SQL的執行過程是冪等的,以避免由於transactionProducer重試而引起數據不一致問題。
事務消息在提高數據可靠性的同時,會帶來一些額外的系統複雜度,需要謹慎使用。
四、總結
顧名思義,事務消息通過事務的方式保證消息的可靠性,從而解決了消息發送可靠性的問題,尤其是在某些業務場景下數據一致性非常重要的應用場景。
在具體實踐中,需要注意消息的冪等性以及其他一些額外複雜度,謹慎使用。
原創文章,作者:UBYYU,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/372070.html