在Cloudera Streaming Analytics中,可以将Flink与Apache Atlas一起使用,以跟踪Flink作业的输入和输出数据。
Atlas是沿袭和元数据管理解决方案,在Cloudera Data Platform上受支持。这意味着可以查找,组织和管理有关Flink应用程序以及它们如何相互关联的数据的不同资产。这实现了一系列数据管理和法规遵从性用例。
有关Atlas的更多信息,请参阅Cloudera Runtime文档。
Flink元数据集合中的Atlas实体
在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。
为Flink创建Atlas实体类型定义
在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。
验证元数据收集
启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。
Flink元数据集合中的Atlas实体
在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。
在向Atlas提交更新时,Flink应用程序会描述自身以及用作源和接收器的实体。Atlas创建并更新相应的实体,并从收集到的和已经可用的实体创建沿袭。在内部,Flink客户端和Atlas服务器之间的通信是使用Kafka主题实现的。该解决方案被Atlas社区称为Flink挂钩。
为Flink创建Atlas实体类型定义
在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。
默认情况下,Atlas不包括Flink的元数据源。管理员必须手动将实体类型定义上载到群集,才能启动Flink元数据收集。
注意:
启用或禁用TLS时,Atlas管理服务器的默认端口分别为31433和31000。
步骤
1.使用Atlas REST API将设计的实体类型定义上载到集群。
1. curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \ 2. --header 'Content-Type: application/json' \ 3. --data-raw '{ 4. "enumDefs": [], 5. "structDefs": [], 6. "classificationDefs": [], 7. "entityDefs": [ 8. { 9. "name": "flink_application", 10. "superTypes": [ 11. "Process" 12. ], 13. "serviceType": "flink", 14. "typeVersion": "1.0", 15. "attributeDefs": [ 16. { 17. "name": "id", 18. "typeName": "string", 19. "cardinality": "SINGLE", 20. "isIndexable": true, 21. "isOptional": false, 22. "isUnique": true 23. }, 24. { 25. "name": "startTime", 26. "typeName": "date", 27. "cardinality": "SINGLE", 28. "isIndexable": false, 29. "isOptional": true, 30. "isUnique": false 31. }, 32. { 33. "name": "endTime", 34. "typeName": "date", 35. "cardinality": "SINGLE", 36. "isIndexable": false, 37. "isOptional": true, 38. "isUnique": false 39. }, 40. { 41. "name": "conf", 42. "typeName": "map<string,string>", 43. "cardinality": "SINGLE", 44. "isIndexable": false, 45. "isOptional": true, 46. "isUnique": false 47. }, 48. { 49. "name": "inputs", 50. "typeName": "array<string>", 51. "cardinality": "LIST", 52. "isIndexable": false, 53. "isOptional": false, 54. "isUnique": false 55. }, 56. { 57. "name": "outputs", 58. "typeName": "array<string>", 59. "cardinality": "LIST", 60. "isIndexable": false, 61. "isOptional": false, 62. "isUnique": false 63. } 64. ] 65. } 66. ], 67. "relationshipDefs": [] 68. }'
2.登录到Cloudera Manager。
3.转到Flink>配置。
4.在搜索栏中搜索“启用图集”。
5.启用Atlas元数据收集。
成功提交后,Flink客户端会通知Atlas有关作业的元数据。
验证元数据收集
启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。
要验证元数据集合,可以从“运行Flink作业”中运行“流式WordCount”示例。
在日志中,出现以下新行:
1. ... 2. 20/05/13 06:28:12 INFO hook.FlinkAtlasHook: Collecting metadata for a new Flink Application: Streaming WordCount 3. ...
Flink通过Kafka主题与Atlas通信,默认情况下,该主题名为ATLAS_HOOK。