Flink SQL 的数据脱敏解决方案

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink SQL 的数据脱敏解决方案,支持面向用户级别的数据脱敏访问控制,即特定用户只能访问到脱敏后的数据。

Flink SQL 的数据脱敏解决方案,支持面向用户级别的数据脱敏访问控制,即特定用户只能访问到脱敏后的数据。此方案是实时领域Flink的解决思路,类似于离线数仓 Hive 中 Ranger Column Masking 方案。

一、基础知识

1.1 数据脱敏

数据脱敏(Data Masking)是一种数据安全技术,用于保护敏感数据,以防止未经授权的访问。该技术通过将敏感数据替换为虚假数据或不可识别的数据来实现。例如可以使用数据脱敏技术将信用卡号码、社会安全号码等敏感信息替换为随机生成的数字或字母,以保护这些信息的隐私和安全。

1.2 业务流程

下面用订单表orders的两行数据来举例,示例数据如下:

1.2.1 设置脱敏策略

管理员配置用户、表、字段、脱敏条件,例如下面的配置。

Data mask example conditions.png

1.2.2 用户访问数据

当用户在Flink上查询orders表的数据时,会在底层结合该用户的脱敏条件重新生成 SQL,即让数据脱敏生效。
当用户 A 和用户 B 在执行下面相同的 SQL 时,会看到不同的结果数据。

SELECT * FROM orders

用户A查看到的结果数据如下customer_name字段的数据被全部掩盖掉。

Data mask-masked with customer_name after mask.png

用户 B 查看到的结果数据如下customer_name字段的数据只会显示前 4 位,剩下的用 x 代替。

Data mask-masked with customer_name after mask_show_first_4.png

二、Hive 数据脱敏解决方案

在离线数仓工具 Hive 领域,由于发展多年已有 Ranger 来支持字段数据的脱敏控制,详见参考文献【1】
下图是在 Ranger 里配置 Hive 表数据脱敏条件的页面,供参考。

Hive-Ranger data mask.png

但由于 Flink 实时数仓领域发展相对较短,Ranger 还不支持 Flink SQL,以及依赖 Ranger 的话会导致系统部署和运维过重,因此开始自研实时数仓的数据脱敏解决工具。当然本文中的核心思想也适用于 Ranger 中,可以基于此较快开发出 ranger-flink 插件。

三、Flink SQL 数据脱敏解决方案

3.1 解决方案

3.1.1 Flink SQL 执行流程

根据 Flink 1.16 修正和简化后的执行流程如下图所示。
FlinkSQL simple-execution flowchart.png

CalciteParser.parse()处理后会得到一个 SqlNode 类型的抽象语法树,本文会针对此抽象语法树来组装脱敏条件后来生成新的 AST,以实现数据脱敏控制。

3.1.2 Calcite 对象继承关系

下面章节要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等类,此处进行简单介绍以及展示它们间继承关系,以便读者阅读本文源码。

序号 介绍
1 SqlNode A SqlNode is a SQL parse tree.
2 SqlCall A SqlCall is a call to an SqlOperator operator.
3 SqlIdentifier A SqlIdentifier is an identifier, possibly compound.
4 SqlJoin Parse tree node representing a JOIN clause.
5 SqlBasicCall Implementation of SqlCall that keeps its operands in an array.
6 SqlSelect A SqlSelect is a node of a parse tree which represents a select statement, the parent class is SqlCall

Calcite SqlNode diagrams.png

3.1.3 解决思路

针对输入的 Flink SQL,在CalciteParser.parse()进行语法解析后生成抽象语法树(Abstract Syntax Tree,简称 AST)后,采用自定义Calcite SqlBasicVisitor的方法遍历AST中的所有SqlSelect,获取到里面的每个输入表。如果输入表中字段有配置脱敏条件,则针对输入表生成子查询语句,并把脱敏字段改写成CAST(脱敏函数(字段名) AS 字段类型) AS 字段名,再通过CalciteParser.parseExpression()把子查询转换成 SqlSelect,并用此 SqlSelect 替换原 AST 中的输入表来生成新的 AST,最后得到新的 SQL 来继续执行。

FlinkSQL data mask solution.png

