Flink / Scala - 使用 RedisSink 存储数据

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。

一.引言

现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。

二.RedisSink 简介

1.源码浅析

image.gif编辑

RedisSink 和 KafkaSink 类似都是继承了 RichSinkFunction,其内部主要实现了三个方法以及五个主要变量 :

A.五个变量

String additionalKey : 附加键,redis 主要是 k-v 存储,也有 k-k-v 式存储,additionalKey 即为 k-k-v 的第一个 k

RedisMapper<In> redisSinkMapper : 从 In 中解析 k,v,按指定的 RedisCommand 执行操作

RedisCommond redisCommand : redis 指令,例如 set(k, v),lpush(k, v) ...

FlinkJedisConfigBase: Redis 配置,分别支持 Redis、RedisPool 、RedisCluster

RedisCommandsContainer:redis 容器,根据 FlinkJedisConfigBase 配置以及上面的 commond 执行 k-v、k-k-v 的操作

B.三个方法

open: 初始化相关参数,主要是基于 FlinkJedisConfigBase 初始化 RedisCommandsContainer

close: 关闭相关 Socket,这里主要关闭 RedisCommandsContainer

invoke: 针对单个 INPUT 基于 Socket 的执行操作,这里主要是执行相关 Jedis、JedisPool、JedisCluster 操作

2.底层实现

A.FlinkJedisConfigBase

image.gif编辑

FlinkJedisConfigBase 其实只是一个中转类,其内部存储了相关的 jedis 参数,执行 build 初始化时将 FlinkJedisConfigBase 内的参数转到 GenericObjectPoolConfig 中再构造 RedisCommandsContainer

image.gif编辑

B. RedisCommandsContainer

RedisCommandsContainer 底层实现基于 Jedis 的 JedisCluster、JedisPool 和 JedisSentinePool,分贝对应 flinkJedisCluster、flinkJedisPool 和 flinkJedisSentine,通过 build 方法和 flinkJedisConfig 实现相关类的初始化。

image.gif编辑

C. RedisCommond

这里其实是对 Jedis 指令的封装,目前只支持无返回值的存储命令,例如 lpush、sadd、hset 等等,也可以理解,对于流式程序的最终 sink,在低延迟高吞吐的场景下,尽量避免读取的流量,例如 get、hget 命令很明显不适合在 sink 场景下实现,不过也不是不能实现,继承 RedisCommandsContainer 类即可基于 Jedis 实现其他的 redis 指令。

image.gif编辑

三.RedisSink 示例

1.实现需求与辅助类

需求: 自定义 Source 实现将 k-v 存储至 redis 中

A.K-V 存储类

case class SaveInfo(key: String, value: String)

image.gif

B.RedisMapper 命令类

这里使用最基础的 SET 命令,将 SaveInfo 的 k-v 存储至对应 redis。

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class JedisMapper extends RedisMapper[SaveInfo] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  override def getKeyFromData(saveInfo: SaveInfo): String = {
    saveInfo.key
  }
  override def getValueFromData(saveInfo: SaveInfo): String = {
    saveInfo.value
  }
}

image.gif

2.主函数

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 自定义 Source
    val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
      SaveInfo(k, v.toString)
    }
    // 定义 FlinkJedisPool 配置
    val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost(host)
      .setPort(port)
      .setTimeout(1000)
      .setMaxTotal(10)
      .setMaxIdle(10)
      .setMinIdle(10)
      .build()
    // 初始化 RedisSink
    val jedisSink = new RedisSink(flinkJedisPoolConfig, new JedisMapper)
    // 执行 DAG
    env.fromCollection(sourceArray).addSink(jedisSink)
    env.execute()
  }

image.gif

生成测试的有限流,并直接引入 JedisSink,逻辑非常简单。

3.运行效果

先看下 Source 内的几条数据样式:

image.gif编辑

再看下执行后的 Redis 内容:

image.gif编辑

逻辑执行没有问题。

四.总结

这里示例了最基本的 JedisSink 方法,即初始化 FlinkJedisPool 进行单条数据的 Invoke 操作,但是一般最好采用批处理的方式,即获取 RedisResource,存储 N 条,return resource,如此循环往复。后续将介绍自定义实现 RedisCommandsContainer 的方法以及如何流转批,一次处理多条 redis 存储 k-v。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
13天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
38 3
|
6天前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
13天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
38 1
|
14天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
18 1
|
13天前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
13 0
|
13天前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
37 0
|
13天前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
29 0
|
13天前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
35 0
|
1月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
66 2