CanalRocketMQ:實現Canal與RocketMQ的無縫銜接

一、CanalRocketMQ介紹

CanalRocketMQ是一個用於將Canal的數據變化訂閱轉化成RocketMQ消息的工具。Canal是阿里巴巴開源的基於資料庫增量日誌解析,提供增量數據訂閱和消費的組件。而RocketMQ是阿里巴巴開源的分散式消息系統。CanalRocketMQ能夠將Canal解析出來的數據變化,轉換成RocketMQ消息,進而通過RocketMQ實現數據同步,為數據處理提供可靠的基礎設施。

二、CanalRocketMQ的配置與使用

CanalRocketMQ的配置與使用相對簡單。使用時,需要在Canal和RocketMQ的基礎上,增加CanalRocketMQ這一組件。

1.引入CanalRocketMQ

在項目中,需要引入CanalRocketMQ的依賴:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal-rocketmq</artifactId>
    <version>1.1.4</version>
</dependency>

2.配置Canal,並開啟Canal的RocketMQ適配器

Canal的配置需要在canal.properties中進行配置。需要將Canal的模式設置成RocketMQ,並配置RocketMQ的相關信息。

# 定義canal server模式為 RocketMQ
canal.serverMode = rocketmq
 
# 配置rocketmq的相關信息
canal.rocketmq.nameServer = 127.0.0.1:9876
canal.rocketmq.producerGroup = canal-producer-group

同時,還需要在Canal的啟動腳本中,添加RocketMQ的適配器。啟動腳本如下:

bin/startup.sh --canal.instance.rmq.topic=example \
--canal.instance.rmq.group=test \
--canal.instance.rmq.nameServer=127.0.0.1:9876 \
--canal.instance.filter.regex=.*\\..* \
--canal.adapter.rocketmq \
--canal.adapter.rocketmq.namesrvAddr=127.0.0.1:9876 \
--canal.adapter.rocketmq.producerGroup=canal-producer-group

3.配置RocketMQ

在RocketMQ的配置文件rocketmq.proerties中,需要進行如下配置:

messageTraceTopic=rmq_sys_TRACE_DATA_777
brokerClusterName=DefaultCluster
autoCreateTopicEnable=true

4.代碼示例

下面是一個簡單的代碼示例,說明如何使用CanalRocketMQ:

public class CanalRocketMQTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalRocketMQTest.class);

    @Test
    public void test() {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost",
                11111), "example", "root", "123456");
        connector.connect();
        connector.subscribe(".*\\..*");

        MessageProducer messageProducer = RocketMQMessageProducer.getInstance();
        messageProducer.start();

        while (true) {
            Message message = connector.getWithoutAck(100);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId != -1 && size > 0) {
                for (CanalEntry.Entry entry : message.getEntries()) {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        CanalEntry.RowChange rowChange = null;
                        try {
                            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        } catch (InvalidProtocolBufferException e) {
                            LOGGER.error("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                    e);
                            continue;
                        }

                        String tableName = entry.getHeader().getTableName();

                        if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                Map beforeMap = buildMap(rowData.getBeforeColumnsList());
                                MessageData messageData = new MessageData(tableName, beforeMap);
                                messageProducer.send(messageData);
                            }
                        } else if (rowChange.getEventType() == CanalEntry.EventType.INSERT
                                || rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                                Map afterMap = buildMap(rowData.getAfterColumnsList());
                                MessageData messageData = new MessageData(tableName, afterMap);
                                messageProducer.send(messageData);
                            }
                        }
                    }
                }

                connector.ack(batchId);
            } else {
                LOGGER.info("No message received!");
            }
        }
    }

    private Map buildMap(List columns) {
        Map map = new HashMap();
        for (CanalEntry.Column column : columns) {
            map.put(column.getName(), column.getValue());
        }
        return map;
    }

}

三、CanalRocketMQ的優點

從使用CanalRocketMQ的角度來看,它具有以下幾點優點:

1.無縫銜接

