RocketMQ創建Topic詳解

一、RocketMQ創建Topic命令

創建topic是使用RocketMQ的基本操作之一,具體的操作可以通過控制台或者使用命令行完成。使用命令行創建topic,可以使用下面的命令:

./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}

其中,readQueueNums表示消息隊列數量,writeQueueNums表示寫入隊列數量。這裡需要注意的是:每個Broker目前默認最大創建的隊列數是1000個,如果需要創建更多隊列,請調整相關配置文件。

二、RocketMQ創建Topic指定Broker

在RocketMQ中,topic是以broker為單位進行管理的,所以可以通過指定broker來創建和管理topic。指定broker需要指定其IP地址,具體的命令如下:

./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}

其中,clusterName是指定的集群名稱,readQueueNums和writeQueueNums同上。

三、RocketMQ創建Topic過程

RocketMQ的Topic是由NameServer進行統一管理的,創建Topic需要執行以下操作:

  1. 在創建Topic的時候,需要向NameServer註冊Topic信息。
  2. 創建Topic的時候,會在每個Broker上創建與Topic相關的隊列。
  3. 指定寫入隊列數量,對應Broker上會創建與之相應的flush線程。
  4. 每個消息隊列都會創建一個與之對應的消費隊列,消費隊列會與消費者之間建立連接。

四、RocketMQ創建Topic立刻就能使用

創建Topic之後,需要在Producer和Consumer的代碼中啟用對應的Topic,代碼示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
        "TagA",// tag
        "OrderID001",// key
        ("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);

//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

五、RocketMQ手動創建Topic

在RocketMQ控制台中,可以手動創建和管理Topic。在RocketMQ Console的左側菜單中找到Topics一級菜單,在此處可以查看已經創建的Topic,也可以手動創建Topic。

六、RocketMQ創建Topic命令 -o

在使用mqadmin創建Topic的時候,可以使用-o選項指定創建Topic配置文件的路徑,以達到快速創建Topic的目的。具體操作如下:

./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file

七、RocketMQ創建Topic網站配置

在RocketMQ的配置文件中,可以通過指定topicMap的方式進行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:

topicMaps=101:topicA;102:topicB

其中,每一個topic的創建方式和mqadmin指令相同。

八、RocketMQ創建Group

在創建Topic的時候,可以通過設置Group的值進行分組管理。同一Group中的Consumer會消費Topic的不同Message,具體的使用方法可以參考下面的代碼示例:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

九、RocketMQ創建鏈接

使用RocketMQ進行消息傳遞需要創建鏈接,代碼示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

總結

通過本篇文章,我們詳細地了解了RocketMQ創建Topic的各個方面,包括使用命令線、指定broker、創建過程、立刻使用、手動創建、創建命令等等,同時還介紹了創建Group和鏈接的方法。通過本文的學習,我們可以更加深入地理解RocketMQ的使用方法。

原創文章,作者:QBRH,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/136963.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
QBRH的頭像QBRH
上一篇 2024-10-04 00:16
下一篇 2024-10-04 00:17

相關推薦

  • KafkaTemplate配置:發送兩個Topic

    本文將從以下幾個方面詳細闡述如何使用KafkaTemplate配置發送兩個Topic。 一、KafkaTemplate簡介 KafkaTemplate是Spring Kafka提供…

    編程 2025-04-27
  • 神經網絡代碼詳解

    神經網絡作為一種人工智能技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網絡的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網絡模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁盤中。在執行sync之前,所有的文件系統更新將不會立即寫入磁盤,而是先緩存在內存…

    編程 2025-04-25
  • git config user.name的詳解

    一、為什麼要使用git config user.name? git是一個非常流行的分布式版本控制系統,很多程序員都會用到它。在使用git commit提交代碼時,需要記錄commi…

    編程 2025-04-25
  • Python安裝OS庫詳解

    一、OS簡介 OS庫是Python標準庫的一部分,它提供了跨平台的操作系統功能,使得Python可以進行文件操作、進程管理、環境變量讀取等系統級操作。 OS庫中包含了大量的文件和目…

    編程 2025-04-25
  • 詳解eclipse設置

    一、安裝與基礎設置 1、下載eclipse並進行安裝。 2、打開eclipse,選擇對應的工作空間路徑。 File -> Switch Workspace -> [選擇…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25
  • Java BigDecimal 精度詳解

    一、基礎概念 Java BigDecimal 是一個用於高精度計算的類。普通的 double 或 float 類型只能精確表示有限的數字,而對於需要高精度計算的場景,BigDeci…

    編程 2025-04-25
  • nginx與apache應用開發詳解

    一、概述 nginx和apache都是常見的web服務器。nginx是一個高性能的反向代理web服務器,將負載均衡和緩存集成在了一起,可以動靜分離。apache是一個可擴展的web…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25

發表回復

登錄後才能評論