Flink1.12支持对接Atlas【使用Atlas收集Flink元数据】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink1.12支持对接Atlas【使用Atlas收集Flink元数据】

在Cloudera Streaming Analytics中,可以将Flink与Apache Atlas一起使用,以跟踪Flink作业的输入和输出数据。


Atlas是沿袭和元数据管理解决方案,在Cloudera Data Platform上受支持。这意味着可以查找,组织和管理有关Flink应用程序以及它们如何相互关联的数据的不同资产。这实现了一系列数据管理和法规遵从性用例。


有关Atlas的更多信息,请参阅Cloudera Runtime文档


Flink元数据集合中的Atlas实体

在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。


为Flink创建Atlas实体类型定义

在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。


验证元数据收集

启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。


Flink元数据集合中的Atlas实体



在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。


在向Atlas提交更新时,Flink应用程序会描述自身以及用作源和接收器的实体。Atlas创建并更新相应的实体,并从收集到的和已经可用的实体创建沿袭。在内部,Flink客户端和Atlas服务器之间的通信是使用Kafka主题实现的。该解决方案被Atlas社区称为Flink挂钩。


c5690f5def9a8f9812e893f71faad408.png

为Flink创建Atlas实体类型定义


在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。


默认情况下,Atlas不包括Flink的元数据源。管理员必须手动将实体类型定义上载到群集,才能启动Flink元数据收集。


注意:

启用或禁用TLS时,Atlas管理服务器的默认端口分别为31433和31000。


步骤

1.使用Atlas REST API将设计的实体类型定义上载到集群。

1. curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \
2. --header 'Content-Type: application/json' \
3. --data-raw '{
4.     "enumDefs": [],
5.     "structDefs": [],
6.     "classificationDefs": [],
7.     "entityDefs": [
8.         {
9.             "name": "flink_application",
10.             "superTypes": [
11.                 "Process"
12.             ],
13.             "serviceType": "flink",
14.             "typeVersion": "1.0",
15.             "attributeDefs": [
16.                 {
17.                     "name": "id",
18.                     "typeName": "string",
19.                     "cardinality": "SINGLE",
20.                     "isIndexable": true,
21.                     "isOptional": false,
22.                     "isUnique": true
23.                 },
24.                 {
25.                     "name": "startTime",
26.                     "typeName": "date",
27.                     "cardinality": "SINGLE",
28.                     "isIndexable": false,
29.                     "isOptional": true,
30.                     "isUnique": false
31.                 },
32.                 {
33.                     "name": "endTime",
34.                     "typeName": "date",
35.                     "cardinality": "SINGLE",
36.                     "isIndexable": false,
37.                     "isOptional": true,
38.                     "isUnique": false
39.                 },
40.                 {
41.                     "name": "conf",
42.                     "typeName": "map<string,string>",
43.                     "cardinality": "SINGLE",
44.                     "isIndexable": false,
45.                     "isOptional": true,
46.                     "isUnique": false
47.                 },
48.                 {
49.                     "name": "inputs",
50.                     "typeName": "array<string>",
51.                     "cardinality": "LIST",
52.                     "isIndexable": false,
53.                     "isOptional": false,
54.                     "isUnique": false
55.                 },
56.                 {
57.                     "name": "outputs",
58.                     "typeName": "array<string>",
59.                     "cardinality": "LIST",
60.                     "isIndexable": false,
61.                     "isOptional": false,
62.                     "isUnique": false
63.                 }
64.             ]
65.         }
66.     ],
67.     "relationshipDefs": []
68. }'

2.登录到Cloudera Manager。

3.转到Flink>配置。

4.在搜索栏中搜索“启用图集”。

5.启用Atlas元数据收集。


1ef98ce242ed8c794fcbb5f9f71881b0.png


成功提交后,Flink客户端会通知Atlas有关作业的元数据。


验证元数据收集


启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。


要验证元数据集合,可以从“运行Flink作业”中运行“流式WordCount”示例。


在日志中,出现以下新行:


1. ...
2. 20/05/13 06:28:12 INFO hook.FlinkAtlasHook: Collecting metadata for a new Flink Application: Streaming WordCount
3. ...

Flink通过Kafka主题与Atlas通信,默认情况下,该主题名为ATLAS_HOOK。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用合集之cdc对于源库数据的抽取是否存在有些元数据的改变无法处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL DataWorks 分布式数据库
实时计算 Flink版产品使用合集之如何与SQLServer实时对接
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之如何对接Oracle数据源
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 消息中间件 关系型数据库
实时计算 Flink版产品使用问题之元数据血缘可以通过什么来获取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
Kubernetes 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何通过ApacheAtlas获取元数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC引起的Mysql元数据锁
记一次Flink CDC引起的Mysql元数据锁事故,总结经验教训。后续在编写Flink CDC任务时,要处理好异常,避免产生长时间的元数据锁。同时出现生产问题时要及时排查,不能抱有侥幸心理。
|
6月前
|
Java atlas 网络安全
Flink CDC编译问题之编译atlas报错如何解决
Flink CDC编译指的是将Flink CDC源代码转换成可执行程序的过程,可能会涉及到依赖管理和环境配置等问题;本合集将介绍Flink CDC编译的步骤、常见错误及其解决办法,以确保顺利完成编译过程。
|
消息中间件 资源调度 关系型数据库
Flink初试——对接Kafka
Flink初试——对接Kafka
287 0
Flink初试——对接Kafka
|
SQL 调度 HIVE
flink 读取hudi 表元数据信息
flink 如何获取hudi 表的元数据信息
flink 读取hudi 表元数据信息
|
消息中间件 SQL 分布式计算
实时数据治理—当Atlas遇见Flink
实时数据治理—当Atlas遇见Flink
1038 0
实时数据治理—当Atlas遇见Flink