前段时间做分布式任务调度系统,需要跨服务保证任务的唯一性执行。试过 Redis 分布式锁,但集群环境下偶尔会出现锁漂移的问题。后来想到 OceanBase 本身是分布式数据库,事务支持完善,索性基于它实现了一套分布式锁,没想到稳定性远超预期。今天就聊聊这个过程中的实操经验。
从表结构开始的设计
最开始参考了 MySQL 的悲观锁思路,但 OceanBase 的分布式事务特性让我换了个角度。建了张最简单的锁表,就三个字段:
CREATE TABLE distributed_lock (
lock_key VARCHAR(64) PRIMARY KEY,
lock_value VARCHAR(64) NOT NULL,
expire_time TIMESTAMP NOT NULL
);
这里有个关键点,lock_value 用 UUID 生成,释放锁时要校验这个值,避免误释放别人持有的锁。expire_time 是防死锁用的,就算服务挂了,过期后锁也能自动释放。
刚开始没加索引,结果高并发下 insert 经常超时。后来给 lock_key 加了主键索引(上面 SQL 已经包含),性能立刻上来了。OceanBase 的主键索引是聚簇索引,查询速度比普通索引快不少。
加锁解锁的核心逻辑
加锁其实就是 insert 一条记录,用唯一键冲突来保证互斥:
public boolean tryLock(String key, String value, long expireSeconds) {
String sql = "INSERT INTO distributed_lock VALUES(?, ?, ADDDATE(NOW(), INTERVAL ? SECOND))";
try {
jdbcTemplate.update(sql, key, value, expireSeconds);
return true;
} catch (DuplicateKeyException e) {
// 键冲突说明已被锁定
return false;
}
}
但光这样还不够,得处理过期锁。所以加锁前先清一波过期的:
// 加锁前先删除过期锁
String cleanSql = "DELETE FROM distributed_lock
WHERE lock_key = ? AND expire_time < NOW()";
jdbcTemplate.update(cleanSql, key);
解锁必须校验 value,这步很关键:
public boolean unlock(String key, String value) {
String sql = "DELETE FROM distributed_lock
WHERE lock_key = ? AND lock_value = ?";
int rows = jdbcTemplate.update(sql, key, value);
return rows > 0;
}
这里踩过一个坑:刚开始用了SELECT FOR UPDATE做悲观锁,结果在 OceanBase 的读写分离架构下,偶尔会出现锁不住的情况。后来换成乐观锁思路(靠唯一键冲突),稳定性立刻提升了。
处理分布式事务的细节
有次线上出现锁超时但任务还在执行的情况,排查发现是事务没提交导致的。OceanBase 的事务默认是 READ COMMITTED 级别,加锁操作必须放在独立事务里:
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean tryLock(...) {
// 加锁逻辑
}
另外,分布式环境下时钟可能不一致,expire_time 最好用数据库的时间,别传客户端时间。之前因为服务器时区没同步,出现过锁提前过期的问题,改成ADDDATE(NOW(), …)就好了。
性能优化的小技巧
高并发场景下,单纯的 insert 可能导致热点竞争。可以用 OceanBase 的分区表特性,按 lock_key 哈希分区:
CREATE TABLE distributed_lock (
lock_key VARCHAR(64) PRIMARY KEY,
lock_value VARCHAR(64) NOT NULL,
expire_time TIMESTAMP NOT NULL
) PARTITION BY HASH(lock_key) PARTITIONS 8;
这样不同的 lock_key 会落到不同分区,减少锁竞争。实测下来,分区后 QPS 能提升 3 倍多。
还有个小发现,OceanBase 的INSERT … ON DUPLICATE KEY UPDATE语句性能很好,可以用它实现锁的续约:
// 续约锁的SQL
String renewSql = "INSERT INTO distributed_lock VALUES(?, ?, ADDDATE(NOW(), INTERVAL ? SECOND)) " +
"ON DUPLICATE KEY UPDATE expire_time = ADDDATE(NOW(), INTERVAL ? SECOND)";
踩过的坑总结