深度解读flink kerberos认证(含流程图及源码)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 深度解读flink kerberos认证(含流程图及源码)

01 引言

博主前面写过一篇文章(《flink kerberos认证源码剖析》)主要是讲解了flink kerberos的认证原理以及代码流程分析。

可以知道,flink只要做一次kerberos的用户认证,就完美了对接connector所有涉及kerberos认证的组件,如:kafkazookeeperhadoop组件等。

本文以提交作业至一个有认证的hadoop环境,执行有认证的kafka实时同步到mysql的flink作业为例子讲解。

02 流程分析

下面展示的是使用命令来部署一个作业(kafka->mysql)至yarn的流程图,如下:

上述流程图主要描述的是使用flink 命令来提交一个作业至yarn的整个流程:

  • Step1:首先用户需要手动上传flink程序运行在yarn的时候所需的所有jar包(即:flink sdk lib目录里所有的jar);
  • Step2:在提交作业至yarn之前,需要在环境变量里配置好hadoop相关的环境变量,主要是声明hadoopconf目录、hadooplib目录的路径,flink客户端可以从环境变量读取这些信息。同时,也需要在flink-conf.yaml配置安全模块
  • Step3:在配置完上述的内容之后,flink客户端接受到run的命令,首先会初始化各个安全模块(主要是做kerberos认证),然后会自动上传kerberos相关的配置文件(如:krb5.confuser.keytab等)自动上传至hdfs的某个临时缓存目录;
  • Step4yarn接收到请求时,会分配资源并创建Yarn容器,用来运行jobmanagertaskmanager
  • Step5:容器里面的jobmanager创建前,会自动下载hdfsflink程序运行时依赖的库到本地,同时也会下载kerberos配置文件,跟前面提交时的步骤一样,即自动初始化安全模块,并自动做了kerberos认证的处理,。
  • Step6:此时,flink使用的connector就能直接从做好认证的ugi或者jaas文件里获取用户票据;
  • Step7:最后就能做flink就能在这样一个“已认证”的环境里做任何想做的事了。

ok,这里应该有个分界线,这里备注一下对应源码或截图,有兴趣的童鞋可以阅读。


Q1: flink底层是在哪里自动上传配置至hdfs的

对应的源码位置在:org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster,由于篇幅原因,这里不再讲解前后的代码,其实入口是在org.apache.flink.client.cli.CliFrontend#main,最终上传的代码在org.apache.flink.yarn.YarnApplicationFileUploader#copyToRemoteApplicationDir


Q2: 如何从已认证的环境,从ugi获取kerberos用户票据?

直接UserGroupInformation.getCurrentUser() 即可,flink已经自动帮我们做用户认证了,可以通过如下方式获取:


Q3: 如何从已认证的环境,获取jass文件?

在Yarn容器里运行程序前,flink已经自动帮我们创建好jass.conf文件了,并设置进了启动参数-Djava.security.auth.login.config,下面试部分的日志截图:

03 核心配置

从前面的内容,可以知道如果要使用命令提交一个flink 作业到yarn,且要完成kerberos认证,需要配置的内容有:

  • hadoop环境:配置文件、客户端jar包路径;
  • flink-conf.yaml:主要配置安全部分;
  • 连接器kerberos认证相关配置:如:kafka connector。

3.1 hadoop环境配置

flink提交作业时,会自动从环境变量获取hadoop相关的配置,获取配置的源码在:

org.apache.flink.runtime.util.HadoopUtils#getHadoopConfiguration,截图如下:

涉及的方法org.apache.flink.yarn.YarnClusterDescriptor#isReadyForDeployment,截图如下:

以下是配置hadoop环境变量的环境变量配置示例(如果是Linux系统可以直接在~/.bash_profile里添加,根据自己不同的系统去配置):

HADOOP_HOME=/data/hadoop-2.7.7
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/*

3.2 flink-conf.yaml

详细的配置官网地址:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#auth-with-external-systems

以下是flink-conf.yaml的配置描述:

key值 描述
security.kerberos.access.hadoopFileSystems 用于指定 Flink 将要访问的Kerberos安全的Hadoop文件系统的列表。例如,security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003
security.kerberos.login.contexts 一个逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient 用于ZooKeeper身份验证和Kafka身份验证的凭据)。
security.kerberos.login.keytab 用户凭据的Kerberos keytab文件的绝对路径
security.kerberos.login.principal 与keytab关联的Kerberos主体名称
security.kerberos.login.use-ticket-cache 是否从Kerberos票证缓存中读取。如果使用kinit的方式获取票据,这里如果设置为true,则直接从kinit缓存票据的位置获取
security.kerberos.relogin.period 用于控制Kerberos重新登录的时间周期,默认1分钟

示例如下:

security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.keytab: /etc/user.keytab 
security.kerberos.login.principal: user@HADOOP.COM 
security.kerberos.login.use-ticket-cache: false

3.3 连接器相关的配置

flink程序在yarn容器里面运行时,对于引用的kafka connector来说,它“只知道”jass.conf,但是还有很多其它的配置项,如:security.protocol、sasl.kerberos.service.name等等。这些该如何配置呢?

通过阅读flink-connector-kafka的源码,我们发现可以在properties.*这个参数里写入我们想要的配置,这个参数在源码的org.apache.flink.streaming.connectors.kafka.table.KafkaOptions#getKafkaProperties

本文最终的示例代码如下:

-- 创建kafka源表
CREATE TABLE table_source_kafka (
  name STRING,
  id INT
)
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
    'topic' = 'topic_demo',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'json'
);
-- 创建mysql目标表
CREATE TABLE table_sink_mysql (
  name STRING,
  id INT,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/db_name', 
  'username' = 'root',
  'password' = '123456', 
  'table-name' = 't_demo'
);
-- 实时同步kafka数据到mysql
INSERT INTO table_sink_mysql (SELECT * FROM table_source_kafka);

04 文末

本文主要讲解了flink kerberos认证的完整流程,并贴上相应的源码,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
557 0
读Flink源码谈设计:图的抽象与分层
|
6月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
582 1
|
6月前
|
流计算
Flink源码解析
Flink源码解析
96 0
|
3月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何使用Flink SQL连接带有Kerberos认证的Hive
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
存储 SQL API
读Flink源码谈设计:流批一体的实现与现状
在Dataflow相关的论文发表前,大家都往往认为需要两套API来实现流计算和批计算,典型的实现便是Lambda架构。
619 0
|
Web App开发 监控 API
Flink技术源码解析(一):Flink概述与源码研读准备
一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理领域的一颗炙手可热的新星。关于Flink与其它主流实时大数据处理引擎Storm、Spark Streaming的不同与优势,可参考https://blog.csdn.net/cm_chenmin/article/details/53072498。 出于技术人对技术本能的好奇与冲动,
32267 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
15天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
679 10
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