RocketMQ创建Topic详解

一、RocketMQ创建Topic命令

创建topic是使用RocketMQ的基本操作之一,具体的操作可以通过控制台或者使用命令行完成。使用命令行创建topic,可以使用下面的命令:

./mqadmin updateTopic -c {clusterName}nameSrvAddr {nameSrvAddr} -t {topicName} -n {readQueueNums} -r {writeQueueNums}

其中,readQueueNums表示消息队列数量,writeQueueNums表示写入队列数量。这里需要注意的是:每个Broker目前默认最大创建的队列数是1000个,如果需要创建更多队列,请调整相关配置文件。

二、RocketMQ创建Topic指定Broker

在RocketMQ中,topic是以broker为单位进行管理的,所以可以通过指定broker来创建和管理topic。指定broker需要指定其IP地址,具体的命令如下:

./mqadmin updateTopic -b {brokerAddr} -n {readQueueNums} -r {writeQueueNums} -t {topicName} -c {clusterName}

其中,clusterName是指定的集群名称,readQueueNums和writeQueueNums同上。

三、RocketMQ创建Topic过程

RocketMQ的Topic是由NameServer进行统一管理的,创建Topic需要执行以下操作:

  1. 在创建Topic的时候,需要向NameServer注册Topic信息。
  2. 创建Topic的时候,会在每个Broker上创建与Topic相关的队列。
  3. 指定写入队列数量,对应Broker上会创建与之相应的flush线程。
  4. 每个消息队列都会创建一个与之对应的消费队列,消费队列会与消费者之间建立连接。

四、RocketMQ创建Topic立刻就能使用

创建Topic之后,需要在Producer和Consumer的代码中启用对应的Topic,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest",// topic
        "TagA",// tag
        "OrderID001",// key
        ("Hello RocketMQ ").getBytes());// body
SendResult sendResult = producer.send(msg);

//Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

五、RocketMQ手动创建Topic

在RocketMQ控制台中,可以手动创建和管理Topic。在RocketMQ Console的左侧菜单中找到Topics一级菜单,在此处可以查看已经创建的Topic,也可以手动创建Topic。

六、RocketMQ创建Topic命令 -o

在使用mqadmin创建Topic的时候,可以使用-o选项指定创建Topic配置文件的路径,以达到快速创建Topic的目的。具体操作如下:

./mqadmin updateTopic -c {clusterName} -n {readQueueNums} -r {writeQueueNums} -t {topic} -u true -o path/to/config/file

七、RocketMQ创建Topic网站配置

在RocketMQ的配置文件中,可以通过指定topicMap的方式进行topic的配置。在${ROCKET_HOME}/conf/broker.conf文件中添加如下配置:

topicMaps=101:topicA;102:topicB

其中,每一个topic的创建方式和mqadmin指令相同。

八、RocketMQ创建Group

在创建Topic的时候,可以通过设置Group的值进行分组管理。同一Group中的Consumer会消费Topic的不同Message,具体的使用方法可以参考下面的代码示例:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

九、RocketMQ创建链接

使用RocketMQ进行消息传递需要创建链接,代码示例如下:

// Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();    
Message msg = new Message("TopicTest","TagA","OrderID001",("Hello RocketMQ ").getBytes());
SendResult sendResult = producer.send(msg);

// Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List msgs,
            ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

总结

通过本篇文章,我们详细地了解了RocketMQ创建Topic的各个方面,包括使用命令线、指定broker、创建过程、立刻使用、手动创建、创建命令等等,同时还介绍了创建Group和链接的方法。通过本文的学习,我们可以更加深入地理解RocketMQ的使用方法。

原创文章,作者:QBRH,如若转载,请注明出处:https://www.506064.com/n/136963.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
QBRHQBRH
上一篇 2024-10-04 00:16
下一篇 2024-10-04 00:17

相关推荐

  • KafkaTemplate配置:发送两个Topic

    本文将从以下几个方面详细阐述如何使用KafkaTemplate配置发送两个Topic。 一、KafkaTemplate简介 KafkaTemplate是Spring Kafka提供…

    编程 2025-04-27
  • 神经网络代码详解

    神经网络作为一种人工智能技术,被广泛应用于语音识别、图像识别、自然语言处理等领域。而神经网络的模型编写,离不开代码。本文将从多个方面详细阐述神经网络模型编写的代码技术。 一、神经网…

    编程 2025-04-25
  • Linux sync详解

    一、sync概述 sync是Linux中一个非常重要的命令,它可以将文件系统缓存中的内容,强制写入磁盘中。在执行sync之前,所有的文件系统更新将不会立即写入磁盘,而是先缓存在内存…

    编程 2025-04-25
  • git config user.name的详解

    一、为什么要使用git config user.name? git是一个非常流行的分布式版本控制系统,很多程序员都会用到它。在使用git commit提交代码时,需要记录commi…

    编程 2025-04-25
  • Python安装OS库详解

    一、OS简介 OS库是Python标准库的一部分,它提供了跨平台的操作系统功能,使得Python可以进行文件操作、进程管理、环境变量读取等系统级操作。 OS库中包含了大量的文件和目…

    编程 2025-04-25
  • 详解eclipse设置

    一、安装与基础设置 1、下载eclipse并进行安装。 2、打开eclipse,选择对应的工作空间路径。 File -> Switch Workspace -> [选择…

    编程 2025-04-25
  • Python输入输出详解

    一、文件读写 Python中文件的读写操作是必不可少的基本技能之一。读写文件分别使用open()函数中的’r’和’w’参数,读取文件…

    编程 2025-04-25
  • Java BigDecimal 精度详解

    一、基础概念 Java BigDecimal 是一个用于高精度计算的类。普通的 double 或 float 类型只能精确表示有限的数字,而对于需要高精度计算的场景,BigDeci…

    编程 2025-04-25
  • nginx与apache应用开发详解

    一、概述 nginx和apache都是常见的web服务器。nginx是一个高性能的反向代理web服务器,将负载均衡和缓存集成在了一起,可以动静分离。apache是一个可扩展的web…

    编程 2025-04-25
  • Linux修改文件名命令详解

    在Linux系统中,修改文件名是一个很常见的操作。Linux提供了多种方式来修改文件名,这篇文章将介绍Linux修改文件名的详细操作。 一、mv命令 mv命令是Linux下的常用命…

    编程 2025-04-25

发表回复

登录后才能评论