activemq詳解「activemq埠怎麼調」

一、下載與安裝

直接去官網(
http://activemq.apache.org/)下載最新版本即可,由於這是免安裝的,只需要解壓就行了。安裝完之後進入bin目錄,雙擊 activemq.bat文件(linux下在bin目錄下執行 activemq start)

二、訪問控制台

在瀏覽器輸入:http://ip:8161/admin/,出現如下界面表示啟動成功,默認的用戶名密碼都是admin

Activemq的學習

三、修改埠號

61616為對外服務埠號

8161為控制器埠號

當埠號衝突時,可以修改這兩個埠號。cd conf ,修改activemq.xml 修改裡面的61616埠。修改jetty.xml,修改裡面的8161埠。

queue隊列模式:

和rabbitmq簡單隊列模式一樣,若是有多個消費者消費同一個隊列中的消息的話,默認也是輪詢機制的消費

示例代碼:

public class Productor {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws JMSException {
        //創建工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
        //創建tcp連接
        Connection connection = factory.createConnection();
        //建立連接
        connection.start();
        /**
         * 創建會話,1.是否開啟事務,2.簽收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //創建隊列(消息的目的地)
        Queue queue = session.createQueue(QUEUE_NAME);

        //創建生產者
        MessageProducer producer = session.createProducer(queue);

        //消息非持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        //消息持久化 默認是持久化的
//        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //創建消息
        TextMessage message = session.createTextMessage("你好嗎");

        //發送消息
        producer.send(message);

        producer.close();
        session.close();
        connection.close();

        System.out.println("發送成功!");

    }
}


public class Consumer {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws JMSException {
        //創建工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);
        //創建tcp連接
        Connection connection = factory.createConnection();
        //建立連接
        connection.start();
        /**
         * 創建會話,1.是否開啟事務,2.簽收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //創建/聲明隊列(消息的目的地)
        Queue queue = session.createQueue(QUEUE_NAME);

        //創建消費者
        MessageConsumer consumer = session.createConsumer(queue);

        /*while (true) {
            //receive會阻塞線程
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println("接收到消息:" + message.getText());
        }*/

        //監聽的方式消費
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("1號接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

topic隊列模式:

稱為發布訂閱模式,生產者把消息發送給訂閱給某個topic主題的消費者,是分發的模式,這種模式默認需要先啟動消費者,不然就算生產者發布了某個topic主題的消息,消費者也消費不了;除非消費者提前訂閱,並且做了消息持久化的處理,這樣後啟動消費者才能消費提前推送的消息。

代碼:

public class Productor {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic1";
    public static void main(String[] args) throws JMSException {
        //創建工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);

        //非同步投遞
        factory.setUseAsyncSend(true);

        //創建tcp連接
        Connection connection = factory.createConnection();
        /**
         * 創建會話,1.是否開啟事務,2.簽收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //創建/聲明topic(消息的目的地)
        Topic topic = session.createTopic(TOPIC_NAME);

        //創建生產者
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);

        //持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        //建立連接
        connection.start();


        //創建消息
        TextMessage message = session.createTextMessage("你好嗎");

        //發送消息,非同步發送回調函數
        producer.send(message, new AsyncCallback() {
            @Override
            public void onSuccess() {
                System.out.println("success");
            }

            @Override
            public void onException(JMSException e) {
                System.out.println("fail");
            }
        });

        producer.close();
        session.close();
        connection.close();

        System.out.println("發送成功!");

    }
}

public class Consumer1 {
    public static final String BORKER_URL = "tcp://127.0.0.1:61616";
    public static final String TOPIC_NAME = "topic1";
    public static void main(String[] args) throws JMSException {
        //創建工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);

        //創建tcp連接
        Connection connection = factory.createConnection();

        //制定clientId
        connection.setClientID("my");

        /**
         * 創建會話,1.是否開啟事務,2.簽收模式
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //創建/聲明topic(消息的目的地)
        Topic topic = session.createTopic(TOPIC_NAME);

        //訂閱主題
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");
        //建立連接
        connection.start();
        while (true) {
            //receive會阻塞線程
            //接收訂閱的消息
            TextMessage message = (TextMessage) subscriber.receive();
            System.out.println("接收到消息:" + message.getText());
        }

        /*//創建消費者
        MessageConsumer consumer = session.createConsumer(topic);

        //建立連接
        connection.start();

        *//*while (true) {
            //receive會阻塞線程
            TextMessage message = (TextMessage)consumer.receive();
            System.out.println("接收到消息:" + message.getText());
        }*//*

        //監聽的方式消費
        consumer.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("1號接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });*/
    }
}

如何保證消息的可靠性

回答這個問題主要從持久化,事務,簽收這幾個方面入手

消息持久化的核心代碼:

//queue模式的消息持久化 默認是持久化的
 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
 /**
 * topic模式的持久化
 */
Topic topic = session.createTopic(TOPIC_NAME);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();

事務的核心代碼(偏生產者):

//參數設置成true
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//事務提交
session.commit();

簽收的核心代碼(偏消費者):

//參數設置成手動提交
connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//消息簽收
message.acknowledge();

注意:若是既開啟事務,又開啟手動簽收,以事務為準,只要事務被提交了也默認消息被簽收了

性能提升:

1.利用nio的協議比tcp的性能高,

  • 配置方式:在conf目錄下activemq.xml照著下面配置
<broker>
  ...
  <transportConnectors>
    <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>  
  </<transportConnectors>
  ...
</broker>
  • 第二步是代碼訪問方式由tcp改為nio
//創建工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://127.0.0.1:61616");

2.jdbc+Journaling提高只有jdbc持久化的性能,它在做持久化入資料庫之前,會先將數據保存到Journaling文件中,之後才慢慢同步到資料庫中,等於中間加了一層緩衝層。

  • 把資料庫mysql的驅動包放到lib目錄下
  • 配置方式:在conf目錄下activemq.xml照著下面配置,其中有個createTablesOnStartup屬性,默認值是true,表示每次啟動後去資料庫自動建表

<persistenceAdapter>
  <kahaDB directory="${activemq.data}/kahadb"/>
 </persistenceAdapter>
 //上面是默認配置找到改成下面的配置
<persistenceAdapter> 
  <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter> 

//下面的代碼寫在<beans>節點中
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  <property name="username" value="activemq"/>
  <property name="password" value="activemq"/>
  <property name="poolPreparedStatements" value="true"/>
</bean>

原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/203746.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
投稿專員的頭像投稿專員
上一篇 2024-12-07 12:11
下一篇 2024-12-07 12:11

相關推薦

發表回復

登錄後才能評論