一、RocketMQ創建Topic命令
創建topic是使用RocketMQ的基本操作之一,具體的操作可以通過控制台或者使用命令行完成。使用命令行創建topic,可以使用下面的命令:
./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}
其中,readQueueNums表示消息隊列數量,writeQueueNums表示寫入隊列數量。這裡需要注意的是:每個Broker目前默認最大創建的隊列數是1000個,如果需要創建更多隊列,請調整相關配置文件。
二、RocketMQ創建Topic指定Broker
在RocketMQ中,topic是以broker為單位進行管理的,所以可以通過指定broker來創建和管理topic。指定broker需要指定其IP地址,具體的命令如下:
./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}
其中,clusterName是指定的集群名稱,readQueueNums和writeQueueNums同上。
三、RocketMQ創建Topic過程
RocketMQ的Topic是由NameServer進行統一管理的,創建Topic需要執行以下操作:
- 在創建Topic的時候,需要向NameServer註冊Topic信息。
- 創建Topic的時候,會在每個Broker上創建與Topic相關的隊列。
- 指定寫入隊列數量,對應Broker上會創建與之相應的flush線程。
- 每個消息隊列都會創建一個與之對應的消費隊列,消費隊列會與消費者之間建立連接。
四、RocketMQ創建Topic立刻就能使用
創建Topic之後,需要在Producer和Consumer的代碼中啟用對應的Topic,代碼示例如下:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"OrderID001",// key
("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);
//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
五、RocketMQ手動創建Topic
在RocketMQ控制台中,可以手動創建和管理Topic。在RocketMQ Console的左側菜單中找到Topics一級菜單,在此處可以查看已經創建的Topic,也可以手動創建Topic。
六、RocketMQ創建Topic命令 -o
在使用mqadmin創建Topic的時候,可以使用-o選項指定創建Topic配置文件的路徑,以達到快速創建Topic的目的。具體操作如下:
./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file
七、RocketMQ創建Topic網站配置
在RocketMQ的配置文件中,可以通過指定topicMap的方式進行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:
topicMaps=101:topicA;102:topicB
其中,每一個topic的創建方式和mqadmin指令相同。
八、RocketMQ創建Group
在創建Topic的時候,可以通過設置Group的值進行分組管理。同一Group中的Consumer會消費Topic的不同Message,具體的使用方法可以參考下面的代碼示例:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
九、RocketMQ創建鏈接
使用RocketMQ進行消息傳遞需要創建鏈接,代碼示例如下:
// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);
// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
總結
通過本篇文章,我們詳細地了解了RocketMQ創建Topic的各個方面,包括使用命令線、指定broker、創建過程、立刻使用、手動創建、創建命令等等,同時還介紹了創建Group和鏈接的方法。通過本文的學習,我們可以更加深入地理解RocketMQ的使用方法。
原創文章,作者:QBRH,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/136963.html
微信掃一掃
支付寶掃一掃