Using Canal in a Spring Boot project is mainly about listening to MySQL binlog events so you can react to data changes in real time (for example refreshing caches, writing audit logs, or synchronizing data). Canal is Alibaba's open-source component for incremental subscription and consumption of MySQL binlogs.
The basic steps for integrating Canal with Spring Boot are as follows.
First, enable binlog in ROW format on the MySQL server by adding the following to its configuration file (e.g. my.cnf):
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
If MySQL runs in Docker, restart the container with docker restart after editing the configuration file so the changes take effect.
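For example, assuming the MySQL container is named mysql:

docker restart mysql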
After the restart, connect to the database with any SQL client and run the following statement to confirm that binlog is enabled:
SHOW VARIABLES LIKE 'log_bin';
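If binlog is enabled, the result should show ON:

+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+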
Next, create a dedicated MySQL account for Canal and grant it the replication privileges it needs:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
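You can optionally double-check the new account's privileges:

SHOW GRANTS FOR 'canal'@'%';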
Next, deploy a Canal Server. You can run it from the official release package or, more simply, with Docker:
docker run -d --name canal-server \
-e canal.instance.master.address=your-mysql-host:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-p 11111:11111 \
canal/canal-server
Canal Server listens on port 11111 by default. Clients can consume the binlog either directly over TCP or via Kafka/RocketMQ.
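One quick way to verify that the server started is to tail the container's logs (assuming the container name used above):

docker logs -f canal-server

The instance shipped with Canal by default is named example, which is also the destination used by the client code below.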
With MySQL and Canal Server ready, add the Canal client dependency to your Spring Boot project:
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.7</version>
</dependency>
Then write a client component that connects to Canal Server, subscribes to the target tables, and processes the row-change events:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;   // jakarta.annotation.PostConstruct on Spring Boot 3
import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalClient {

    @PostConstruct
    public void run() {
        // connect to the Canal Server; "example" is the default instance name
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe("your_database\\.your_table"); // regex filter: schema.table
            connector.rollback(); // roll back to the last un-acked position
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // fetch without auto-ack
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    if (emptyCount > 10) {
                        Thread.sleep(1000); // back off after repeated empty fetches
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
    private void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChg;
            try {
                rowChg = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException("Failed to parse binlog entry", e);
            }
            CanalEntry.EventType eventType = rowChg.getEventType();
            String tableName = entry.getHeader().getTableName();
            for (CanalEntry.RowData rowData : rowChg.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    handleDelete(tableName, rowData);   // row was deleted
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    handleInsert(tableName, rowData);   // row was inserted
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    handleUpdate(tableName, rowData);   // row was updated
                }
            }
        }
    }
    private void handleInsert(String tableName, CanalEntry.RowData rowData) {
        System.out.println("INSERT on table: " + tableName);
        rowData.getAfterColumnsList().forEach(col ->
                System.out.println(col.getName() + " = " + col.getValue()));
    }

    private void handleUpdate(String tableName, CanalEntry.RowData rowData) {
        System.out.println("UPDATE on table: " + tableName);
        System.out.println("Before: ");
        rowData.getBeforeColumnsList().forEach(col ->
                System.out.println(col.getName() + " = " + col.getValue()));
        System.out.println("After: ");
        rowData.getAfterColumnsList().forEach(col ->
                System.out.println(col.getName() + " = " + col.getValue()));
    }

    private void handleDelete(String tableName, CanalEntry.RowData rowData) {
        System.out.println("DELETE on table: " + tableName);
        rowData.getBeforeColumnsList().forEach(col ->
                System.out.println(col.getName() + " = " + col.getValue()));
    }
}
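Since run() never returns, the @PostConstruct call above blocks application startup (see the notes below). A minimal sketch of moving the polling loop onto its own thread instead; the class name and the 500 ms back-off are illustrative choices:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;   // jakarta.annotation.* on Spring Boot 3
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
public class CanalPollingRunner {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    @PostConstruct
    public void start() {
        // hand the blocking poll loop to a background thread so startup is not blocked
        executor.submit(this::pollLoop);
    }

    private void pollLoop() {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("localhost", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe("your_database\\.your_table");
            connector.rollback();
            while (running) {
                Message message = connector.getWithoutAck(1000);
                if (message.getId() == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(500); // back off when there is nothing new
                } else {
                    // ... same row handling as in CanalClient above ...
                }
                connector.ack(message.getId());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() interrupts the sleep
        } finally {
            connector.disconnect();
        }
    }

    @PreDestroy
    public void stop() {
        // stop the loop and release the background thread on shutdown
        running = false;
        executor.shutdownNow();
    }
}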
Notes:
The while(true) loop inside @PostConstruct blocks the application's startup thread; in production, run the client on a dedicated thread or as an async task, as sketched above.
Canal Server can also be configured to deliver binlog events to Kafka; the Spring Boot application then just acts as a Kafka consumer (see the sketch below), which decouples the two sides more cleanly.
Enable Kafka mode in canal.properties:
canal.serverMode = kafka
canal.mq.servers = localhost:9092
Specify the topic in instance.properties:
canal.mq.topic=your-binlog-topic
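On the Spring Boot side you then consume these messages with a plain Kafka listener. A minimal sketch, assuming the topic configured above and Canal's default flat JSON message format (the field names below follow that format but are only for illustration):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CanalBinlogListener {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = "your-binlog-topic", groupId = "binlog-consumer")
    public void onMessage(String payload) throws Exception {
        // each message is a JSON document describing one batch of row changes
        JsonNode msg = objectMapper.readTree(payload);
        String table = msg.path("table").asText();
        String type = msg.path("type").asText(); // INSERT / UPDATE / DELETE
        System.out.println(type + " on table " + table + ": " + msg.path("data"));
    }
}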
Consuming with @KafkaListener requires the Spring Kafka dependency (spring-kafka). If you'd like a complete Spring Boot + Canal sample project structure, just let me know!