一、RocketMQ 簡介
RocketMQ是阿里巴巴團隊開發的一款開源的分散式消息中間件。它具有低延遲、高吞吐量、高可靠性等特點,廣泛應用於電商、金融、物流等領域中。
RocketMQ基於發布訂閱模式,支持順序消息、事務消息、定時消息等特性,可橫向擴展,提高系統的可用性和可靠性。
本文將主要介紹 RocketMQ 在 Windows 環境下的開發和部署。
二、RocketMQ Windows 安裝
1. 下載壓縮包
wget http://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
2. 解壓縮
unzip rocketmq-all-4.3.2-bin-release.zip
3. 配置環境變數
在系統環境變數中添加以下兩個變數:
ROCKETMQ_HOME = 解壓縮後的目錄路徑,例如:C:\rocketmq-all-4.3.2-bin-release
PATH = %ROCKETMQ_HOME%\bin;
4. 測試是否安裝成功
mqnamesrv
mqbroker
以上兩個命令分別啟動了 RocketMQ 的namesrv和broker服務,如果能夠正常啟動則表示安裝成功。
三、RocketMQ Windows 常用命令
1. 啟動 namesrv 服務
mqnamesrv
2. 啟動 broker 服務
mqbroker -n localhost:9876
3. 查看 broker 狀態
mqadmin topicStatus -t TopicTest -n localhost:9876
4. 創建 topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t TopicTest
5. 發送消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg);
producer.shutdown();
}
}
6. 消費消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("Receive message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started.");
}
}
四、RocketMQ Windows 開發注意事項
1. RocketMQ 的 Producer 和 Consumer 都需要啟動namesrv服務,否則無法正常連接 broker。
2. 在 Windows 下,RocketMQ 的shell腳本無法使用,需要使用命令行工具或者Java API進行操作。
3. Producer 和 Consumer 的線程池配置需要根據實際需求進行調整,避免線程池開銷過大影響系統性能。
五、總結
RocketMQ是一款非常優秀的分散式消息中間件,支持多種消息類型和特性,適用於各類場景。在 Windows 環境下,我們可以使用Java API進行開發、部署和測試,如果需要使用Shell命令,則需要使用Cygwin等工具。
原創文章,作者:MVCPI,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/351516.html