一、activemq简介
Apache ActiveMQ是Apache Software Foundation(ASF)的一个开源消息代理项目,它是使用Java语言编写的消息中间件,基于JMS(Java Message Service)规范,提供了常见的消息传递模型(点对点模型和发布/订阅模型)来支持应用程序之间的通信。
二、springboot整合activemq基础
在使用springboot整合activemq之前,需要引入以下依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
在application.properties或application.yml文件中配置activemq的基本属性。
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.in-memory=true
其中,broker-url是activemq连接的地址,user和password是登录activemq的用户名和密码,in-memory表示是否使用内存存储消息,默认为false表示使用持久化存储。
在代码中注入JmsTemplate来发送和接收消息。以下是一个简单的例子。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.send(queueName, session -> session.createTextMessage(message));
}
}
Producer中使用JmsTemplate发送消息,其中queueName为消息队列的名称,message为发送的消息内容。接收消息的代码如下。
@Service
public class Consumer {
@JmsListener(destination = "myqueue")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
Consumer中使用@JmsListener注解来监听消息队列,destination表示监听的队列名称,receiveMessage方法接收消息内容。
三、activemq高级用法
1. 事务支持
在消息处理过程中,需要保证消息的可靠性。使用activemq的事务支持可以满足此需求。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.execute(session -> {
Message msg = session.createTextMessage(message);
session.createProducer(session.createQueue(queueName)).send(msg);
return null;
});
}
}
在发送消息的方法中,使用jmsTemplate.execute来开启一个事务,在事务中发送消息,并通过return null提交事务。如果提交成功,消息会被成功发送;否则,消息将被回滚。
2. 发布/订阅模型
在使用发布/订阅模型时,需要定义主题(topic)。
@Configuration
public class TopicConfig {
@Bean
public Topic topic() {
return new ActiveMQTopic("mytopic");
}
}
定义主题的代码如上所示。在Producer中发送消息时,使用convertAndSend方法,而不是send方法。
@Service
public class Producer {
@Autowired
JmsTemplate jmsTemplate;
public void sendMessage(String topicName, String message) {
jmsTemplate.convertAndSend(topicName, message);
}
}
在Consumer中接收消息时,使用@JmsListener注解,并设置containerFactory为JmsListenerContainerFactory。
@Service
public class Consumer {
@Autowired
JmsListenerContainerFactory jmsListenerContainerFactory;
@JmsListener(destination = "mytopic", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
3. 消息过滤
activemq支持根据一定的条件过滤消息,从而只选择特定的消息进行消费。在定义JmsListener时,可以设置selector参数,该参数定义了选择哪些消息进行消费。
@Service
public class Consumer {
@JmsListener(destination = "myqueue", selector = "color = 'blue'")
public void receiveBlueMessage(String message) {
System.out.println("Received message: " + message);
}
@JmsListener(destination = "myqueue", selector = "color = 'red'")
public void receiveRedMessage(String message) {
System.out.println("Received message: " + message);
}
}
以上代码中,JmsListener的selector参数分别设置为color=’blue’和color=’red’,表示只选择color属性为blue或red的消息进行消费。
4. 死信队列
activemq支持死信队列(Dead Letter Queue,DLQ),当消息处理失败时,可以将该消息发送到DLQ中。在使用DLQ时,需要定义主题和队列。
@Configuration
public class TopicConfig {
@Bean
public Topic topic() {
return new ActiveMQTopic("mytopic.dlq");
}
}
@Configuration
public class QueueConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("myqueue");
}
@Bean
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setInitialRedeliveryDelay(3000);
redeliveryPolicy.setRedeliveryDelay(3000);
redeliveryPolicy.setUseExponentialBackOff(false);
return redeliveryPolicy;
}
@Bean
public RedeliveryPolicyMap redeliveryPolicyMap() {
RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy());
return redeliveryPolicyMap;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("3-10"); // 控制消费者个数
factory.setSessionTransacted(true);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
// 配置死信队列
RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap());
RedeliveryPlugin[] plugins = { redeliveryPlugin };
factory.setPlugins(plugins);
return factory;
}
}
以上代码中,定义了主题为mytopic.dlq的死信队列,在QueueConfig中配置了队列,并定义了RedeliveryPolicy和RedeliveryPolicyMap,用于配置消息重发的策略。在JmsListenerContainerFactory中设置了sessionTransacted为true,sessionAcknowledgeMode为CLIENT_ACKNOWLEDGE,表示使用事务和客户端确认模式,并通过RedeliveryPlugin来将未处理的消息发送到DLQ中。
四、总结
本文介绍了springboot整合activemq的基础和高级用法,包括事务支持、发布/订阅模型、消息过滤和死信队列。通过本文的学习,读者可以更加深入地了解activemq的应用场景和使用方法。
原创文章,作者:ERWAK,如若转载,请注明出处:https://www.506064.com/n/332906.html
微信扫一扫
支付宝扫一扫