一、activemq簡介
Apache ActiveMQ是Apache Software Foundation(ASF)的一個開源消息代理項目,它是使用Java語言編寫的消息中間件,基於JMS(Java Message Service)規範,提供了常見的消息傳遞模型(點對點模型和發布/訂閱模型)來支持應用程序之間的通信。
二、springboot整合activemq基礎
在使用springboot整合activemq之前,需要引入以下依賴。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
在application.properties或application.yml文件中配置activemq的基本屬性。
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.in-memory=true
其中,broker-url是activemq連接的地址,user和password是登錄activemq的用戶名和密碼,in-memory表示是否使用內存存儲消息,默認為false表示使用持久化存儲。
在代碼中注入JmsTemplate來發送和接收消息。以下是一個簡單的例子。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.send(queueName, session -> session.createTextMessage(message));
}
}
Producer中使用JmsTemplate發送消息,其中queueName為消息隊列的名稱,message為發送的消息內容。接收消息的代碼如下。
@Service
public class Consumer {
@JmsListener(destination = "myqueue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
Consumer中使用@JmsListener註解來監聽消息隊列,destination表示監聽的隊列名稱,receiveMessage方法接收消息內容。
三、activemq高級用法
1. 事務支持
在消息處理過程中,需要保證消息的可靠性。使用activemq的事務支持可以滿足此需求。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.execute(session -> {
Message msg = session.createTextMessage(message);
session.createProducer(session.createQueue(queueName)).send(msg);
return null;
});
}
}
在發送消息的方法中,使用jmsTemplate.execute來開啟一個事務,在事務中發送消息,並通過return null提交事務。如果提交成功,消息會被成功發送;否則,消息將被回滾。
2. 發布/訂閱模型
在使用發布/訂閱模型時,需要定義主題(topic)。
@Configuration
public class TopicConfig {
@Bean
public Topic topic() {
return new ActiveMQTopic("mytopic");
}
}
定義主題的代碼如上所示。在Producer中發送消息時,使用convertAndSend方法,而不是send方法。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String topicName, String message) {
jmsTemplate.convertAndSend(topicName, message);
}
}
在Consumer中接收消息時,使用@JmsListener註解,並設置containerFactory為JmsListenerContainerFactory。
@Service
public class Consumer {
@Autowired
JmsListenerContainerFactory jmsListenerContainerFactory;
@JmsListener(destination = "mytopic", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
3. 消息過濾
activemq支持根據一定的條件過濾消息,從而只選擇特定的消息進行消費。在定義JmsListener時,可以設置selector參數,該參數定義了選擇哪些消息進行消費。
@Service
public class Consumer {
@JmsListener(destination = "myqueue", selector = "color = 'blue'")
public void receiveBlueMessage(String message) {
System.out.println("Received message: " + message);
}
@JmsListener(destination = "myqueue", selector = "color = 'red'")
public void receiveRedMessage(String message) {
System.out.println("Received message: " + message);
}
}
以上代碼中,JmsListener的selector參數分別設置為color=’blue’和color=’red’,表示只選擇color屬性為blue或red的消息進行消費。
4. 死信隊列
activemq支持死信隊列(Dead Letter Queue,DLQ),當消息處理失敗時,可以將該消息發送到DLQ中。在使用DLQ時,需要定義主題和隊列。
@Configuration
public class TopicConfig {
@Bean
public Topic topic() {
return new ActiveMQTopic("mytopic.dlq");
}
}
@Configuration
public class QueueConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("myqueue");
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setInitialRedeliveryDelay(3000);
redeliveryPolicy.setRedeliveryDelay(3000);
redeliveryPolicy.setUseExponentialBackOff(false);
return redeliveryPolicy;
}
@Bean
public RedeliveryPolicyMap redeliveryPolicyMap() {
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy());
return redeliveryPolicyMap;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("3-10"); // 控制消費者個數
factory.setSessionTransacted(true);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
// 配置死信隊列
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap());
RedeliveryPlugin[] plugins = { redeliveryPlugin };
factory.setPlugins(plugins);
return factory;
}
}
以上代碼中,定義了主題為mytopic.dlq的死信隊列,在QueueConfig中配置了隊列,並定義了RedeliveryPolicy和RedeliveryPolicyMap,用於配置消息重發的策略。在JmsListenerContainerFactory中設置了sessionTransacted為true,sessionAcknowledgeMode為CLIENT_ACKNOWLEDGE,表示使用事務和客戶端確認模式,並通過RedeliveryPlugin來將未處理的消息發送到DLQ中。
四、總結
本文介紹了springboot整合activemq的基礎和高級用法,包括事務支持、發布/訂閱模型、消息過濾和死信隊列。通過本文的學習,讀者可以更加深入地了解activemq的應用場景和使用方法。
原創文章,作者:ERWAK,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/332906.html
微信掃一掃
支付寶掃一掃