深度解读flink kerberos认证(含流程图及源码)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 深度解读flink kerberos认证(含流程图及源码)

01 引言

博主前面写过一篇文章(《flink kerberos认证源码剖析》)主要是讲解了flink kerberos的认证原理以及代码流程分析。

可以知道,flink只要做一次kerberos的用户认证,就完美了对接connector所有涉及kerberos认证的组件,如:kafkazookeeperhadoop组件等。

本文以提交作业至一个有认证的hadoop环境,执行有认证的kafka实时同步到mysql的flink作业为例子讲解。

02 流程分析

下面展示的是使用命令来部署一个作业(kafka->mysql)至yarn的流程图,如下:

上述流程图主要描述的是使用flink 命令来提交一个作业至yarn的整个流程:

  • Step1:首先用户需要手动上传flink程序运行在yarn的时候所需的所有jar包(即:flink sdk lib目录里所有的jar);
  • Step2:在提交作业至yarn之前,需要在环境变量里配置好hadoop相关的环境变量,主要是声明hadoopconf目录、hadooplib目录的路径,flink客户端可以从环境变量读取这些信息。同时,也需要在flink-conf.yaml配置安全模块
  • Step3:在配置完上述的内容之后,flink客户端接受到run的命令,首先会初始化各个安全模块(主要是做kerberos认证),然后会自动上传kerberos相关的配置文件(如:krb5.confuser.keytab等)自动上传至hdfs的某个临时缓存目录;
  • Step4yarn接收到请求时,会分配资源并创建Yarn容器,用来运行jobmanagertaskmanager
  • Step5:容器里面的jobmanager创建前,会自动下载hdfsflink程序运行时依赖的库到本地,同时也会下载kerberos配置文件,跟前面提交时的步骤一样,即自动初始化安全模块,并自动做了kerberos认证的处理,。
  • Step6:此时,flink使用的connector就能直接从做好认证的ugi或者jaas文件里获取用户票据;
  • Step7:最后就能做flink就能在这样一个“已认证”的环境里做任何想做的事了。

ok,这里应该有个分界线,这里备注一下对应源码或截图,有兴趣的童鞋可以阅读。


Q1: flink底层是在哪里自动上传配置至hdfs的

对应的源码位置在:org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster,由于篇幅原因,这里不再讲解前后的代码,其实入口是在org.apache.flink.client.cli.CliFrontend#main,最终上传的代码在org.apache.flink.yarn.YarnApplicationFileUploader#copyToRemoteApplicationDir


Q2: 如何从已认证的环境,从ugi获取kerberos用户票据?

直接UserGroupInformation.getCurrentUser() 即可,flink已经自动帮我们做用户认证了,可以通过如下方式获取:


Q3: 如何从已认证的环境,获取jass文件?

在Yarn容器里运行程序前,flink已经自动帮我们创建好jass.conf文件了,并设置进了启动参数-Djava.security.auth.login.config,下面试部分的日志截图:

03 核心配置

从前面的内容,可以知道如果要使用命令提交一个flink 作业到yarn,且要完成kerberos认证,需要配置的内容有:

  • hadoop环境:配置文件、客户端jar包路径;
  • flink-conf.yaml:主要配置安全部分;
  • 连接器kerberos认证相关配置:如:kafka connector。

3.1 hadoop环境配置

flink提交作业时,会自动从环境变量获取hadoop相关的配置,获取配置的源码在:

org.apache.flink.runtime.util.HadoopUtils#getHadoopConfiguration,截图如下:

涉及的方法org.apache.flink.yarn.YarnClusterDescriptor#isReadyForDeployment,截图如下:

以下是配置hadoop环境变量的环境变量配置示例(如果是Linux系统可以直接在~/.bash_profile里添加,根据自己不同的系统去配置):

