先强调一点,不要在事务内生成数据
之所以不要这样做,是因为事务如果回滚,我们就找不到这条数据了。特别是在分布式系统中,事务内插入数据后,调用其他服务,如果出现 RPC 异常,事务回滚导致当前服务的数据找不到,但是下游的数据可能生成成功了。也就是,当前服务无数据,下游服务有数据,这种数据的不一致,一般都不能接受,并且要花精力去修复。
正确的做法是,先生成数据,然后进入事务。
幂等类型的数据生成
这类数据本身有幂等标识,数据库本身会加必要的唯一索引。例如:
- 用户维度的账户余额,一个用户在 DB 中只会有一条余额记录。DB 中会在 用户标识 字段上加唯一索引。
- 上游传过来的一笔扣款请求,带有幂等标识。一笔请求在DB中只会存一条记录。DB 中会在 用户标识 + 幂等标识 上加唯一索引。
以初始化用户余额为例,用户余额表设计如下:
create table `user_balance` (
`id` bigint unsigned not null auto_increment comment '自增ID',
`user_id` varchar(32) not null comment '用户标识',
`balance` bigint not null default '0' comment '余额',
primary key (`id`),
unique key (`user_id`)
) engine = InnoDB character set = utf8mb4;
方案1: 无则插入
Java 伪代码:
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
// insert into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.init(usertId);
}
问题:
- 并发时,会出现唯一索引冲突异常;
- 出现唯一索引冲突时,会导致id出现跳跃。这个是可接受的。
方案2: 无则插入,吞掉异常后,再反查一次
Java 伪代码:
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
try {
// insert into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.init(usertId);
} catch (Exception ex) {
// 吞掉该异常。根据需要进行打日志等操作
// 出现该异常的原因:1、唯一索引冲突;2、数据库连接异常;3、相关代码、组件 bug 等
}
record = UserBalanceRepo.select(userId);
if (record == null) {
throw new RuntimeException("初始化失败");
}
}
方案3: 无则插入,但使用 insert ignore 插入
使用insert ignore
插入数据时,若主键冲突或者唯一索引冲突,则不再插入,同时不会报错。从这点上,和 redis 的 setnx 指令很像。
Java 伪代码:
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
// insert ignore into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.initIfNotDuplicate(usertId);
// 反查一次
record = UserBalanceRepo.select(userId);
if (record == null) {
throw new RuntimeException("初始化失败");
}
// 根据需要,增加校验关键字段的逻辑
}
个人认为这是最好的一个方案。
需要注意的是,反查到数据后,可能需要增加校验关键字段的逻辑。比如,在扣款场景中,根据用户标识 + 幂等标识
查询到数据后,需要将金额字段和请求中的金额字段做下比较,不一致时要报错,然后人工处理。
方案4: 使用分布式锁保证插入不会出现唯一索引冲突异常
比如,使用 redis 作为分布式锁。
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record != null) {
// 已经初始化,直接返回
return;
}
String lockKey = "初始化:" + userId;
// 锁 60 s
boolean isLockSuccess = RedisUtil.lock(lockKey, 60);
if (!isLockSuccess) {
throw new RuntimeException("加锁失败");
}
try {
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
// insert into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.init(usertId);
}
} finally {
RedisUtil.unlock(lockKey);
}
这个和设计模式单例模式
中的双检锁思路相同。
优点:
基本解决
了会出现唯一索引冲突异常的可能。- 减少了对 MySQL 的读写。
问题:
- 没有完全解决出现唯一索引冲突异常的可能。因为加锁成功后的操作,不能完全保证是在 60s 之内完成。极端情况下,可能因为 GC、网络抖动、组件Bug等原因,导致超过 60s ,此时相当于redis锁失效,还是可能出现唯一索引冲突异常。
- 引入 redis 后,系统可用性降低。redis 加锁、结果可能因为网络原因出现异常。比如解锁失败,会导致该账号加锁时间过长。
- 虽然没有了唯一索引冲突异常,但是多了redis加锁异常。使用该方案后,只是从一个异常变成另外一个异常而已。
- 业务逻辑对应的接口正常响应 QPS 变低。
方案5: 使用分布式重试锁保证插入不会出现唯一索引冲突异常
该方案比方案4多了重试
两个字。
重试锁,加锁会重试,直至加锁成功,或者到达一定重试次数为止。
伪代码:
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record != null) {
// 已经初始化,直接返回
return;
}
String lockKey = "初始化用户余额:" + userId;
// 重试锁,最多重试6次,重试之间间隔 6ms, 锁 60 s
boolean isLockSuccess = RedisUtil.lockWithRetry(lockKey, 6, 5, 60);
if (!isLockSuccess) {
throw new RuntimeException("加锁失败");
}
try {
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
// insert into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.init(usertId);
}
} finally {
RedisUtil.unlock(lockKey);
}
问题和方案4类似。 同时多了一些优点和缺点。
优点:
- 加锁失败的概率变低了。
缺点:
- 重试锁,真的发生重试时,会导致耗时增加。
方案6: 分布式重试锁 + 锁失败后再查一次
在方案5中,在非网络异常的情况下,锁失败时,已经比开始尝试加锁时过去了 30ms 左右,此时另外一个加锁成功的线程极有可能已经成功初始化数据,所以可以再查一次。
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record != null) {
// 已经初始化,直接返回
return;
}
String lockKey = "初始化:" + userId;
// 重试锁,最多重试6次,重试之间间隔 6ms, 锁 60 s
boolean isLockSuccess = RedisUtil.lockWithRetry(lockKey, 6, 5, 60);
if (!isLockSuccess) {
// 锁失败时,再查一次,大概率能查到
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record != null) {
// 已经初始化,直接返回
return;
}
throw new RuntimeException("加锁失败");
}
try {
// select * from user_balance where user_id = ?
UserBalanceDO record = UserBalanceRepo.select(userId);
if (record == null) {
// insert into user_balance(user_id, balance) values( ? , 0);
UserBalanceRepo.init(usertId);
}
} finally {
RedisUtil.unlock(lockKey);
}
实际业务中,我倾向选择方案2和方案3。
非幂等类型的数据生成
非幂等型数据一般来自与用户交互的地方。比如博客系统中,用户写一篇博客,连续点了两次【保存】按钮,博客系统可能会保存两篇一模一样的文章。
如何保证只出现一篇文章呢?
方案1: 限频
比如博客系统中,限制用户5秒内只能提交一次。
技术上,用 redis 锁可以实现。
方案2: 预生成单号
在业务系统之前加一层预生成单据层。
比如博客系统中,预生成单据表和文章表可以这样设计:
-- 预生成单据表
create table `article_prepare_record` (
`id` bigint unsigned not null auto_increment comment '主键ID',
`user_id` varchar(64) not null default '' comment '用户ID',
`pre_id` varchar(64) not null default '' comment '预生成单号',
`biz_id` varchar(64) not null default '' comment '下层业务ID,即文章的全局唯一ID',
`created_at` int NOT NULL DEFAULT unix_timestamp() COMMENT '创建时间',
`updated_at` bigint NOT NULL DEFAULT unix_timestamp() ON UPDATE unix_timestamp() COMMENT '修改时间',
primary key (`id`),
unique key idx_user_id(`user_id`),
unique key idx_pre_id(`pre_id`),
unique key idx_biz_Id(`biz_id`)
) engine = InnoDB character set = utf8mb4;
-- 文章表
create table `article_record` (
`id` bigint unsigned not null auto_increment comment '主键ID',
`user_id` varchar(64) not null default '' comment '用户ID',
`biz_id` varchar(64) not null default '' comment '业务ID,文章的全局唯一ID',
`title` varchar(2048) not null default '' comment '文章标题',
`content` text not null default '' comment '文章内容',
`created_at` int NOT NULL DEFAULT unix_timestamp() COMMENT '创建时间',
`updated_at` bigint NOT NULL DEFAULT unix_timestamp() ON UPDATE unix_timestamp() COMMENT '修改时间',
primary key (`id`),
unique key idx_biz_Id(`biz_id`)
) engine = InnoDB character set = utf8mb4;
用户点击【新建博客】按钮时,后台在 article_prepare_record
表中生成一条记录,其中user_id
是用户ID,基于用户登录态获取,pre_id
是随机生成或者基于ID生成器生成的全局唯一ID,biz_id 为空。
pre_id 会传给前端。
用户写完博客后,点击【保存】按钮,前端将pre_id
、title
、content
一起传到后台。 后台先校验 article_prepare_record
中是否有 user_id
、 pre_id
对应的记录:
- 无则报错。
- 有,但是
biz_id
不为空,报错。 - 有,且
biz_id
为空,则在article_record
插入文章记录,其中biz_id
用ID生成器生成。然后将biz_id
写入article_prepare_record
表中的biz_id
字段。
注意,这里要用到数据库事务,并在事务内锁住 article_prepare_record
中的记录,保证数据正确。
article_prepare_record
表中的旧数据可以定期清理掉。比如用户的一次操作流程不应该超过 12 个小时,那么可以将 12小时之前生成的数据清理掉,不影响业务,也节省磁盘。
熟悉网络安全的同学会发现,这个方案和 CSRF 攻击的一个防御手段是类似的。
这个方案可以用于很多类似的场景中,比如支付系统中,用户下单时,生成一个预下单号,用户通过预下单号进入支付页面进行支付,预下单号传到后台后会过渡到真正的订单号。