第十三节 SpringBoot集成Canal同步MySQL数据

亮子 2025-11-06 09:09:16 12 0 0 0

在 Spring Boot 项目中使用 Canal,主要是为了监听 MySQL 的 binlog 日志,实现数据变更的实时同步或触发业务逻辑(如更新缓存、记录日志、数据同步等)。Canal 是阿里巴巴开源的 MySQL binlog 增量订阅 & 消费组件。

下面是 Spring Boot 集成 Canal 的基本步骤:


一、前提条件

  1. MySQL 开启 binlog(必须为 ROW 模式):
   [mysqld]
   log-bin=mysql-bin
   binlog-format=ROW
   server-id=1

如果是使用docker的话,修改完配置文件后需要运行docker restart来重启容器,重新加载配置。

重启后,通过数据库工具连接到数据,执行一下SQL语句,确保binlog选项已经开启了

SHOW VARIABLES LIKE 'log_bin';

image.png

  1. 创建 Canal 所需的 MySQL 用户(具有 REPLICATION 权限):
   CREATE USER canal IDENTIFIED BY 'canal';
   GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
   FLUSH PRIVILEGES;

二、部署 Canal Server

你可以选择:

  • 使用 Docker 快速启动:
  docker run -d --name canal-server \
    -e canal.instance.master.address=your-mysql-host:3306 \
    -e canal.instance.dbUsername=canal \
    -e canal.instance.dbPassword=canal \
    -e canal.instance.connectionCharset=UTF-8 \
    -p 11111:11111 \
    canal/canal-server
  • 或者下载 Canal Server 手动部署(参考官方文档:https://github.com/alibaba/canal)

Canal Server 默认监听 11111 端口,客户端通过 TCP 或 Kafka/ RocketMQ 消费 binlog。

image.png

image.png


三、Spring Boot 项目中集成 Canal Client

方式一:使用 TCP 直连 Canal Server(简单场景)

  1. 添加依赖(Maven):
   <dependency>
       <groupId>com.alibaba.otter</groupId>
       <artifactId>canal.client</artifactId>
       <version>1.1.7</version>
   </dependency>
  1. 编写 Canal 客户端监听类
@Component
public class CanalClient {

    @PostConstruct
    public void run() {
        // 创建连接
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("localhost", 11111), "example", "", "");

        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe("your_database\\.your_table"); // 正则匹配
            connector.rollback(); // 回滚到未确认位置

            while (true) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    if (emptyCount > 10) {
                        Thread.sleep(1000);
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 确认消费
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChg;
            try {
                rowChg = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("解析 binlog 出错", e);
            }

            CanalEntry.EventType eventType = rowChg.getEventType();
            String tableName = entry.getHeader().getTableName();

            for (CanalEntry.RowData rowData : rowChg.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // 处理删除
                    handleDelete(tableName, rowData);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // 处理插入
                    handleInsert(tableName, rowData);
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    // 处理更新
                    handleUpdate(tableName, rowData);
                }
            }
        }
    }

    private void handleInsert(String tableName, CanalEntry.RowData rowData) {
        System.out.println("INSERT on table: " + tableName);
        rowData.getAfterColumnsList().forEach(col -> 
            System.out.println(col.getName() + " = " + col.getValue()));
    }

    private void handleUpdate(String tableName, CanalEntry.RowData rowData) {
        System.out.println("UPDATE on table: " + tableName);
        System.out.println("Before: ");
        rowData.getBeforeColumnsList().forEach(col -> 
            System.out.println(col.getName() + " = " + col.getValue()));
        System.out.println("After: ");
        rowData.getAfterColumnsList().forEach(col -> 
            System.out.println(col.getName() + " = " + col.getValue()));
    }

    private void handleDelete(String tableName, CanalEntry.RowData rowData) {
        System.out.println("DELETE on table: " + tableName);
        rowData.getBeforeColumnsList().forEach(col -> 
            System.out.println(col.getName() + " = " + col.getValue()));
    }
}

注意:@PostConstruct 启动后会阻塞主线程,生产环境建议使用单独线程或异步任务。


方式二:通过 Kafka/RocketMQ 消费(推荐用于生产)

Canal Server 可配置将 binlog 投递到 Kafka,Spring Boot 项目作为 Kafka 消费者监听即可,解耦更清晰。

  1. canal.properties 中启用 Kafka:
   canal.serverMode = kafka
   canal.mq.servers = localhost:9092
  1. instance.properties 中指定 topic:
   canal.mq.topic=your-binlog-topic
  1. Spring Boot 中使用 @KafkaListener 消费消息(需引入 Spring Kafka)。

四、注意事项

  • 幂等性处理:binlog 可能重复消费,业务逻辑需幂等。
  • 异常处理:网络中断、反序列化失败等需重试或告警。
  • 性能:高并发场景建议使用消息队列模式(Kafka/RocketMQ)。
  • 字段类型:binlog 中的值均为字符串,需自行转换类型。

五、参考资源

  • Canal GitHub:https://github.com/alibaba/canal
  • Canal 官方文档:https://github.com/alibaba/canal/wiki

如需我提供一个完整的 Spring Boot + Canal 示例项目结构,也可以告诉我!