一、RocketMQ創建Topic命令
創建topic是使用RocketMQ的基本操作之一,具體的操作可以通過控制台或者使用命令行完成。使用命令行創建topic,可以使用下面的命令:
./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}
其中,readQueueNums表示消息隊列數量,writeQueueNums表示寫入隊列數量。這裡需要注意的是:每個Broker目前默認最大創建的隊列數是1000個,如果需要創建更多隊列,請調整相關配置文件。
二、RocketMQ創建Topic指定Broker
在RocketMQ中,topic是以broker為單位進行管理的,所以可以通過指定broker來創建和管理topic。指定broker需要指定其IP地址,具體的命令如下:
./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}
其中,clusterName是指定的集群名稱,readQueueNums和writeQueueNums同上。
三、RocketMQ創建Topic過程
RocketMQ的Topic是由NameServer進行統一管理的,創建Topic需要執行以下操作:
- 在創建Topic的時候,需要向NameServer註冊Topic信息。
- 創建Topic的時候,會在每個Broker上創建與Topic相關的隊列。
- 指定寫入隊列數量,對應Broker上會創建與之相應的flush線程。
- 每個消息隊列都會創建一個與之對應的消費隊列,消費隊列會與消費者之間建立連接。
四、RocketMQ創建Topic立刻就能使用
創建Topic之後,需要在Producer和Consumer的代碼中啟用對應的Topic,代碼示例如下:
// Producer DefaultMQProducer producer = new DefaultMQProducer("producerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest",// topic "TagA",// tag "OrderID001",// key ("Hello RocketMQ ").getBytes());// body SendResult sendResult = producer.send(msg); //Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
五、RocketMQ手動創建Topic
在RocketMQ控制台中,可以手動創建和管理Topic。在RocketMQ Console的左側菜單中找到Topics一級菜單,在此處可以查看已經創建的Topic,也可以手動創建Topic。
六、RocketMQ創建Topic命令 -o
在使用mqadmin創建Topic的時候,可以使用-o選項指定創建Topic配置文件的路徑,以達到快速創建Topic的目的。具體操作如下:
./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file
七、RocketMQ創建Topic網站配置
在RocketMQ的配置文件中,可以通過指定topicMap的方式進行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:
topicMaps=101:topicA;102:topicB
其中,每一個topic的創建方式和mqadmin指令相同。
八、RocketMQ創建Group
在創建Topic的時候,可以通過設置Group的值進行分組管理。同一Group中的Consumer會消費Topic的不同Message,具體的使用方法可以參考下面的代碼示例:
// Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes()); SendResult sendResult = producer.send(msg); // Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
九、RocketMQ創建鏈接
使用RocketMQ進行消息傳遞需要創建鏈接,代碼示例如下:
// Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes()); SendResult sendResult = producer.send(msg); // Consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
總結
通過本篇文章,我們詳細地了解了RocketMQ創建Topic的各個方面,包括使用命令線、指定broker、創建過程、立刻使用、手動創建、創建命令等等,同時還介紹了創建Group和鏈接的方法。通過本文的學習,我們可以更加深入地理解RocketMQ的使用方法。
原創文章,作者:QBRH,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/136963.html