3.2 详细方案

3.2.1 解析输入表

通过对Flink SQL 语法的分析和研究,最终出现输入表的只包含以下两种情况:

  1. SELECT 语句的 FROM 子句,如果是子查询,则递归继续遍历。
  2. SELECT ... JOIN 语句的 Left 和 Right 子句,如果是多表 JOIN,则递归查询遍历。

因此,下面的主要步骤会根据 FROM 子句的类型来寻找输入表。

3.2.2 主要步骤

主要通过 Calcite 提供的访问者模式自定义 DataMaskVisitor 来实现,遍历 AST 中所有的 SqlSelect 对象用子查询替换里面的输入表。

下面详细描述替换输入表的步骤,整体流程如下图所示。

Data mask-rewrite the main process.png

  1. 遍历 AST 中 SELECT 语句。
  2. 判断是否自定义的 SELECT 语句(由下面步骤 10 生成),是则跳转到步骤 11,否则继续步骤 3。
  3. 判断 SELECT 语句中的 FROM 类型,按照不同类型对应执行下面的步骤 4、5、6 和 11。
  4. 如果 FROM 是 SqlJoin 类型,则分别遍历其左 Left 和 Right 右节点,即执行当前步骤 4 和步骤 7。由于可能是三张表及以上的 Join,因此进行递归处理,即针对其左 Left 节点跳回到步骤 3。
  5. 如果 FROM 是 SqlIdentifier 类型,则表示是表。但是输入 SQL 中没有定义表的别名,则用表名作为别名。跳转到步骤 8。
  6. 如果 FROM 是 SqlBasicCall 类型,则表示带别名。但需要判断是否来自子查询,是则跳转到步骤 11 继续遍历AST,后续步骤 1 会对子查询中的 SELECT 语句进行处理。否则跳转到步骤 8。
  7. 递归处理 Join 的右节点,即跳回到步骤3。
  8. 遍历表中的每个字段,如果某个字段有定义脱敏条件,则把改字段改写成格式CAST(脱敏函数(字段名) AS 字段类型) AS 字段名,否则用原字段名。
  9. 针对步骤 8 处理后的字段,构建子查询语句,形如 (SELECT 字段名1, 字段名2, CAST(脱敏函数(字段名3) AS 字段类型) AS 字段名3、字段名4 FROM 表名) AS 表别名
  10. 对步骤 9 的子查询调用CalciteParser.parseExpression()进行解析,生成自定义 SELECT 语句,并替换掉原 FROM。
  11. 继续遍历 AST,找到里面的 SELECT 语句进行处理,跳回到步骤 1。

3.2.3 Hive及Ranger兼容性

在 Ranger 中,默认的脱敏策略的如下所示。通过调研发现 Ranger 的大部分脱敏策略是通过调用 Hive 自带或自定义的系统函数实现的。

序号 策略名 策略说明 Hive系统函数
1 Redact 用x屏蔽字母字符,用n屏蔽数字字符 mask
2 Partial mask: show last 4 仅显示最后四个字符,其他用x代替 mask_show_last_n
3 Partial mask: show first 4 仅显示前四个字符,其他用x代替 mask_show_first_n
4 Hash 用值的哈希值替换原值 mask_hash
5 Nullify 用NULL值替换原值 Ranger自身实现
6 Unmasked 原样显示 Ranger自身实现
7 Date: show only year 仅显示日期字符串的年份 mask
8 Custom Hive UDF来自定义策略

由于 Flink 支持 Hive Catalog,在 Flink 能调用 Hive 系统函数。 因此,本方案也支持在 Flink SQL 配置 Ranger 的脱敏策略。

四、用例测试

用例测试数据来自于 CDC Connectors for Apache Flink【4】官网,本文给orders表增加一个 region 字段,同时增加'connector'='print'类型的 print_sink 表,其字段和orders表的一样。

下载源码后,可通过 Maven 运行单元测试。

$ cd flink-sql-security
$ mvn test

详细测试用例可查看源码中的单测RewriteDataMaskTestExecuteDataMaskTest,下面只描述两个案例。

4.1 测试 SELECT

4.1.1 输入 SQL

用户 A 执行下述 SQL:

