RocketMQTemplate.convertAndSend是Spring RocketMQ的消息發送模板。它提供了多種消息發送的方法,支持發送同步消息、異步消息、單向消息和順序消息等。RocketMQTemplate.convertAndSend方法具有極高的靈活性,可以讓開發者自主選擇序列化方式、topic、tag等相關參數。下面將從多個方面詳細介紹該方法。
一、方法參數與返回值
RocketMQTemplate.convertAndSend方法是Spring RocketMQ的消息發送模板,它的參數中包含以下幾個關鍵參數:
- topic:消息主題
- message:消息內容,可以為任何Java對象
- MessagePostProcessor:消息處理器,用於處理消息的元數據如消息頭、消息tag等
發送消息成功後,該方法的返回值是一個org.springframework.messaging.Message對象。可以通過該對象獲取發送的消息的元數據,如消息ID、消息發送時間等信息。
二、消息內容序列化
RocketMQTemplate.convertAndSend方法會將Java對象序列化成位元組數組,以便RocketMQ可以進行傳輸和持久化。Spring RocketMQ支持多種消息序列化方式,包括JDK默認序列化、JSON序列化、Protobuf序列化等。默認情況下,Spring RocketMQ使用JDK默認序列化方式。
如果想要更換序列化方式,可以使用org.springframework.messaging.converter.MessageConverter接口進行定製。例如,使用JSON序列化方式:
@Configuration
public class RocketMQConfig {
@Bean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper objectMapper) {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setObjectMapper(objectMapper);
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setMessageConverter(messageConverter);
rocketMQTemplate.setProducer(mqProducer);
return rocketMQTemplate;
}
}
三、消息處理器MessagePostProcessor
RocketMQTemplate.convertAndSend方法之所以設計MessagePostProcessor參數,是為了讓開發者能夠對消息的元數據進行處理。例如,可以將消息中的tag設置為業務相關的信息,方便後續處理、查詢等操作。
以下是一個MessagePostProcessor的示例代碼:
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.getMessageProperties().setUserProperty("tag", "important");
return message;
}
};
rocketMQTemplate.convertAndSend("myTopic", "hello world", messagePostProcessor);
四、同步、異步和單向消息發送
RocketMQTemplate.convertAndSend方法支持同步、異步和單向消息發送。其中,同步消息會等待MQ返迴響應結果,返回消息的Message對象,是可以從MQ Broker中正常獲得的完整信息。異步消息不會等待MQ返回,通過傳入一個SendCallback回調函數,僅在發送完成或發送失敗時被回調。單向消息則是最簡單、也是性能最好的一種消息發送模式,方法只負責把消息發送給MQ Broker,不關心消息是否發送成功或失敗。下面是這三種發送方式的示例代碼:
// 同步消息發送
Message syncMessage = rocketMQTemplate.syncSend("myTopic", "hello world");
System.out.println(syncMessage.getPayload());
// 異步消息發送
rocketMQTemplate.asyncSend("myTopic", "hello world", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("send success: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("send fail" + e.getMessage());
}
});
// 單向消息發送
rocketMQTemplate.sendOneWay("myTopic", "hello world");
五、順序消息發送
對於需要保證消息順序的業務場景,RocketMQ支持順序消息的發送。Spring RocketMQ同樣提供了順序消息發送的支持。下面是一個發送順序消息的示例:
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
@Override
public MessageQueue select(List list, Message message, Object o) {
String orderId = (String) o;
int index = Math.abs(orderId.hashCode()) % list.size();
return list.get(index);
}
});
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello world").build(), "order-001");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("hello MQ").build(), "order-002");
rocketMQTemplate.send("myTopic", MessageBuilder.withPayload("rocketmq").build(), "order-003");
以上就是對RocketMQTemplate.convertAndSend方法的詳細介紹。通過該方法,開發者可以輕鬆完成RocketMQ的消息發送,同時也可以根據業務需求選擇相關的參數和發送方式,提高整個應用的效率和可維護性。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/280627.html
微信掃一掃
支付寶掃一掃