前言
可乐他们团队最近在做一个文章社区平台,由于人手不够,后端部分也是由前端同学来实现,使用的是 nest
。
今天他接到了一个需求,就是在用户点开文章详情的时候,把阅读量 +1
,这里不需要判断用户是否阅读过,无脑 +1
就行。
它心想:这么简单,这不是跟 1+1
一样么。
初版实现
我们用的 orm
框架是 typeorm
,然后他看了一眼官方文档,下面是一个更新的例子。
文章表里有一个字段 views
,表示该文章的阅读量。那我是不是把这篇文章的 views
取出来,然后 +1
,再塞回去就可以了呢?
啪一下,就写出了下面这样的代码:
async addView(articleId: number) { const entity = await this.articleRepository.findOne({ where: { id: articleId }, select: ['views', 'id'], }); entity.views = entity.views + 1; await this.articleRepository.save(entity); }
然后就美滋滋的提测,继续摸鱼去了。
并发bug
不出意外的话意外就要发生了,测试下午就找到了可乐,说这个实现有 bug
,具体复现是在并发压测的时候。
这里用一个 node
脚本来模拟一下并发的请求:
import axios from "axios"; const axiosInstance = axios.create({ withCredentials: true, headers: { Cookie: "your cookie", }, }); for (let i = 0; i < 10; i++) { axiosInstance.get("http://localhost:3000/api/articles/getArticleInfo?id=2"); }
本来应该阅读量加 10
的,结果只加了 1
。
可乐当时脑瓜子嗡嗡的,这还能有 bug
?
具体来说,这是 mysql
处理并发的机制—— MVCC
,它有几种默认的隔离级别。默认的隔离级别是可重复读,在可重复读的隔离级别下,会出现以下这种情况:
也就是说,当多个客户端同时读取相同的文章实体,然后分别对其浏览次数进行增加,并尝试保存回数据库,这有可能前面的提交会被后提交的操作覆盖,导致阅读量的的更新丢失。
update...set
最简单的解决方案就是不要取出来再更新,而是使用 mysql
的 update...set
语句,它本身自带的锁可以帮我们规避掉这种问题。
await this.articleRepository.query( 'UPDATE articles SET views = views + 1 WHERE id = ?', [articleId], );
这一次再用测试脚本去跑的时候,发现是没有问题的了,加的次数是对的。
乐观锁
如果你非要取出来,在代码里面做一些操作,再更新。那么也可以试试乐观锁,乐观锁的实现通常会使用比较与交换 (Compare-and-Swap,CAS)
或类似的机制来确保数据的一致性。
比较与交换是一种并发算法,用于实现原子操作,它会比较内存位置的值与预期值,如果相等则进行更新操作,否则不做任何操作或者重新尝试。
在乐观锁中,当读取实体时,会同时获取一个版本号标识,然后在保存时比较这个标识是否与读取时一致,如果一致,则执行更新操作。
具体的实现是在表里增加一个 version
字段:
ALTER TABLE jueyin.articles ADD version INT DEFAULT 0 NULL;
然后把查询出来的 version
值作为更新条件之一,判断这次更新是否生效,如果不生效,则等待并重试
async addView(articleId: number) { const oldVersion = await this.articleRepository.findOne({ where: { id: articleId }, select: ['version'], }); const res = await this.articleRepository.query( 'UPDATE articles SET views = views + 1,version = version + 1 WHERE id = ? AND version = ?', [articleId, oldVersion.version], ); // 没有更新成功 if (res.affectedRows === 0) { await wait(); await this.addView(articleId); } }
当然,这个阅读量 +1
的场景其实也没必要用到乐观锁了。这里只是通过这个场景,来举例乐观锁的使用。
悲观锁
我们一般使用 SELECT FOR UPDATE
实现悲观锁,即在数据读取的同时锁定数据,以防止其他事务修改数据,从而确保当前事务能够安全地读取和更新数据。
SELECT FOR UPDATE
获取数据并将选定行上的排他锁,别的事务无法读取和修改该行的数据。
总的来说,SELECT FOR UPDATE
适用于需要确保数据一致性和完整性的场景,特别是在并发环境中需要对共享数据进行读取和修改的情况下。通过锁定数据行,可以确保在操作过程中其他事务不会对相同的数据进行修改,从而避免并发冲突和数据不一致性。
但是,这可能会导致大量的线程阻塞,因此使用与否需要根据你自身的业务场景来进行判断。
async addView(articleId: number) { return this.entityManager.transaction(async (entityManager) => { const query = `SELECT views FROM articles WHERE id = ? FOR UPDATE`; const res = await entityManager.query(query, [articleId]); const value = res[0].views + 1; await entityManager.update( ArticleEntity, { id: articleId }, { views: value }, ); }); }
消息队列
再拓展一下,首先阅读量计数是一个频繁的逻辑。尤其对一些热榜文章来说,所以这里可以用到消息队列来做流量控制。
我们可以把一些频繁触发的任务交给消息队列去处理,然后再通过控制消息队列的消费频率或消费者个数,让一些任务可以较为平滑的处理而不是一下子全部处理,这样可以平衡系统的负载。
而且,也可以指定消息队列的消费者数量,比如你可以指定只有一个消费者,来规避上述的并发问题。
我们这里用到了 Bull
,它是一个基于 redis
的消息队列。它提供了简单而灵活的 API
,可用于实现各种队列和后台任务处理的需求。
首先安装一下需要的包:
npm i bull @nestjs/bull
然后可以在 app.module.ts
中进行配置:
@Module({ imports: [ ScheduleModule.forRoot(), BullModule.forRoot({ redis: { host, port, db: 2, password: configService.get<string>('REDIS_PASSWORD', 'password'), }, }) ], }) export class AppModule { }
以阅读量计数为例,当需要对文章的阅读量计数时,可以往队列里面扔一个任务,然后队列根据配置的逻辑,把任务取出来执行。
在 article.service
中,主要组装好参数,然后调用 queue-provider.service
,并加上重试逻辑
async addView(articleId: number) { const addQueue = async (retryTime = 3) => { if (retryTime === 0) { return; } try { const jobId = uuidv4(); await this.queueProviderService.pushAddView({ jobId, articleId, }); } catch (error) { addQueue(retryTime - 1); } }; await addQueue(); }
这个 jobId
主要是用来保证消息只会被消费一次,至于怎么做,在流程的最后会介绍。
然后来实现一个 queue-provider.service
,它主要是操作队列,往队列里面丢东西。
import { Injectable } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { QUEUE } from '../utils/constant'; @Injectable() export class QueueProviderService { constructor(@InjectQueue(QUEUE) private readonly queue: Queue) {} async pushAddView(data: { jobId: string; articleId: number }) { await this.queue.add('addView', data, { attempts: 5, // 任务失败后的重试次数 backoff: { type: 'exponential', // 两次重试之间的时间间隔将以指数形式增长 delay: 5000, // 重试时间间隔,单位:毫秒 }, }); } }
这样,消息就被发送到了队列中,然后我们还要实现一个消息的消费者,来消费这条消息。
import { Process, Processor } from '@nestjs/bull'; import { QUEUE } from '../utils/constant'; import { ArticleService } from '../modules/article/article.service'; @Processor(QUEUE) export class QueueConsumerService { constructor(private articleService: ArticleService) {} @Process('addView') async handleAddView(job: any) { // 处理任务 console.log('Processing job:', job.data); const { jobId, articleId } = job.data; await this.articleService.handleAddView({ jobId, articleId }); } }
这里实现一个 queue-consumer.service
,用来处理消息的消费,最后还是调用了 article.service
中的方法,这里 article.service
的 handleAddView
方法就是用来真正阅读量计数的。
async handleAddView(data: { jobId: string; articleId: number }) { const { jobId, articleId } = data; await this.entityManager.transaction(async (transactionalEntityManager) => { try { await transactionalEntityManager.save(JobRecordEntity, { jobId, }); } catch (error) { if (error?.sqlMessage?.includes('Duplicate entry')) { return; } else { throw error; } } await this.articleRepository.query( 'UPDATE articles SET views = views + 1,version = version + 1 WHERE id = ?', [articleId], ); }); }
好的,这里就需要提到一开始说的 jobId
,这个 jobId
可以认为是这条消息的唯一标识,我们是用 uuid
这个库生成的。一个 jobId
对应的消息只应该被消费一次,这里被消费一次的定义是,一条消息只能是阅读量 +1
。
在生产者一侧,我需要保证消息一定发成功,所以我们会加重试逻辑。但是这并不意味着消费者只收到了一条消息。所以这里需要保证消息的幂等性。
可以使用一个唯一索引来做这件事情,这里新建一张 job_records
表,并把 job_id
作为唯一索引。
-- jueyin.job_records definition CREATE TABLE `job_records` ( `id` int(11) NOT NULL AUTO_INCREMENT, `job_id` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `job_records_job_id_IDX` (`job_id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=40 DEFAULT CHARSET=utf8mb4;
然后在真正消费消息的时候会开启一个事务,如果报了数据重复的数据,表示这个 jobId
已经存在,这条消息已经被消费过了,此时可以直接 return
,提交事务。
否则就正常走事务的逻辑,成功就提交,失败就回滚。
感悟
或许很多前端都有成为全栈的梦想,所以自发去学习一些后端知识。但无论是前端工程师,还是后端工程师,他们都是软件工程师。软件工程师的必备技能应该是网络、数据库、操作系统。
我见过工作几年的后端只会用框架里的分页组件,让他写一个分页 sql
写不出来;我也见过在分布式系统下用了内存缓存,导致的一些奇葩问题。。
不是说学了一些上层的 node
框架怎么用,会写写 CURD
就是全栈(虽然真正工作中大多数都是这个),我们还是要不断精进自身的能力,才能一步步成为更资深的开发。
诸君,共勉~
最后
以上就是本文的全部内容,如果你觉得有意思的话,点点关注点点赞吧~