1.背景
在做用户画像的过程中经常会遇到,需要将用户数据入缓存的需求,提供给线上服务进行调用,因为最终加工的画像数据普遍会存储在离线表(Hive)或者一些其他存储中(HDFS、Doris 等等)
但是这类数据存储的特点非常明显适合一些内部运营系统做数据分析,但是用来做线上系统的高QPS、低延迟的服务,显然是不能满足的。
因此就必须将画像数据写入到redis 这种类似的分布式缓存当中。
那么如此大量的数据(亿级),如何能更快地写入到缓存当中呢?
2.系统架构设计
(1)利用Spark rdd 多分区的方式来进行并行写入缓存,提升写入缓存速度
(2)数据量太大,写入redis qps 较高,避免对redis 产生较大压力,进行限流控制
核心代码
result.foreachPartition(it -> { Jedis jedis = RedisInstance.getInstance(properties.getProperty("redis.ip"), Integer.parseInt(properties.getProperty("redis.port")), properties.getProperty("redis.pwd")); System.out.println(it.hashCode()); Pipeline pipeline = jedis.pipelined(); AtomicLong atomicLong = new AtomicLong(); long start = System.currentTimeMillis(); it.forEachRemaining(v -> { //System.out.println(v.getString(0)+":"+v.getString(1)); atomicLong.incrementAndGet(); qpsControll(start, requiredQps, atomicLong, it.hashCode()); pipeline.sadd(v.getString(0), v.getString(1)); if (atomicLong.get() % 3 == 0) { //每1000条提交一次 pipeline.sync(); } } ); pipeline.close(); jedis.close(); });
限流控制
private static void qpsControll(long start, int requiredQps, AtomicLong count, int x) { //System.out.println("current count:"+x+":"+ count.get()); long actualQps = 1000 * count.get() / (System.currentTimeMillis() - start); System.out.println(x + ":" + actualQps); if (actualQps > (long) requiredQps) { System.out.println("=====stop ====="); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(e); } } }
3.线上效果
能看到写入的qps 是在我们控制的范围内,一旦超过范围就会暂停一小段时间,项目源码已经开源,欢迎大家star,fork
https://gitee.com/ZhuGeZiFang/spark-redis
https://github.com/zhugezifang/hdfs-to-redis