activemq和kafka區別「activemq消息持久化方式」

隊列模式(點對點模式,P2P)特點:

1、客戶端包括生產者和消費者;

2、隊列中的消息只能被一個消費者消費;

3、消費者可以隨時消費隊列中的消息;

Java,ActiveMQ,隊列/點對點模式,消息持久化案例

隊列模式和主題模式的區別:

1、提前訂閱,隊列模式:消費者不需要提前訂閱也可以消費消息;主題模式:只有提前進行訂閱的消費者才能成功消費消息;

2、多個消費者分配消息:隊列模式:只能平均消費消息,被別的消費者消費的消息不能重複被其他的消費者消費;主題模式:每個訂閱者都可以消費主題模式中的每一條消息;

案例代碼:

生產者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQProducer {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通過連接工廠獲取connection連接 並啟動訪問
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //創建會話session  需要兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建目的地(選擇是隊列還是主題)
        Queue queue = session.createQueue(QUEUE_NAME);
        //創建消息的生產者
        MessageProducer messageProducer = session.createProducer(queue);
        //通過使用消息生產者messageProducer生產3條消息發送到隊列中
        for (int i = 1; i <= 7; i++) {
            //創建消息   一個字符串消息
            TextMessage textMessage = session.createTextMessage("msg---->" + i);
            //通過messageProducer 發佈消息
            messageProducer.send(textMessage);
        }
        //關閉資源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息發送到MQ成功");
    }

}

消費者1:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQConsumer {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String QUEUE_NAME="queue01";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通過連接工廠獲取connection連接 並啟動訪問
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //創建會話session  需要兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建目的地(選擇是隊列還是主題)
        Queue queue = session.createQueue(QUEUE_NAME);
        //創建消息的消費者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true){
            //從隊列中獲取消息  receive未設置最大時間 是阻塞的,
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage !=null){
                System.out.println("消費者接受到消息---->"+textMessage.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }

}

輸出:

 INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->2
消費者接受到消息---->msg---->4
消費者接受到消息---->msg---->6

消費者2:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class ActiveMQConsumerListener {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通過連接工廠獲取connection連接 並啟動訪問
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //創建會話session  需要兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建目的地(選擇是隊列還是主題)
        Queue queue = session.createQueue(QUEUE_NAME);
        //創建消息的消費者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //通過監聽的機制消費消息
        messageConsumer.setMessageListener((message) -> {
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費者接受到消息---->" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //不關閉控制台  如果不加這句話,在下面可能在連接的時候直接關閉了,造成無法消費的問題
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }

}

輸出:

 INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->1
消費者接受到消息---->msg---->3
消費者接受到消息---->msg---->5
消費者接受到消息---->msg---->7
Java,ActiveMQ,隊列/點對點模式,消息持久化案例

Number Of Consumers:表示消費者數量;

Number Of Pending Messages:等待消費的消息,這個是當前未出隊列的數量;

Messages Enqueued:進入隊列的消息;( 這個數量只增不減,重啟後會清零);

Messages Dequeued:出了隊列的消息 可以理解為是消費者消費掉的數量 (重啟後會清零);

持久化案例代碼:

ActiveMQ持久化,生產者產生的數據,在沒有被消費者消費時,先保存到數據庫中,當數據被消費者消費後,再從數據庫中刪除。

生產者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQProducer {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String QUEUE_NAME = "queue02";

    public static void main(String[] args) throws JMSException {
        //創建連接工廠 ,,按照定的url地址給定默認的用戶名和密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通過連接工廠獲取connection連接 並啟動訪問
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //創建會話session  需要兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建目的地(選擇是隊列還是主題)
        Queue queue = session.createQueue(QUEUE_NAME);
        //創建消息的生產者
        MessageProducer messageProducer = session.createProducer(queue);
        // 消息持久化
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        //通過使用消息生產者messageProducer生產3條消息發送到隊列中
        for (int i = 1; i <= 7; i++) {
            //創建消息   一個字符串消息
            TextMessage textMessage = session.createTextMessage("msg---->" + i);
            //通過messageProducer 發佈消息
            messageProducer.send(textMessage);
        }
        //關閉資源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息發送到MQ成功");
    }

}

代碼:
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

消費者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQConsumer {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String QUEUE_NAME="queue02";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通過連接工廠獲取connection連接 並啟動訪問
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("client-queue02-01");
        connection.start();
        //創建會話session  需要兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建目的地(選擇是隊列還是主題)
        Queue queue = session.createQueue(QUEUE_NAME);
        //創建消息的消費者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true){
            //從隊列中獲取消息  receive未設置最大時間 是阻塞的,
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage !=null){
                System.out.println("消費者接受到消息---->"+textMessage.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }

}

測試:

1、先運行生產者,ActiveMQProducer

2、查看數據庫:

Java,ActiveMQ,隊列/點對點模式,消息持久化案例

3、在運行消費者,ActiveMQConsumer,輸出:

 INFO | Successfully connected to tcp://192.168.1.17:61616
消費者接受到消息---->msg---->1
消費者接受到消息---->msg---->2
消費者接受到消息---->msg---->3
消費者接受到消息---->msg---->4
消費者接受到消息---->msg---->5
消費者接受到消息---->msg---->6
消費者接受到消息---->msg---->7

4、再次查看數據庫,消息已刪除。

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/251222.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
投稿專員的頭像投稿專員
上一篇 2024-12-13 17:22
下一篇 2024-12-13 17:22

相關推薦

發表回復

登錄後才能評論