CanalRocketMQ:实现Canal与RocketMQ的无缝衔接

一、CanalRocketMQ介绍

CanalRocketMQ是一个用于将Canal的数据变化订阅转化成RocketMQ消息的工具。Canal是阿里巴巴开源的基于数据库增量日志解析,提供增量数据订阅和消费的组件。而RocketMQ是阿里巴巴开源的分布式消息系统。CanalRocketMQ能够将Canal解析出来的数据变化,转换成RocketMQ消息,进而通过RocketMQ实现数据同步,为数据处理提供可靠的基础设施。

二、CanalRocketMQ的配置与使用

CanalRocketMQ的配置与使用相对简单。使用时,需要在Canal和RocketMQ的基础上,增加CanalRocketMQ这一组件。

1.引入CanalRocketMQ

在项目中,需要引入CanalRocketMQ的依赖:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal-rocketmq</artifactId>
    <version>1.1.4</version>
</dependency>

2.配置Canal,并开启Canal的RocketMQ适配器

Canal的配置需要在canal.properties中进行配置。需要将Canal的模式设置成RocketMQ,并配置RocketMQ的相关信息。

# 定义canal server模式为 RocketMQ
canal.serverMode = rocketmq
 
# 配置rocketmq的相关信息
canal.rocketmq.nameServer = 127.0.0.1:9876
canal.rocketmq.producerGroup = canal-producer-group

同时,还需要在Canal的启动脚本中,添加RocketMQ的适配器。启动脚本如下:

bin/startup.sh --canal.instance.rmq.topic=example \
--canal.instance.rmq.group=test \
--canal.instance.rmq.nameServer=127.0.0.1:9876 \
--canal.instance.filter.regex=.*\\..* \
--canal.adapter.rocketmq \
--canal.adapter.rocketmq.namesrvAddr=127.0.0.1:9876 \
--canal.adapter.rocketmq.producerGroup=canal-producer-group

3.配置RocketMQ

在RocketMQ的配置文件rocketmq.proerties中,需要进行如下配置:

messageTraceTopic=rmq_sys_TRACE_DATA_777
brokerClusterName=DefaultCluster
autoCreateTopicEnable=true

4.代码示例

下面是一个简单的代码示例,说明如何使用CanalRocketMQ:

public class CanalRocketMQTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalRocketMQTest.class);

    @Test
    public void test() {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
                11111), "example", "root", "123456");
        connector.connect();
        connector.subscribe(".*\\..*");

        MessageProducer messageProducer = RocketMQMessageProducer.getInstance();
        messageProducer.start();

        while (true) {
            Message message = connector.getWithoutAck(100);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size > 0) {
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        CanalEntry.RowChange rowChange = null;
                        try {
                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            LOGGER.error("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                    e);
                            continue;
                        }

                        String tableName = entry.getHeader().getTableName();

                        if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                Map beforeMap = buildMap(rowData.getBeforeColumnsList());
                                MessageData messageData = new MessageData(tableName, beforeMap);
                                messageProducer.send(messageData);
                            }
                        } else if (rowChange.getEventType() == CanalEntry.EventType.INSERT
                                || rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                Map afterMap = buildMap(rowData.getAfterColumnsList());
                                MessageData messageData = new MessageData(tableName, afterMap);
                                messageProducer.send(messageData);
                            }
                        }
                    }
                }

                connector.ack(batchId);
            } else {
                LOGGER.info("No message received!");
            }
        }
    }

    private Map buildMap(List columns) {
        Map map = new HashMap();
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
        }
        return map;
    }

}

三、CanalRocketMQ的优点

从使用CanalRocketMQ的角度来看,它具有以下几点优点:

1.无缝衔接

使用CanalRocketMQ可以实现Canal和RocketMQ的无缝衔接,从而可以实现数据同步。这种无缝衔接代表了不同组件之间的良好协作,让使用方更加便捷地接入数据同步系统。

2.多样性

CanalRocketMQ既能够使用Canal解析出来的数据,又能够通过RocketMQ发送消息,具有多种用途。例如,可以实现数据的备份,也可以实现多个系统的数据同步。

3.高性能

CanalRocketMQ在消费Canal的数据变化时,使用了多线程的方式进行处理,从而提高了消费的效率和速度。

四、总结

CanalRocketMQ作为Canal和RocketMQ的无缝衔接工具,具有多样性、高性能等优点。使用过程相对简单,配置也相对清晰明了,能够为数据同步提供可靠的基础设施。

原创文章,作者:HRCT,如若转载,请注明出处:https://www.506064.com/n/135878.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
HRCTHRCT
上一篇 2024-10-04 00:15
下一篇 2024-10-04 00:15

相关推荐

  • RocketMQ消息堆积解决方案

    一、RocketMQ消息堆积小标题 RocketMQ消息堆积是指消息在消费者没有正常消费的情况下,持续积累的现象,导致消息队列越来越多,积累量越来越大。消息堆积的原因可能是由于消息…

    编程 2025-04-24
  • 深入了解RocketMQ事务消息

    一、什么是RocketMQ事务消息 RocketMQ事务消息是指在消息发送方发送消息时,延迟将消息状态提交给broker,由broker进行二次确认,以确保消息不会因发送失败而丢失…

    编程 2025-04-24
  • Canal RocketMQ详解

    一、Canal的介绍 Canal是阿里巴巴开源的基于数据库增量日志解析,提供增量数据订阅和消费的组件。Canal主要用来解决数据库异构之间的数据复制问题,通过增量的方式将数据同步到…

    编程 2025-04-22
  • RocketMQ控制台使用详解

    一、安装与启动控制台 1、安装步骤:首先需要从Apache RocketMQ的官网下载RocketMQ,下载链接:http://rocketmq.apache.org/releas…

    编程 2025-04-12
  • RocketMQ Windows安装

    一、安装前准备工作 在安装RocketMQ之前,需要进行以下准备工作: 1. 安装JDK 首先需要到Oracle官网下载JDK安装文件,选择适合自己操作系统的文件进行下载并安装。安…

    编程 2025-04-12
  • RocketMQ如何实现消息延迟发送功能

    一、RocketMQ消息的基本特性 RocketMQ是一种广泛使用的分布式消息中间件,它支持低延迟、高吞吐量的分布式消息传递。在消息传递方面,它有以下的基本特性: 可靠的消息传递:…

    编程 2025-02-25
  • RocketMQ Windows 开发指南

    一、RocketMQ 简介 RocketMQ是阿里巴巴团队开发的一款开源的分布式消息中间件。它具有低延迟、高吞吐量、高可靠性等特点,广泛应用于电商、金融、物流等领域中。 Rocke…

    编程 2025-02-17
  • RocketMQ延时队列详解

    一、延时队列介绍 延时队列,在分布式系统中经常被使用,可以很好的解决延迟任务问题。RocketMQ中提供了延时队列的功能。 二、RocketMQ延时队列实现 RocketMQ是通过…

    编程 2025-02-15
  • RocketMQ源码解析

    一、简介 RocketMQ是一个分布式消息传递解决方案,具有高性能、高可靠、高可扩展性和分布式特性。本篇文章将从多个方面对RocketMQ的源码进行解析,帮助更好的理解Rocket…

    编程 2025-02-11
  • Docker安装RocketMQ

    一、docker安装rocketmq复杂吗 在说复杂不复杂之前,我们需要明确一点:Docker是一个开源的应用容器引擎,可以让开发者打包他们的应用程序,以及相关依赖包到一个轻量级、…

    编程 2025-02-05

发表回复

登录后才能评论