隊列模式(點對點模式,P2P)特點:
1、客戶端包括生產者和消費者;
2、隊列中的消息只能被一個消費者消費;
3、消費者可以隨時消費隊列中的消息;

隊列模式和主題模式的區別:
1、提前訂閱,隊列模式:消費者不需要提前訂閱也可以消費消息;主題模式:只有提前進行訂閱的消費者才能成功消費消息;
2、多個消費者分配消息:隊列模式:只能平均消費消息,被別的消費者消費的消息不能重複被其他的消費者消費;主題模式:每個訂閱者都可以消費主題模式中的每一條消息;
案例代碼:
生產者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQProducer {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通過連接工廠獲取connection連接 並啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//創建會話session 需要兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地(選擇是隊列還是主題)
Queue queue = session.createQueue(QUEUE_NAME);
//創建消息的生產者
MessageProducer messageProducer = session.createProducer(queue);
//通過使用消息生產者messageProducer生產3條消息發送到隊列中
for (int i = 1; i <= 7; i++) {
//創建消息 一個字符串消息
TextMessage textMessage = session.createTextMessage("msg---->" + i);
//通過messageProducer 發佈消息
messageProducer.send(textMessage);
}
//關閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息發送到MQ成功");
}
}
消費者1:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQConsumer {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String QUEUE_NAME="queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通過連接工廠獲取connection連接 並啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//創建會話session 需要兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地(選擇是隊列還是主題)
Queue queue = session.createQueue(QUEUE_NAME);
//創建消息的消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
//從隊列中獲取消息 receive未設置最大時間 是阻塞的,
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage !=null){
System.out.println("消費者接受到消息---->"+textMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}輸出:
INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->2
消費者接受到消息---->msg---->4
消費者接受到消息---->msg---->6消費者2:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class ActiveMQConsumerListener {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通過連接工廠獲取connection連接 並啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//創建會話session 需要兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地(選擇是隊列還是主題)
Queue queue = session.createQueue(QUEUE_NAME);
//創建消息的消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
//通過監聽的機制消費消息
messageConsumer.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費者接受到消息---->" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//不關閉控制台 如果不加這句話,在下面可能在連接的時候直接關閉了,造成無法消費的問題
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
輸出:
INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->1
消費者接受到消息---->msg---->3
消費者接受到消息---->msg---->5
消費者接受到消息---->msg---->7
Number Of Consumers:表示消費者數量;
Number Of Pending Messages:等待消費的消息,這個是當前未出隊列的數量;
Messages Enqueued:進入隊列的消息;( 這個數量只增不減,重啟後會清零);
Messages Dequeued:出了隊列的消息 可以理解為是消費者消費掉的數量 (重啟後會清零);
持久化案例代碼:
ActiveMQ持久化,生產者產生的數據,在沒有被消費者消費時,先保存到數據庫中,當數據被消費者消費後,再從數據庫中刪除。
生產者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQProducer {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String QUEUE_NAME = "queue02";
public static void main(String[] args) throws JMSException {
//創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通過連接工廠獲取connection連接 並啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//創建會話session 需要兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地(選擇是隊列還是主題)
Queue queue = session.createQueue(QUEUE_NAME);
//創建消息的生產者
MessageProducer messageProducer = session.createProducer(queue);
// 消息持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//通過使用消息生產者messageProducer生產3條消息發送到隊列中
for (int i = 1; i <= 7; i++) {
//創建消息 一個字符串消息
TextMessage textMessage = session.createTextMessage("msg---->" + i);
//通過messageProducer 發佈消息
messageProducer.send(textMessage);
}
//關閉資源
messageProducer.close();
session.close();
connection.close();
System.out.println("消息發送到MQ成功");
}
}代碼:
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
消費者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQConsumer {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String QUEUE_NAME="queue02";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通過連接工廠獲取connection連接 並啟動訪問
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("client-queue02-01");
connection.start();
//創建會話session 需要兩個參數,第一個事務,第二個簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地(選擇是隊列還是主題)
Queue queue = session.createQueue(QUEUE_NAME);
//創建消息的消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
//從隊列中獲取消息 receive未設置最大時間 是阻塞的,
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage !=null){
System.out.println("消費者接受到消息---->"+textMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}測試:
1、先運行生產者,ActiveMQProducer
2、查看數據庫:

3、在運行消費者,ActiveMQConsumer,輸出:
INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->1
消費者接受到消息---->msg---->2
消費者接受到消息---->msg---->3
消費者接受到消息---->msg---->4
消費者接受到消息---->msg---->5
消費者接受到消息---->msg---->6
消費者接受到消息---->msg---->74、再次查看數據庫,消息已刪除。
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/251222.html
微信掃一掃
支付寶掃一掃