01 引言
博主前面写过一篇文章(《flink kerberos认证源码剖析》)主要是讲解了flink kerberos
的认证原理以及代码流程分析。
可以知道,flink只要做一次kerberos的用户认证,就完美了对接connector所有涉及kerberos认证的组件,如:kafka
、zookeeper
、hadoop
组件等。
本文以提交作业至一个有认证的hadoop环境,执行有认证的kafka实时同步到mysql的flink作业为例子讲解。
02 流程分析
下面展示的是使用命令来部署一个作业(kafka->mysql)至yarn的流程图,如下:
上述流程图主要描述的是使用flink 命令来提交一个作业至yarn的整个流程:
- Step1:首先用户需要手动上传
flink
程序运行在yarn
的时候所需的所有jar包(即:flink sdk lib
目录里所有的jar
); - Step2:在提交作业至
yarn
之前,需要在环境变量里配置好hadoop
相关的环境变量,主要是声明hadoop
的conf
目录、hadoop
的lib
目录的路径,flink
客户端可以从环境变量读取这些信息。同时,也需要在flink-conf.yaml配置安全模块; - Step3:在配置完上述的内容之后,
flink
客户端接受到run
的命令,首先会初始化各个安全模块(主要是做kerberos认证),然后会自动上传kerberos
相关的配置文件(如:krb5.conf
、user.keytab
等)自动上传至hdfs
的某个临时缓存目录; - Step4:
yarn
接收到请求时,会分配资源并创建Yarn
容器,用来运行jobmanager
和taskmanager
; - Step5:容器里面的
jobmanager
创建前,会自动下载hdfs
的flink
程序运行时依赖的库到本地,同时也会下载kerberos
配置文件,跟前面提交时的步骤一样,即自动初始化安全模块,并自动做了kerberos
认证的处理,。 - Step6:此时,
flink
使用的connector
就能直接从做好认证的ugi
或者jaas
文件里获取用户票据; - Step7:最后就能做
flink
就能在这样一个“已认证”的环境里做任何想做的事了。
ok,这里应该有个分界线,这里备注一下对应源码或截图,有兴趣的童鞋可以阅读。
Q1: flink底层是在哪里自动上传配置至hdfs的
对应的源码位置在:
org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster
,由于篇幅原因,这里不再讲解前后的代码,其实入口是在org.apache.flink.client.cli.CliFrontend#main
,最终上传的代码在org.apache.flink.yarn.YarnApplicationFileUploader#copyToRemoteApplicationDir
:
Q2: 如何从已认证的环境,从ugi获取kerberos用户票据?
直接
UserGroupInformation.getCurrentUser()
即可,flink已经自动帮我们做用户认证了,可以通过如下方式获取:
Q3: 如何从已认证的环境,获取jass文件?
在Yarn容器里运行程序前,flink已经自动帮我们创建好jass.conf文件了,并设置进了启动参数
-Djava.security.auth.login.config
,下面试部分的日志截图:
03 核心配置
从前面的内容,可以知道如果要使用命令提交一个flink 作业到yarn,且要完成kerberos认证,需要配置的内容有:
- hadoop环境:配置文件、客户端jar包路径;
- flink-conf.yaml:主要配置安全部分;
- 连接器kerberos认证相关配置:如:kafka connector。
3.1 hadoop环境配置
flink提交作业时,会自动从环境变量获取hadoop相关的配置,获取配置的源码在:org.apache.flink.runtime.util.HadoopUtils#getHadoopConfiguration
,截图如下:
涉及的方法org.apache.flink.yarn.YarnClusterDescriptor#isReadyForDeployment
,截图如下:
以下是配置hadoop环境变量的环境变量配置示例(如果是Linux系统可以直接在~/.bash_profile里添加,根据自己不同的系统去配置):
HADOOP_HOME=/data/hadoop-2.7.7
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/*
3.2 flink-conf.yaml
以下是flink-conf.yaml的配置描述:
key值 | 描述 |
---|---|
security.kerberos.access.hadoopFileSystems | 用于指定 Flink 将要访问的Kerberos安全的Hadoop文件系统的列表。例如,security.kerberos.access.hadoopFileSystems=hdfs://namenode2:9002;hdfs://namenode3:9003 |
security.kerberos.login.contexts | 一个逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient 用于ZooKeeper身份验证和Kafka身份验证的凭据)。 |
security.kerberos.login.keytab | 用户凭据的Kerberos keytab文件的绝对路径 |
security.kerberos.login.principal | 与keytab关联的Kerberos主体名称 |
security.kerberos.login.use-ticket-cache | 是否从Kerberos票证缓存中读取。如果使用kinit的方式获取票据,这里如果设置为true,则直接从kinit缓存票据的位置获取 |
security.kerberos.relogin.period | 用于控制Kerberos重新登录的时间周期,默认1分钟 |
示例如下:
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.keytab: /etc/user.keytab
security.kerberos.login.principal: user@HADOOP.COM
security.kerberos.login.use-ticket-cache: false
3.3 连接器相关的配置
flink程序在yarn容器里面运行时,对于引用的kafka connector来说,它“只知道”jass.conf,但是还有很多其它的配置项,如:security.protocol、sasl.kerberos.service.name等等。这些该如何配置呢?
通过阅读flink-connector-kafka的源码,我们发现可以在properties.*
这个参数里写入我们想要的配置,这个参数在源码的org.apache.flink.streaming.connectors.kafka.table.KafkaOptions#getKafkaProperties
本文最终的示例代码如下:
-- 创建kafka源表
CREATE TABLE table_source_kafka (
name STRING,
id INT
)
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.sasl.mechanism' = 'GSSAPI',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
'topic' = 'topic_demo',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'json'
);
-- 创建mysql目标表
CREATE TABLE table_sink_mysql (
name STRING,
id INT,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/db_name',
'username' = 'root',
'password' = '123456',
'table-name' = 't_demo'
);
-- 实时同步kafka数据到mysql
INSERT INTO table_sink_mysql (SELECT * FROM table_source_kafka);
04 文末
本文主要讲解了flink kerberos
认证的完整流程,并贴上相应的源码,希望能帮助到大家,谢谢大家的阅读,本文完!