🌷🍁 博主猫头虎 带您 Go to New World.✨🍁
🦄 博客首页——猫头虎的博客🎐
🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐
🌊 《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大小厂~💐
🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥
高并发幂等计数器的设计与实现
摘要
本文探讨了如何实现一个高并发、幂等的计数器服务,该服务用于处理外部的 inc
请求以增加特定视频的播放计数。考虑到网络延迟和重试等因素,该服务需要确保每个请求至少被处理一次,同时避免重复计数。我们使用了 MySQL 用于持久化存储计数数据,并用 Redis 进行幂等性检查。本文通过 Go、Java 和 Python 三种编程语言展示了具体的实现代码,并对核心逻辑进行了详细解释。Java 代码部分更是进行了全流程的展示,包括幂等性检查、数据库更新和已处理请求的记录。这样的设计不仅确保了高并发处理能力,还实现了请求的幂等性。
引言
在分布式系统中,高并发和幂等性是两个非常关键的问题。本文将探讨如何实现一个高并发、幂等的计数器服务。该服务接受外部的 inc
请求,用于增加特定视频的播放计数。由于网络延迟和请求重试等原因,多个相同或不同的 inc
请求可能并发到达服务。因此,服务需要确保每个请求至少被处理一次(at least once),同时避免重复计数。我们将使用 Go、Java 和 Python 来分别演示这一实现。
问题描述:
高并发幂等计数器题目
问题描述:
1.实现一个计数器服务
2.服务接收外部的 inc 请求,每个请求具有全局唯一 request id 和视频 id
3.因为网络和重试的原因,请求可能会重复的到达
4.时序上,多个重复的请求可能并发达到,两次重复请求之间的间隔不可预期
5.需要保证 at least once ,计数值不能丢失
6.可以依赖一些外部组件, mysql redis
依赖组件
- MySQL: 用于持久化存储计数器的数据。
- Redis: 用于高速缓存和临时存储已经接收到的 request id。
实现思路
- 接收请求: 使用 Web 框架接收 inc 请求,并提取其中的
request_id
和video_id
。 - 幂等检查: 使用 Redis 查询该
request_id
,如果已存在,则该请求已被处理。 - 队列或缓存: 如果是新的
request_id
,则将其存入 Redis,并进行数据库更新操作。 - 计数逻辑: 从 MySQL 中获取当前计数,然后加 1,并更新回数据库。
Go 代码示例
// 导入相应的包 import ( "github.com/go-redis/redis/v8" "database/sql" // 其他必要的包 ) func incHandler(requestID string, videoID string) string { if isProcessed(requestID) { return "Already Processed" } // 更新数据库 updateCounter(videoID) return "OK" } func isProcessed(requestID string) bool { // Redis 检查 val, _ := redisClient.Get(ctx, requestID).Result() return val != "" } func updateCounter(videoID string) { // MySQL 更新 // 省略具体实现 }
Java 代码示例
简单代码:
import redis.clients.jedis.Jedis; import org.springframework.jdbc.core.JdbcTemplate; // 其他必要的导入 @RestController public class CounterController { @Autowired Jedis jedis; @Autowired JdbcTemplate jdbcTemplate; @RequestMapping("/inc") public String inc(@RequestParam String requestId, @RequestParam String videoId) { if (isProcessed(requestId)) { return "Already Processed"; } // 更新数据库 updateCounter(videoId); return "OK"; } public boolean isProcessed(String requestId) { // Redis 检查 return jedis.exists(requestId); } public void updateCounter(String videoId) { // MySQL 更新 // 省略具体实现 } }
详细代码:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.*; import redis.clients.jedis.Jedis; @RestController public class CounterController { @Autowired private Jedis jedis; @Autowired private JdbcTemplate jdbcTemplate; @GetMapping("/inc") public String inc(@RequestParam String requestId, @RequestParam String videoId) { // Step 1: 幂等性检查 if (isProcessed(requestId)) { return "Already Processed"; } // Step 2: 更新数据库 if (updateCounter(videoId)) { // Step 3: 将 requestId 存入 Redis 以保证幂等性 jedis.set(requestId, "true"); return "OK"; } return "Failed"; } private boolean isProcessed(String requestId) { // 使用 Redis 检查 requestId 是否已处理 return jedis.exists(requestId); } private boolean updateCounter(String videoId) { // 使用 JdbcTemplate 和 MySQL 更新计数器 String query = "SELECT count FROM video_counter WHERE video_id = ?"; Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class); if (count == null) { // 如果视频尚未有计数,初始化为 1 jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId); } else { // 如果视频已有计数,加 1 jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId); } return true; } }
思路解析
幂等性检查: 使用 Redis 进行了幂等性检查。如果请求已经被处理过(即在 Redis 中有记录),我们就直接返回。
数据库更新: 我们使用 Spring 的 JdbcTemplate 来与 MySQL 进行交互。如果这是一个全新的 video_id,我们将其添加到数据库并初始化计数为 1;否则,我们找到当前计数并加 1。
记录处理过的请求: 最后,我们将处理过的 request_id 添加到 Redis 中,以便进行未来的幂等性检查。
这样,我们就得到了一个高并发、幂等的计数器服务。
优化问题
如果两个重复的请求,进入到Step1 都没有查到,不是就都进入step2
解决方法
如果两个相同的 request_id
几乎同时到达,并且都通过了 Step 1 的幂等性检查,那么它们都会进入 Step 2,从而违反了我们想要的幂等性。
在不使用 Redis 或其他外部锁服务的情况下,这个问题变得更加复杂。一种可能的解决方案是使用数据库的原子操作和唯一约束来确保幂等性。具体来说,您可以使用数据库事务来解决这个问题。
解决方案一(不使用Redis):
- 创建一个新表(比如叫做
processed_requests
)用于存储已经处理过的request_id
。该表有一个唯一约束在request_id
上。 - 在处理一个新请求之前,尝试将
request_id
插入到processed_requests
表中。如果插入成功,说明这是第一次处理该请求,然后再去更新视频的播放计数。如果插入失败(因为唯一约束),则说明该请求已经被处理过,直接返回。 - 这一切都应在一个数据库事务中完成,以确保操作的原子性。
这里是更新后的 Java 代码片段:
import org.springframework.transaction.annotation.Transactional; // ... @RestController public class CounterController { @Autowired private JdbcTemplate jdbcTemplate; @Transactional @GetMapping("/inc") public String inc(@RequestParam String requestId, @RequestParam String videoId) { // Step 1: 幂等性检查和插入 try { jdbcTemplate.update("INSERT INTO processed_requests(request_id) VALUES(?)", requestId); } catch (DuplicateKeyException e) { return "Already Processed"; } // Step 2: 更新数据库 updateCounter(videoId); return "OK"; } private void updateCounter(String videoId) { String query = "SELECT count FROM video_counter WHERE video_id = ?"; Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class); if (count == null) { jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId); } else { jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId); } } }
这样,在并发环境中,由于数据库的唯一约束和事务,我们可以确保每个 request_id
只会被处理一次。这解决了您提到的问题。
方案二: 使用Redis
如果使用 Redis,一种常见的方法是利用 Redis 的原子操作来解决这个问题。具体来说,可以使用 SETNX
(SET if Not eXists)命令来设置 request_id
的值,并检查操作是否成功。这个命令只有在键不存在时才会设置值,并返回一个标识(通常是 1 表示成功,0 表示失败)。因为 Redis 的这个操作是原子性的,所以即使有多个相同的请求几乎同时到达,也只有一个能够成功设置该 request_id
。
下面是 Java 代码中这部分逻辑的示例:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import redis.clients.jedis.Jedis; @RestController public class CounterController { @Autowired private Jedis jedis; @Autowired private JdbcTemplate jdbcTemplate; @GetMapping("/inc") public String inc(@RequestParam String requestId, @RequestParam String videoId) { // Step 1: 幂等性检查 if (!setIfNotExists(requestId)) { return "Already Processed"; } // Step 2: 更新数据库 updateCounter(videoId); return "OK"; } private boolean setIfNotExists(String requestId) { return jedis.setnx(requestId, "true") == 1; } private void updateCounter(String videoId) { String query = "SELECT count FROM video_counter WHERE video_id = ?"; Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class); if (count == null) { jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId); } else { jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId); } } }
这里,setIfNotExists
方法使用 Redis 的 SETNX
命令来尝试设置一个 request_id
。如果设置成功(返回 1),则继续更新数据库。如果设置失败(返回 0,表示该 request_id
已经存在),则直接返回 “Already Processed”,实现幂等性。
这样,在高并发环境下,即使多个相同的 request_id
几乎同时到达,也只有一个会被实际处理,其他的都会在 setIfNotExists
阶段被拦截,从而确保了幂等性。
Python 代码示例
from flask import Flask, request import redis import mysql.connector app = Flask(__name__) r = redis.Redis(host='localhost', port=6379, db=0) conn = mysql.connector.connect(user='root', password='password', database='test') @app.route('/inc', methods=['POST']) def inc(): request_id = request.form['request_id'] video_id = request.form['video_id'] if r.exists(request_id): return 'Already Processed' r.set(request_id, 1) cursor = conn.cursor() # 更新 MySQL # 省略具体实现 return 'OK'
拓展:
面试官提问:1.我就是同一个时间段大量的请求到来了怎么办?
2.还有这个合适的过期时间怎么合适
1. 大量请求同时到来
当大量请求在同一时间段到来时,一般有以下几种应对方式:
扩展性:
- 横向扩展(水平扩展): 通过增加更多的服务器来分摊负载。
- 垂直扩展(纵向扩展): 通过增加单个服务器的计算资源(例如,CPU、RAM)。
限流:
- 令牌桶或漏桶算法: 控制进入到系统的请求速率。
- 优先级队列: 为不同类型的请求设置不同的优先级。
缓存:
- 读缓存: 对常读不变或少变的数据进行缓存。
- 写缓存(Write-Through、Write-Back): 缓存写操作,批量或异步地更新到数据库。
异步处理:
- 消息队列: 使用如 RabbitMQ、Kafka 等消息队列服务将处理请求和更新数据库的操作解耦。
2. 合适的过期时间
设置合适的过期时间主要依赖于业务需求和系统资源。以下几点可能有助于确定一个合适的过期时间:
- 业务逻辑: 如果请求是短暂的(比如秒杀活动),那么短的过期时间可能更合适。
- 资源限制: 如果 Redis 或其他缓存资源有限,较短的过期时间可以更快地回收资源。
- 数据一致性: 长的过期时间可能导致数据不一致,短的过期时间虽然可以减少这种风险,但会增加系统的复杂性。
- 网络延迟和重试策略: 如果网络延迟和重试可能导致长时间的重复请求,应设置相对长一些的过期时间。
综合以上因素,过期时间的设置通常需要在性能和准确性之间找到一个平衡点。这通常需要通过实验和性能测试来不断调整和优化。
如果不用redis呢?
如果不使用 Redis,我们有几种其他方法也可以实现高并发和幂等性。
1. 数据库唯一索引
可以在数据库中为 request_id
设置唯一索引。当重复请求到来时,尝试插入相同的 request_id
将会触发数据库错误,从而达到幂等性的目的。
private boolean isProcessed(String requestId) { try { jdbcTemplate.update("INSERT INTO request_ids(request_id) VALUES(?)", requestId); } catch (DuplicateKeyException e) { return true; } return false; }
2. 应用内存
在应用级别维护一个已处理 request_id
的集合。这样不需要外部存储,但这种方式不适合分布式系统,因为每个实例都有自己的内存。
private Set<String> processedRequestIds = Collections.synchronizedSet(new HashSet<>()); private boolean isProcessed(String requestId) { if (processedRequestIds.contains(requestId)) { return true; } processedRequestIds.add(requestId); return false; }
3. 分布式锁
使用数据库或者其他分布式锁机制确保同一时间只有一个请求在处理。这种方法可以确保幂等性但可能影响系统性能。
private boolean isProcessed(String requestId) { if (acquireLock(requestId)) { // 进行处理 releaseLock(requestId); return false; } return true; }
4. 消息队列
使用消息队列(如 Kafka、RabbitMQ 等)确保消息的幂等处理。大多数现代消息队列都提供了这种机制。
5. 文件系统
在文件系统中为每个 request_id
创建一个唯一文件。通过检查文件存在与否,来确定是否处理了该请求。但这种方法不适合高并发场景。
每种方法都有其优点和局限性。选择哪种方法取决于具体需求、可用资源以及你愿意接受的复杂性。
总结
通过使用 Redis 进行幂等性检查,以及使用 MySQL 进行持久化存储,我们成功地实现了一个高并发、幂等的计数器服务。这种设计能够在高并发条件下保证 at least once 语义,同时也实现了幂等性。
该设计还有进一步优化和扩展的空间,例如,可以加入更多的负载均衡和高可用性特性,或者使用消息队列来进一步解耦生产者和消费者。
感谢您的阅读,希望本文能为您提供有用的信息和启示。如有任何问题或建议,请随时留言。
作者: [猫头虎]
发布时间: [2023.08.30]
原创声明
======= ·
- 原创作者: 猫头虎
作者wx: [ libin9iOak ]
学习 | 复习 |
✔ |
本文为原创文章,版权归作者所有。未经许可,禁止转载、复制或引用。
作者保证信息真实可靠,但不对准确性和完整性承担责任。
未经许可,禁止商业用途。
如有疑问或建议,请联系作者。
感谢您的支持与尊重。
点击
下方名片
,加入IT技术核心学习团队。一起探索科技的未来,共同成长。