一、什麼是Spring Cloud RocketMQ
Spring Cloud RocketMQ是Spring Cloud家族中的一個微服務框架,它集成了RocketMQ、Spring Cloud Stream以及Spring Boot等技術,為開發人員提供了一個完整的、基於消息驅動的分佈式系統解決方案。
Spring Cloud RocketMQ支持消息的生產與消費,通過消息中間件RocketMQ實現跨服務的消息傳遞和通信。使用Spring Cloud Stream統一封裝了與消息中間件的交互細節,使得開發者聚焦於業務邏輯的實現上。而Spring Boot的集成又為開發者帶來了便捷、靈活和高效的開發體驗。
二、Spring Cloud RocketMQ基本應用
1、引入依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
<version>3.0.3</version>
</dependency>
需要注意的是,Spring Cloud Stream Binder RocketMQ的版本與RocketMQ的版本是相關聯的。因此,在使用之前需要先確認版本的兼容性。
2、定義生產者
@Component
@EnableBinding(Source.class)
public class RocketMQProducer {
@Autowired
private Source source;
public void send(String messageContent) {
Message message = MessageBuilder.withPayload(messageContent.getBytes()).build();
source.output().send(message);
}
}
生產者的實現很簡單,只需要通過@EnableBinding註解綁定Source,並使用MessageBuilder創建消息並發送即可。
3、定義消費者
@Component
@EnableBinding(Sink.class)
public class RocketMQConsumer {
@StreamListener(Sink.INPUT)
public void receive(Message message) {
System.out.println("Received Message: " + new String((byte[]) message.getPayload()));
}
}
消費者同樣很簡單,只需要使用@EnableBinding註解綁定Sink,並使用@StreamListener註解接收消息即可。
三、Spring Cloud RocketMQ進階應用
1、消息確認機制
在實際應用中,消息往往一次性發送給多個消費者,出現消息處理失敗等情況時需要有一種確認機制保證消息不會丟失。Spring Cloud RocketMQ提供了兩種不同的確認機制,即自動確認機制和手動確認機制。
@Configuration
@EnableBinding(MyCustomChannel.class)
public class RocketMQConfirmConfiguration {
@Autowired
private MyCustomChannel myCustomChannel;
@Bean
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer defaultMQProducer) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultMQProducer);
rocketMQTemplate.setSendMsgTimeout(3000);
rocketMQTemplate.setCheckThreadPoolExecutor(Executors.newFixedThreadPool(50));
return rocketMQTemplate;
}
@Bean
public DefaultMQProducer defaultMQProducer() throws Exception {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setProducerGroup("default_group");
defaultMQProducer.setNamesrvAddr("localhost:9876");
defaultMQProducer.setInstanceName("default");
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(1);
defaultMQProducer.start();
return defaultMQProducer;
}
@Bean
public RocketMQTransactionListener rocketMQTransactionListener() {
return new RocketMQTransactionListenerImpl();
}
@Bean
public TransactionMQProducer transactionMQProducer(DefaultMQProducer defaultMQProducer,
RocketMQTransactionListener rocketMQTransactionListener) throws Exception {
TransactionMQProducer transactionMQProducer = new TransactionMQProducer("trans_group");
transactionMQProducer.setNamesrvAddr("localhost:9876");
transactionMQProducer.setProducerGroup("trans_group");
transactionMQProducer.setTransactionListener(rocketMQTransactionListener);
transactionMQProducer.setExecutorService(new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
}));
transactionMQProducer.start();
return transactionMQProducer;
}
@Bean
public RocketMQLocalTransactionCheckerImpl rocketMQLocalTransactionChecker() {
return new RocketMQLocalTransactionCheckerImpl();
}
@Bean
public RocketMQLocalTransactionStateConverterImpl rocketMQLocalTransactionStateConverter() {
return new RocketMQLocalTransactionStateConverterImpl();
}
@Bean
public JmsMessagingTemplate jmsMessagingTemplate() {
return new JmsMessagingTemplate(myCustomChannel.output());
}
}
上面是確認機制的配置代碼,我們需要在配置類中定義Producer,根據需求選擇自動確認或手動確認機制,設置相關屬性並啟動Producer。具體代碼實現過程如下:
- 自動確認機制
rocketMQTemplate.convertAndSend("myTopic", "Hello, World!");
使用RocketMQTemplate的convertAndSend()方法即可發送消息,默認使用自動確認機制。
rocketMQTemplate.sendMessageInTransaction("trans_topic", message, null);
使用RocketMQTemplate的sendMessageInTransaction()方法即可啟用手動確認機制。
2、消息分組機制
消息分組功能允許將一組Topic歸類為一個Group,方便在服務端進行管理,同時也方便客戶端使用,並且還能提高消息發送和接收的效率。具體實現方式如下:
@Configuration
public class RocketMQConfig {
@Bean(name = "rocketMQMessageSender")
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer("TestGroup");
producer.setNamesrvAddr("localhost:9876");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
}
配置文件中指定名稱為testGroup的DefaultMQProducer。
3、消息過濾機制
Spring Cloud RocketMQ支持使用SQL92定義消息過濾規則,讓客戶端可以按照指定的規則篩選出滿足條件的消息。具體實現方式如下:
3.1、消息消費者
@Component
@EnableBinding(value = { Sink.class })
public class RocketMQConsumerFilter {
@Autowired
private Sink sink;
@StreamListener(target = Sink.INPUT, condition = "headers['tag'] == 'tagA'")
public void handleA(String message) {
System.out.println("received message A: " + message);
}
@StreamListener(target = Sink.INPUT, condition = "headers['tag'] == 'tagB'")
public void handleB(String message) {
System.out.println("received message B: " + message);
}
}
這裡定義了兩個消費者,每個消費者根據tag值進行過濾。當tag值為tagA時,調用handleA()方法,當tag值為tagB時,調用handleB()方法。
3.2、消息生產者
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTagMessage(String message, String tag) {
Message msg = MessageBuilder.withPayload(message).setHeader(MessageConst.PROPERTY_TAGS, tag).build();
SendResult sendResult = rocketMQTemplate.syncSend("test-tag-topic", msg);
System.out.printf("Send tag message: %s; send result: %s \n", message, sendResult);
}
這裡使用MessageBuilder設置消息的tag值,$tags屬性可以用於過濾和重複消費。
四、總結
本文詳細闡述了Spring Cloud RocketMQ的基本應用和進階應用,包括消息確認機制、消息分組機制和消息過濾機制。Spring Cloud RocketMQ集成了RocketMQ、Spring Cloud Stream以及Spring Boot等技術,為開發者提供了一個完整的、基於消息驅動的分佈式系統解決方案。通過本文的介紹,相信讀者已經掌握了Spring Cloud RocketMQ的基礎知識和高級特性,能夠在實際項目中應用。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/242168.html
微信掃一掃
支付寶掃一掃