使用CanalRocketMQ可以實現Canal和RocketMQ的無縫銜接,從而可以實現數據同步。這種無縫銜接代表了不同組件之間的良好協作,讓使用方更加便捷地接入數據同步系統。

2.多樣性

CanalRocketMQ既能夠使用Canal解析出來的數據,又能夠通過RocketMQ發送消息,具有多種用途。例如,可以實現數據的備份,也可以實現多個系統的數據同步。

3.高性能

CanalRocketMQ在消費Canal的數據變化時,使用了多線程的方式進行處理,從而提高了消費的效率和速度。

四、總結

CanalRocketMQ作為Canal和RocketMQ的無縫銜接工具,具有多樣性、高性能等優點。使用過程相對簡單,配置也相對清晰明了,能夠為數據同步提供可靠的基礎設施。

原創文章,作者:HRCT,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/135878.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
HRCT的頭像HRCT
上一篇 2024-10-04 00:15
下一篇 2024-10-04 00:15

相關推薦

  • RocketMQ消息堆積解決方案

    一、RocketMQ消息堆積小標題 RocketMQ消息堆積是指消息在消費者沒有正常消費的情況下,持續積累的現象,導致消息隊列越來越多,積累量越來越大。消息堆積的原因可能是由於消息…

    編程 2025-04-24
  • 深入了解RocketMQ事務消息

    一、什麼是RocketMQ事務消息 RocketMQ事務消息是指在消息發送方發送消息時,延遲將消息狀態提交給broker,由broker進行二次確認,以確保消息不會因發送失敗而丟失…

    編程 2025-04-24
  • Canal RocketMQ詳解

    一、Canal的介紹 Canal是阿里巴巴開源的基於資料庫增量日誌解析,提供增量數據訂閱和消費的組件。Canal主要用來解決資料庫異構之間的數據複製問題,通過增量的方式將數據同步到…

    編程 2025-04-22
  • RocketMQ控制台使用詳解

    一、安裝與啟動控制台 1、安裝步驟:首先需要從Apache RocketMQ的官網下載RocketMQ,下載鏈接:http://rocketmq.apache.org/releas…

    編程 2025-04-12
  • RocketMQ Windows安裝

    一、安裝前準備工作 在安裝RocketMQ之前,需要進行以下準備工作: 1. 安裝JDK 首先需要到Oracle官網下載JDK安裝文件,選擇適合自己操作系統的文件進行下載並安裝。安…

    編程 2025-04-12
  • RocketMQ如何實現消息延遲發送功能

    一、RocketMQ消息的基本特性 RocketMQ是一種廣泛使用的分散式消息中間件,它支持低延遲、高吞吐量的分散式消息傳遞。在消息傳遞方面,它有以下的基本特性: 可靠的消息傳遞:…

    編程 2025-02-25
  • RocketMQ Windows 開發指南

    一、RocketMQ 簡介 RocketMQ是阿里巴巴團隊開發的一款開源的分散式消息中間件。它具有低延遲、高吞吐量、高可靠性等特點,廣泛應用於電商、金融、物流等領域中。 Rocke…

    編程 2025-02-17
  • RocketMQ延時隊列詳解

    一、延時隊列介紹 延時隊列,在分散式系統中經常被使用,可以很好的解決延遲任務問題。RocketMQ中提供了延時隊列的功能。 二、RocketMQ延時隊列實現 RocketMQ是通過…

    編程 2025-02-15
  • RocketMQ源碼解析

    一、簡介 RocketMQ是一個分散式消息傳遞解決方案,具有高性能、高可靠、高可擴展性和分散式特性。本篇文章將從多個方面對RocketMQ的源碼進行解析,幫助更好的理解Rocket…

    編程 2025-02-11
  • Docker安裝RocketMQ

    一、docker安裝rocketmq複雜嗎 在說複雜不複雜之前,我們需要明確一點:Docker是一個開源的應用容器引擎,可以讓開發者打包他們的應用程序,以及相關依賴包到一個輕量級、…

    編程 2025-02-05

發表回復

登錄後才能評論