SELECT order_id, customer_name, product_id, region FROM orders

4.1.2 根据脱敏条件重新生成SQL

  1. 输入 SQL 是一个简单 SELECT 语句,其 FROM 类型是SqlIdentifier,由于没有定义别名,用表名orders作为别名。
  2. 由于用户A针对字段customer_name定义脱敏条件 MASK(对应函数是脱敏函数是mask),该字段在流程图中的步骤 8 中被改写为CAST(mask(customer_name) AS STRING) AS customer_name,其余字段未定义脱敏条件则保持不变。
  3. 然后在步骤 9 的操作中,表名orders被改写成如下子查询,子查询两侧用括号()进行包裹,并且用 AS 别名来增加表别名。
(SELECT
     order_id,
     order_date,
     CAST(mask(customer_name) AS STRING) AS customer_name,
     product_id,
     price,
     order_status,
     region
FROM 
    orders
) AS orders

4.1.3 输出 SQL 和运行结果

最终执行的改写后SQL如下所示,这样用户A查询到的顾客姓名customer_name字段都是掩盖后的数据。

SELECT
    order_id,
    customer_name,
    product_id,
    region
FROM (
    SELECT 
         order_id,
         order_date,
         CAST(mask(customer_name) AS STRING) AS customer_name,
         product_id,
         price,
         order_status,
         region
    FROM 
         orders
     ) AS orders

4.2 测试 INSERT-SELECT

4.2.1 输入 SQL

用户 A 执行下述 SQL:

INSERT INTO print_sink SELECT * FROM orders

4.2.2 根据脱敏条件重新生成 SQL

通过自定义 Calcite DataMaskVisitor 访问生成的 AST,能找到对应的 SELECT 语句是SELECT order_id, customer_name, product_id, region FROM orders

针对此 SELECT 语句的改写逻辑同上,不再阐述。

4.2.3 输出 SQL 和运行结果

最终执行的改写后 SQL 如下所示,注意插入到print_sink表的customer_name字段是掩盖后的数据。

INSERT INTO print_sink (
    SELECT 
        * 
    FROM (
        SELECT 
            order_id, 
            order_date, 
            CAST(mask(customer_name) AS STRING) AS customer_name, 
            product_id, 
            price, 
            order_status, 
            region 
        FROM 
            orders
    ) AS orders
)

五、参考文献

  1. Apache Ranger Column Masking in Hive
  2. FlinkSQL字段血缘解决方案及源码
  3. 从SQL语句中解析出源表和结果表
  4. HiveQL—数据脱敏函数

查看更多技术内容


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
486 43
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
166 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
2月前
|
SQL
SQL如何只让特定列中只显示一行数据
SQL如何只让特定列中只显示一行数据
|
5月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
629 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
12天前
|
存储 JSON 数据处理
Flink基于Paimon的实时湖仓解决方案的演进
本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
110 0
Flink基于Paimon的实时湖仓解决方案的演进
|
6月前
|
SQL 自然语言处理 数据库
【Azure Developer】分享两段Python代码处理表格(CSV格式)数据 : 根据每列的内容生成SQL语句
本文介绍了使用Python Pandas处理数据收集任务中格式不统一的问题。针对两种情况:服务名对应多人拥有状态(1/0表示),以及服务名与人名重复列的情况,分别采用双层for循环和字典数据结构实现数据转换,最终生成Name对应的Services列表(逗号分隔)。此方法高效解决大量数据的人工处理难题,减少错误并提升效率。文中附带代码示例及执行结果截图,便于理解和实践。
163 4
|
1月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
986 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
2月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
222 1
京东零售基于Flink的推荐系统智能数据体系
|
2月前
|
SQL
SQL中如何删除指定查询出来的数据
SQL中如何删除指定查询出来的数据
|
2月前
|
SQL 关系型数据库 MySQL
SQL如何对不同表的数据进行更新
本文介绍了如何将表A的Col1数据更新到表B的Col1中,分别提供了Microsoft SQL和MySQL的实现方法,并探讨了多表合并后更新的优化方式,如使用MERGE语句提升效率。适用于数据库数据同步与批量更新场景。

相关产品

  • 实时计算 Flink版