1.什么是幂等性
幂等性原本是数学上的概念,即使公式:f(x)=f(f(x)) 能够成立的数学性质。用在编程领域,则意为对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
接口幂等性:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外),即第一次请求的时候对资源产生了副作用,但是以后的多次请求都不会再对资源产生副作用。简单来说,幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的
2.幂等性的使用场景
- 前端重复提交表单: 在填写一些表格时候,用户填写完成提交,很多时候会因网络波动没有及时对用户做出提交成功响应,致使用户认为没有成功提交,然后一直点提交按钮,这时就会发生重复提交表单请求。
- 用户恶意进行刷单: 例如在实现用户投票这种功能时,如果用户针对一个用户进行重复提交投票,这样会导致接口接收到用户重复提交的投票信息,这样会使投票结果与事实严重不符。
- 接口超时重复提交: 很多时候 HTTP 客户端工具都默认开启超时重试的机制,尤其是第三方调用接口时候,为了防止网络波动超时等造成的请求失败,都会添加重试机制,导致一个请求提交多次。
- 消息进行重复消费: 当使用 MQ 消息中间件时候,如果发生消息中间件出现错误未及时提交消费信息,导致发生重复消费。
使用幂等性最大的优势在于使接口保证任何幂等性操作,免去因重试等造成系统产生的未知的问题。
项目推荐:基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba企业级系统架构底层框架封装,解决业务开发时常见的非功能性需求,防止重复造轮子,方便业务快速开发和企业技术栈框架统一管理。引入组件化的思想实现高内聚低耦合并且高度可配置化,做到可插拔。严格控制包依赖和统一版本管理,做到最少化依赖。注重代码规范和注释,非常适合个人学习和企业使用
Github地址:https://github.com/plasticene/plasticene-boot-starter-parent
Gitee地址:https://gitee.com/plasticene3/plasticene-boot-starter-parent
微信公众号:Shepherd进阶笔记
交流探讨群:Shepherd_126
3.幂等性的实现方案
个人觉得实现接口幂等性的解决方法大致有三种:1)基于数据库层面实现 2)基于中间件token令牌机制 3)代码层面实现状态机
1)基于数据库层面的实现
建立唯一索引
绝大数情况下,为了防止重复数据的产生,我们都会在表中加唯一索引,这是一个非常简单,并且有效的方案。比如说我们在订单系统的订单表表里面的订单号order_no是唯一的,那么就可以给订单号创建唯一索引,sql如下所示:
ALTER TABLE `mall`.`order`
ADD UNIQUE INDEX `uniq_order_no`(`order_no`) USING BTREE;
添加索引之后,我们重复提交订单由于订单号唯一,所以第二次开始的提交就会报错了,,如果是java
程序需要捕获:DuplicateKeyException
异常,如果使用了spring
框架还需要捕获:MySQLIntegrityConstraintViolationException
异常。
悲观锁
获取数据的时候加锁获取。select * from table_xxx where id='xxx' for update 悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,性能不行,需根据实际情况选用,慎重使用
- 多个请求同时根据id查询用户信息。
- 判断余额是否不足100,如果余额不足,则直接返回余额不足。
- 如果余额充足,则通过for update再次查询用户信息,并且尝试获取锁。
- 只有第一个请求能获取到行锁,其余没有获取锁的请求,则等待下一次获取锁的机会。
- 第一个请求获取到锁之后,判断余额是否不足100,如果余额足够,则进行update操作。
- 如果余额不足,说明是重复请求,则直接返回成功。
需要特别注意的是:如果使用的是mysql数据库,存储引擎必须用innodb,因为它才支持事务。此外,这里id字段一定要是主键或者唯一索引,不然会锁住整张表。
乐观锁
乐观锁其实就是给表里面添加一个版本号字段version,每次操作version都加1,乐观锁是天然实现幂等性,但是乐观锁不易在操作频繁的表添加,因为会产生大量冲突。乐观锁的使用如下:
update amount = amount + 100, version = version + 1 where user_id=1 and version = 1
例如之前我在工作开发过得一个撤销重做功能:在开发bi工具每一个操作都是实时保存,为了支持撤销重做,我在相应设计的业务点主表里面设置了version版本字段,然后每次操作接口都需要带上版本号字段,部分接口代码示例如下:
@ApiOperation("增加图册元素")
@PostMapping("/{atlasId}/{version}/atlasElement")
public String addAtlasElement(@RequestBody @Validated AtlasElementVO atlasElementVO, @PathVariable("version") Long version, @PathVariable("atlasId") Long atlasId) {
UserForm currentUser = RequestUserHolder.getCurrentUser();
atlasElementVO.setCreator(currentUser.getUserId());
atlasElementVO.setCreatorName(currentUser.getUsername());
atlasElementVO.setAtlasId(atlasId);
atlasElementVO.setIsRepair(AtlasConstant.NOT_REPAIR);
JSONObject rst = new JSONObject();
Long id = atlasService.addAtlasElement(IasBeanUtils.copy(atlasElementVO, AtlasElementDTO.class), version);
rst.put("atlasElement", atlasService.getAtlasElement(id));
rst.put("version", version + 1);
return rst.toJSONString();
}
@ApiOperation("删除图册元素")
@DeleteMapping("/{atlasId}/{version}/atlasElement/{atlasElementId}")
public String delAtlasElement(@PathVariable("version") Long version, @PathVariable("atlasElementId") Long atlasElementId, @PathVariable("atlasId") Long atlasId) {
UserForm currentUser = RequestUserHolder.getCurrentUser();
if (!atlasService.delAtlasElement(atlasElementId, version, atlasId, currentUser.getUserId())) {
throw new BusinessException(ErrorCodeEnum.DELETE_ERROR.getCode(), ErrorCodeEnum.DELETE_ERROR.getMessage());
}
JSONObject rst = new JSONObject();
rst.put("version", version + 1);
return rst.toJSONString();
}
@ApiOperation("增加联动")
@PostMapping("/{atlasId}/{version}/atlasElementLink")
public AtlasVO addAtlasElementLink(@PathVariable("version") Long version, @PathVariable("atlasId") Long atlasId, @RequestBody @Validated AtlasElementLinkVO atlasElementLinkVO) {
atlasElementLinkVO.setAtlasId(atlasId);
UserForm currentUser = RequestUserHolder.getCurrentUser();
atlasElementLinkVO.setCreator(currentUser.getUserId());
atlasService.addAtlasElementLink(atlasId, version, IasBeanUtils.copy(atlasElementLinkVO, AtlasElementLinkDTO.class));
return IasBeanUtils.copy(atlasService.get(atlasId, null), AtlasVO.class);
}
@ApiOperation("删除联动")
@DeleteMapping("/{atlasId}/{version}/atlasElementLink/{atlasElementLinkId}")
public AtlasVO delAtlasElementLink(@PathVariable("version") Long version, @PathVariable("atlasElementLinkId") Long atlasElementLinkId, @PathVariable("atlasId") Long atlasId) {
UserForm currentUser = RequestUserHolder.getCurrentUser();
atlasService.delAtlasElementLink(atlasElementLinkId, version, atlasId, currentUser.getUserId());
return IasBeanUtils.copy(atlasService.get(atlasId, null), AtlasVO.class);
}
@ApiOperation("更新联动")
@PutMapping("/{atlasId}/{version}/atlasElementLink/{atlasElementLinkId}")
public AtlasVO updateAtlasElementLink(@PathVariable("atlasId") Long atlasId, @PathVariable("version") Long version, @PathVariable("atlasElementLinkId") Long atlasElementLinkId, @RequestBody AtlasElementLinkVO atlasElementLinkVO) {
atlasElementLinkVO.setAtlasId(atlasId);
atlasElementLinkVO.setAtlasElementLinkId(atlasElementLinkId);
UserForm currentUser = RequestUserHolder.getCurrentUser();
atlasService.updateAtlasElementLink(version, IasBeanUtils.copy(atlasElementLinkVO, AtlasElementLinkDTO.class), currentUser.getUserId());
return IasBeanUtils.copy(atlasService.get(atlasId, null), AtlasVO.class);
}
先查询再修改
这种方法大多用在并发不高的后台系统,或者一些定时任务JOB,为了支持幂等,支持重复执行,简单的处理方法是,先查询下一些关键数据,判断是否已经执行过,在进行业务处理,就可以了。注意:核心高并发流程不要用这种方法。
例如我们在定时任务中为了避免任务被重复执行,我们往往会根据状态(未执行)去查询一波任务,例如下面的代码示例:
@Scheduled(cron = "0 0/3 * * * ?")
public void task(){
log.info("execute task time: "+ IasDateUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
//1.根据任务状态先查询出为操作的任务
List<DataBusTask> dataBusTaskList = dataBusTaskService.getListByStatus();
//2.对任务进行后续修改操作
dataBusTaskList.forEach(dataBusTask -> {
String uuid = dataBusTaskService.findTaskByName(dataBusTask.getName() ,dataBusTask.getUserId());
if( uuid != null ) {
dataBusTask.setStatus(DataBusConstant.TASK_SUCCESS);
dataBusTask.setUuid(uuid);
int i = dataBusTaskDAO.updateById(dataBusTask);
log.info("update dataBusTask success");
}else {
delBatch(dataBusTask);
}
});
}
2)基于中间件实现
token令牌机制
针对客户端连续点击或者调用方的超时重试等情况,例如提交订单,此种操作就可以用 Token 的机制实现防止重复提交。
简单的说就是调用方在调用接口的时候先向后端请求一个全局 ID(Token),请求的时候携带这个全局 ID 一起请求(Token 最好将其放到 Headers 中),后端需要对这个 Token 作为 Key,用户信息作为 Value 到 Redis 中进行键值内容校验,如果 Key 存在且 Value 匹配就执行删除命令,然后正常执行后面的业务逻辑。如果不存在对应的 Key 或 Value 不匹配就返回重复执行的错误信息,这样来保证幂等操作
public void submitOrder(OrderSubmitDTO orderSubmit) {
String orderToken = orderSubmit.getToken();
Long userId = orderSubmit.getUserId();
//1.验证令牌,保证接口幂等性,防止重复提交订单,注意:【令牌的对比和删除必须保证原子性,否则高并发下会出现多次提交验证通过的情况】
String redisToken = stringRedisTemplate.opsForValue().get(USER_ORDER_TOKEN_PREFIX + userId);
if (Objects.equals(orderToken, redisToken)) {
stringRedisTemplate.delete(USER_ORDER_TOKEN_PREFIX + userId);
}
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String s = stringRedisTemplate.opsForValue().get(USER_ORDER_TOKEN_PREFIX + userId);
Long result = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Lists.newArrayList(USER_ORDER_TOKEN_PREFIX + userId), orderToken);
if (result < 1) {
throw new BusinessException("订单令牌已过期或者不正确,请刷新重试提交");
}
.......
}
注意,在并发情况下,执行 Redis 查找数据与删除需要保证原子性,否则很可能在并发下无法保证幂等性。其实现方法可以使用分布式锁或者使用 Lua 表达式来注销查询与删除操作,不然没办严格保证幂等性
分布式锁
如果是分布是系统,构建全局唯一索引比较困难,唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper, redisson等等),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁
3)状态机
很多时候业务表是有状态的,比如订单表中有:1-下单、2-已支付、3-完成、4-撤销等状态。如果这些状态的值是有规律的,按照业务节点正好是从小到大,我们就能通过它来保证接口的幂等性。
update `order` set status = 3, update_time = now() where user_id=1 and status = 2
该方案仅限于要更新的表有状态字段
,并且刚好要更新状态字段
的这种特殊情况,并非所有场景都适用。