先赞后看,南哥助你Java进阶一大半
弹幕系统最早起源于日本,流行于视频网站niconico。我们认识的初音未来(Hatsune Miku)就是在niconico
平台上爆红的!!
我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。
⭐⭐⭐本文收录在《Java学习/进阶/面试指南》:https://github..JavaSouth
1. 直播弹幕设计
1.1 底层数据结构支持
南友们看看右下角的弹幕列表,这个弹幕列表就是我们今天要的攻克的对象,至于中间视频直播的走马灯弹幕
,它其实也是根据弹幕列表的数据来进行滚动。
南哥观察了下这个直播间,现在有41.5万
人在观看!
计算机世界实际上是现实世界的抽象,那弹幕列表我们要用什么数据结构支持。
我希望用的是Redis,Redis官方写着这么霸气的宣传语。
Get the world’s fastest in-memory database from the ones who built it
从构建者那里获取世界上最快的内存数据库
而底层数据结构我们使用Redis五大基本数据类型之一:Zset。Zset是一种有序集合类型,它有一个score值,score值用来存储用户发送弹幕的时间戳,那整个列表就会根据时间戳来进行排序。
而Zset元素的值就作为用户弹幕,例如上图的:剧本演了又演
。
有了底层数据结构支持,我们来说说这个弹幕列表有什么功能限制。大家有没注意到我们进入直播间,直播间是不会把所有的弹幕内容都
显示出来的,往往只是显示前10条。
那我们也给弹幕列表加上这个特性,在Redis的Zset结构设置只保留前10条的属性。
// 创建Zset数据结构并设置10条的限制
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public void addDanmaku(String roomId, String danmaku, long timestamp) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
// 只保留最新的10条弹幕
jedis.zremrangeByRank(key, 0, -(11));
}
}
1.2 弹幕列表查询
那用户进入直播间,弹幕列表是怎么查询出来的?
我们按最简单高效来,用户进入直播间,客户端调用API接口去查询出Redis里的弹幕列表。
有南友会问:这只是最近的前10条聊天记录,后面的呢?
别急,有两种方案。
(1)轮询查询
客户端轮询查询API接口,不断抓取出用户发出的新弹幕。具体细节的话,客户端第一次查询出的弹幕列表的数据结构是:[(时间戳1: 弹幕1), (时间戳2: 弹幕2)]
。
后续查询客户端继续轮询调用API接口,同时携带当前弹幕列表的最大时间戳入参。而后端服务就会根据该时间戳返回比该时间戳大的数据,用户发送的新的弹幕也就会显示出来。
// 轮询API接口
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public Set<String> getRecentDanmaku(String roomId, long lastTimestamp) {
String key = "room:" + roomId + ":danmaku";
return jedis.zrangeByScore(key, lastTimestamp + 1, Long.MAX_VALUE);
}
}
轮询API我们要设置多少时间轮询一次呢?我们先设置 3 秒
轮询更新一次弹幕列表,后续再根据用户反馈、服务器资源进行优化调整。
(2)WebSocket技术
视频直播间有个特点,主播和观众是无时不刻在进行互动聊天的,这就要求音视频要实时同步了。那直播弹幕就更应该实时,使用第一种轮询API的方法,可能会有 3 秒延迟
的情况发生。
要实时推送新的弹幕,我们可以使用WebSocket技术,客户端和WebSocket服务器保存长连接,用户只要发送新的弹幕消息,WebSocket服务器便会实时推送到客户端上。
第一种方法虽然粗暴,如果轮询查询会空,那本次查询就是一次资源浪费,对服务器资源不友好。但他简单高效,出错情况也少。
如果老板要你半个月上线这个弹幕列表功能,那第一种方法也未尝不可。后续我们再来根据实际情况作出升级调整的策略,例如升级为WebSocket技术。
1.3 系统流程
南哥画下整个系统的流程。
用户通过客户端发送弹幕,通过后端服务把弹幕消息发送到Kafka。使用Kafka消息我们就可以进行流量削峰,弹幕消息有时是成千上万 / 秒,把存储弹幕消息的任务通过消息队列缓存起来,一个个执行,减少服务器瞬时的高压力。
发送到Kafka后,负责监听弹幕Kafka消息的后端服务会把弹幕消息写入Redis。
// 监听写入Redis
public class DanmakuListener {
private Jedis jedis = new Jedis("localhost");
@KafkaListener(topics = "danmaku", groupId = "group_id")
public void listen(ConsumerRecord<String, String> record) {
String roomId = record.key();
String message = record.value();
long timestamp = System.currentTimeMillis();
String key = "room:" + roomId + ":danmaku";
// 将弹幕消息写入Redis Zset
jedis.zadd(key, timestamp, message);
// 保留最近10条
jedis.zremrangeByRank(key, 0, -11);
}
}
1.4 消息丢失问题
有这么一种情况,用户A发送了(1726406132, 弹幕A)
,用户B发送了(1726406150, 弹幕B)
,用户A先发送了弹幕,用户B再发送弹幕。
如果用户B的弹幕先写入到了Redis的Zset列表,其他用户进入直播间查询了第一个弹幕列表。那即使用户A后面成功写入了弹幕,其他用户也不会获取到用户A的弹幕。
因为客户端只会更新比弹幕B时间戳更大的弹幕消息。这怎么处理?
我们要解决的是弹幕B先于弹幕A成功写入的问题,不考虑其他特殊情况,可以给写入Redis的方法加上分布式锁的功能,保证先获取锁的弹幕消息写入的过程中,不会有其他弹幕消息写入的干扰。
// 在写入弹幕时获取分布式锁
public class DanmakuService {
private RedisLockUtil redisLock = new RedisLockUtil();
private Jedis jedis = new Jedis("localhost");
public void addDanmakuWithLock(String roomId, String danmaku, long timestamp) {
String lockKey = "lock:" + roomId;
try {
if (redisLock.acquireLock(lockKey)) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
jedis.zremrangeByRank(key, 0, -11);
}
} finally {
redisLock.releaseLock(lockKey);
}
}
}
public class RedisLockUtil {
private Jedis jedis = new Jedis("localhost");
private static final int EXPIRE_TIME = 5000; // 5秒
// 获取锁
public boolean acquireLock(String lockKey) {
long currentTime = System.currentTimeMillis();
String result = jedis.set(lockKey, String.valueOf(currentTime), "NX", "PX", EXPIRE_TIME);
return "OK".equals(result);
}
// 释放锁
public void releaseLock(String lockKey) {
jedis.del(lockKey);
}
}
戳这,《JavaSouth》作为一份涵盖Java程序员所需掌握核心知识、面试重点的《Java学习进阶指南》。
我是南哥,南就南在Get到你的有趣评论➕点赞➕关注。
创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️