HADOOP_HOME=/data/hadoop-2.7.7
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/*

3.2 flink-conf.yaml

详细的配置官网地址:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#auth-with-external-systems

以下是flink-conf.yaml的配置描述:

key值 描述
security.kerberos.access.hadoopFileSystems 用于指定 Flink 将要访问的Kerberos安全的Hadoop文件系统的列表。例如,security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003
security.kerberos.login.contexts 一个逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient 用于ZooKeeper身份验证和Kafka身份验证的凭据)。
security.kerberos.login.keytab 用户凭据的Kerberos keytab文件的绝对路径
security.kerberos.login.principal 与keytab关联的Kerberos主体名称
security.kerberos.login.use-ticket-cache 是否从Kerberos票证缓存中读取。如果使用kinit的方式获取票据,这里如果设置为true,则直接从kinit缓存票据的位置获取
security.kerberos.relogin.period 用于控制Kerberos重新登录的时间周期,默认1分钟

示例如下:

security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.keytab: /etc/user.keytab 
security.kerberos.login.principal: user@HADOOP.COM 
security.kerberos.login.use-ticket-cache: false

3.3 连接器相关的配置

flink程序在yarn容器里面运行时,对于引用的kafka connector来说,它“只知道”jass.conf,但是还有很多其它的配置项,如:security.protocol、sasl.kerberos.service.name等等。这些该如何配置呢?

通过阅读flink-connector-kafka的源码,我们发现可以在properties.*这个参数里写入我们想要的配置,这个参数在源码的org.apache.flink.streaming.connectors.kafka.table.KafkaOptions#getKafkaProperties

本文最终的示例代码如下:

-- 创建kafka源表
CREATE TABLE table_source_kafka (
  name STRING,
  id INT
)
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'properties.sasl.mechanism' = 'GSSAPI',
    'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
    'topic' = 'topic_demo',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'json'
);
-- 创建mysql目标表
CREATE TABLE table_sink_mysql (
  name STRING,
  id INT,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/db_name', 
  'username' = 'root',
  'password' = '123456', 
  'table-name' = 't_demo'
);
-- 实时同步kafka数据到mysql
INSERT INTO table_sink_mysql (SELECT * FROM table_source_kafka);

04 文末

本文主要讲解了flink kerberos认证的完整流程,并贴上相应的源码,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4月前
|
流计算
Flink源码解析
Flink源码解析
59 0
|
4月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
540 0
读Flink源码谈设计:图的抽象与分层
|
4月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
541 1
|
4月前
|
存储 SQL API
读Flink源码谈设计:流批一体的实现与现状
在Dataflow相关的论文发表前,大家都往往认为需要两套API来实现流计算和批计算,典型的实现便是Lambda架构。
574 0
|
4月前
|
存储 算法 Java
读Flink源码谈设计:Exactly Once
将Flink应用至生产已有一段时间,刚上生产的时候有幸排查过因数据倾斜引起的Checkpoint超时问题——当时简单的了解了相关机制,最近正好在读Flink源码,不如趁这个机会搞清楚。 在这里,我们首先要搞清楚两种Exactly-Once的区别: - Exactly Once:在计算引擎内部,数据不丢失不重复。本质是通过Flink开启检查点进行Barrier对齐,即可做到。 - End to End Exactly Once:这意味着从数据读取、引擎处理到写入外部存储的整个过程中,数据都是不丢失不重复的。这要求数据源可重放,写入端支持事务的恢复和回滚或幂等。
535 0
|
4月前
|
监控 Java 流计算
读Flink源码谈设计:Metric
前阵子笔者涉及了些许监控相关的开发工作,在开发过程中也碰到过些许问题,便翻读了Flink相关部分的代码,在读代码的过程中发现了一些好的设计,因此也是写成文章整理上来。
363 0
读Flink源码谈设计:Metric
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
483 5
|
1月前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1356 1
官宣|Apache Flink 1.19 发布公告
|
1月前
|
SQL Apache 流计算
Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
【2月更文挑战第25天】Apache Flink官方网站提供了关于如何使用Docker进行Flink CDC测试的文档
143 3
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
152 0