Flink / Scala - 使用 RedisSink 存储数据

简介: 现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。

一.引言

现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。

二.RedisSink 简介

1.源码浅析

image.gif编辑

RedisSink 和 KafkaSink 类似都是继承了 RichSinkFunction,其内部主要实现了三个方法以及五个主要变量 :

A.五个变量

String additionalKey : 附加键,redis 主要是 k-v 存储,也有 k-k-v 式存储,additionalKey 即为 k-k-v 的第一个 k

RedisMapper<In> redisSinkMapper : 从 In 中解析 k,v,按指定的 RedisCommand 执行操作

RedisCommond redisCommand : redis 指令,例如 set(k, v),lpush(k, v) ...

FlinkJedisConfigBase: Redis 配置,分别支持 Redis、RedisPool 、RedisCluster

RedisCommandsContainer:redis 容器,根据 FlinkJedisConfigBase 配置以及上面的 commond 执行 k-v、k-k-v 的操作

B.三个方法

open: 初始化相关参数,主要是基于 FlinkJedisConfigBase 初始化 RedisCommandsContainer

close: 关闭相关 Socket,这里主要关闭 RedisCommandsContainer

invoke: 针对单个 INPUT 基于 Socket 的执行操作,这里主要是执行相关 Jedis、JedisPool、JedisCluster 操作

2.底层实现

A.FlinkJedisConfigBase

image.gif编辑

FlinkJedisConfigBase 其实只是一个中转类,其内部存储了相关的 jedis 参数,执行 build 初始化时将 FlinkJedisConfigBase 内的参数转到 GenericObjectPoolConfig 中再构造 RedisCommandsContainer

image.gif编辑

B. RedisCommandsContainer

RedisCommandsContainer 底层实现基于 Jedis 的 JedisCluster、JedisPool 和 JedisSentinePool,分贝对应 flinkJedisCluster、flinkJedisPool 和 flinkJedisSentine,通过 build 方法和 flinkJedisConfig 实现相关类的初始化。

image.gif编辑

C. RedisCommond

这里其实是对 Jedis 指令的封装,目前只支持无返回值的存储命令,例如 lpush、sadd、hset 等等,也可以理解,对于流式程序的最终 sink,在低延迟高吞吐的场景下,尽量避免读取的流量,例如 get、hget 命令很明显不适合在 sink 场景下实现,不过也不是不能实现,继承 RedisCommandsContainer 类即可基于 Jedis 实现其他的 redis 指令。

image.gif编辑

三.RedisSink 示例

1.实现需求与辅助类

需求: 自定义 Source 实现将 k-v 存储至 redis 中

A.K-V 存储类

case class SaveInfo(key: String, value: String)

image.gif

B.RedisMapper 命令类

这里使用最基础的 SET 命令,将 SaveInfo 的 k-v 存储至对应 redis。

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class JedisMapper extends RedisMapper[SaveInfo] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.SET)
  }
  override def getKeyFromData(saveInfo: SaveInfo): String = {
    saveInfo.key
  }
  override def getValueFromData(saveInfo: SaveInfo): String = {
    saveInfo.value
  }
}

image.gif

2.主函数

def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 自定义 Source
    val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) =>
      SaveInfo(k, v.toString)
    }
    // 定义 FlinkJedisPool 配置
    val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
      .setHost(host)
      .setPort(port)
      .setTimeout(1000)
      .setMaxTotal(10)
      .setMaxIdle(10)
      .setMinIdle(10)
      .build()
    // 初始化 RedisSink
    val jedisSink = new RedisSink(flinkJedisPoolConfig, new JedisMapper)
    // 执行 DAG
    env.fromCollection(sourceArray).addSink(jedisSink)
    env.execute()
  }

image.gif

生成测试的有限流,并直接引入 JedisSink,逻辑非常简单。

3.运行效果

先看下 Source 内的几条数据样式:

image.gif编辑

再看下执行后的 Redis 内容:

image.gif编辑

逻辑执行没有问题。

四.总结

这里示例了最基本的 JedisSink 方法,即初始化 FlinkJedisPool 进行单条数据的 Invoke 操作,但是一般最好采用批处理的方式,即获取 RedisResource,存储 N 条,return resource,如此循环往复。后续将介绍自定义实现 RedisCommandsContainer 的方法以及如何流转批,一次处理多条 redis 存储 k-v。

目录
相关文章
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
841 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
308 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
859 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
4月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
1964 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
5月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
387 1
京东零售基于Flink的推荐系统智能数据体系
|
12月前
|
存储 SQL 缓存
Flink 2.0 存算分离状态存储 — ForSt DB 
本文整理自阿里云技术专家兰兆千在Flink Forward Asia 2024上的分享,主要介绍Flink 2.0的存算分离架构、全新状态存储内核ForSt DB及工作进展与未来展望。Flink 2.0通过存算分离解决了本地磁盘瓶颈、检查点资源尖峰和作业恢复速度慢等问题,提升了云原生部署能力。ForSt DB作为嵌入式Key-value存储内核,支持远端读写、批量并发优化和快速检查点等功能。性能测试表明,ForSt在异步访问和本地缓存支持下表现卓越。未来,Flink将继续完善SQL Operator的异步优化,并引入更多流特性支持。
1202 88
Flink 2.0 存算分离状态存储 — ForSt DB 
|
9月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
10月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2550 45
|
9月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
182 1
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
917 61