flink kerberos认证源码剖析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 超详细讲解flink kerberos认证源码

@[toc]

01 引言

官方的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-kerberos/

我们都知道,如果某个大数据组件(如:hadoopflink等)需要使用keberos认证,都会使用kinit命令来生成票据,组件一般都会从生成票据的缓存中读取tgt,然后去访问不同的数据源。

但是,一般tgt的有效期一般为24小时,而flink的流式作业都会超过这个时间,tgt很容易就失效了,使用kinit的方式似乎不可行,因此flink也有了自己的方案,可以通过配置kerberos认证信息(如:keytabkrb5princal等)来实现票据的续签并长期使用,本文来讲讲。

02 flink的安全机制

通过阅读官方的文档,我们知道 Kerberos 用户凭证是组件之间共享的配置,每个组件会显式地使用它 ,博主理解的就是flink操作hadoop的组件以及不同connector(如:kafka、zookeeper等)的数据源连接,都可以共用同一个kerberos用户凭证,只要该用户分配了所有组件的操作权限即可。

配置示例在flink-conf.yaml看到,如下图:
image.png

那么flink是如何处理安全认证操作的呢?官网也有描述:

The internal architecture is based on security modules (implementing org.apache.flink.runtime.security.modules.SecurityModule) which are installed at startup.

意思就是,flink 安全内部架构是建立在安全模块上(实现 org.apache.flink.runtime.security.modules.SecurityModule接口),安全模块在 flink 启动过程中被安装

我们看看SecurityModule这个接口:
image.png

可以看到,实现了SecurityModule有3个模块分别是,HadoopModule、JaasModule、ZookeeperModule。

下面是我整理不同模块的描述:

模块 功能 描述
HadoopModule 该模块使用 Hadoop UserGroupInformation(UGI)类来建立进程范围的登录用户上下文 这还要为了方便与 Hadoop 组件的所有交互,包括 HDFS、HBase 和 YARN
JaasModule 该模块为集群提供动态 jaas 配置,依赖 jass 的组件可直接使用 例如zookeepr和kafka直接依赖即可。flink on yarn模式在容器创建时,会根据kerberos配置自动生成动态的jass配置
ZookeeperModule 该模块配置某些进程范围内 zooKeeper 安全相关的设置 即flink-conf配置文件里面的zookeeper 服务名称(默认为:zookeeper)和 jaas 登录上下文名称(默认为:Client)的关系就在这里体现了

好啦,接下来继续分析这几个模块的初始化的整体流程。

03 源码流程分析

下面是博主整理的源码分析流程图:
image.png

3.1 程序入口

从上述的流程图可以看到,入口是CliFrontend的main方法:
image.png

3.2 安全模块安装

接着进入SecurityUtils的install方法:
image.png

经过代码反查,可以得出程序会读取“security.module.factory.classes”配置,然后自动加载本文一开始说的3个安全模块:
image.png

ok,接下来看看各模块的安装逻辑。

3.3 模块安装源码

下面是注释好的安全模块代码描述。

首先是HadoopModule:

/**
 * HadoopModule 是Flink的安全模块之一,用于配置和处理与Hadoop安全性相关的任务。
 *
 * @author : YangLinWei
 * @createTime: 2023/9/7 17:04
 * @version: 1.0.0
 */
public class HadoopModule implements SecurityModule {
   
   

    private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);

    private final SecurityConfiguration securityConfig; // Flink安全配置

    private final Configuration hadoopConfiguration; // Hadoop配置

    public HadoopModule(
            SecurityConfiguration securityConfiguration, Configuration hadoopConfiguration) {
   
   
        this.securityConfig = checkNotNull(securityConfiguration);
        this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
    }

    @VisibleForTesting
    public SecurityConfiguration getSecurityConfig() {
   
   
        return securityConfig;
    }

    @Override
    public void install() throws SecurityInstallException {
   
   

        // 注入Hadoop的配置信息
        UserGroupInformation.setConfiguration(hadoopConfiguration);

        UserGroupInformation loginUser;

        try {
   
   
            // 如果Hadoop启用了kerberos认证,并且配置了Keytab和Principal,那么使用Keytab进行登录
            if (UserGroupInformation.isSecurityEnabled()
                    && !StringUtils.isBlank(securityConfig.getKeytab())
                    && !StringUtils.isBlank(securityConfig.getPrincipal())) {
   
   
                String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

                UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

                loginUser = UserGroupInformation.getLoginUser();

                // 补充使用可用的令牌
                String fileLocation =
                        System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);

                if (fileLocation != null) {
   
   
                    Credentials credentialsFromTokenStorageFile =
                            Credentials.readTokenStorageFile(
                                    new File(fileLocation), hadoopConfiguration);


                    // 因为UGI更喜欢委托令牌,它最终会过期并且不会回退到使用Kerberos票据
                    Credentials credentialsToBeAdded = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok =
                            credentialsFromTokenStorageFile.getAllTokens();

                    // 如果UGI使用Kerberos Keytab登录,不加载HDFS委托令牌,
                    for (Token<? extends TokenIdentifier> token : usrTok) {
   
   
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
   
   
                            final Text id = new Text(token.getIdentifier());
                            credentialsToBeAdded.addToken(id, token);
                        }
                    }

                    loginUser.addCredentials(credentialsToBeAdded);
                }
            } else {
   
   
                // 使用当前用户的凭据登录(例如,票证缓存,操作系统登录)
                // 注意,存储的令牌会自动读取
                try {
   
   
                    // 使用反射API获取登录用户对象
                    Method loginUserFromSubjectMethod =
                            UserGroupInformation.class.getMethod(
                                    "loginUserFromSubject", Subject.class);
                    loginUserFromSubjectMethod.invoke(null, (Subject) null);
                } catch (NoSuchMethodException e) {
   
   
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
   
   
                    throw e.getTargetException();
                }

                loginUser = UserGroupInformation.getLoginUser();
            }

            LOG.info("Hadoop user set to {}", loginUser);

            if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
   
   
                boolean isCredentialsConfigured =
                        HadoopUtils.areKerberosCredentialsValid(
                                loginUser, securityConfig.useTicketCache());

                LOG.info(
                        "Kerberos security is enabled and credentials are {}.",
                        isCredentialsConfigured ? "valid" : "invalid");
            }
        } catch (Throwable ex) {
   
   
            throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
        }
    }

    @Override
    public void uninstall() {
   
   
        throw new UnsupportedOperationException();
    }
}

接着是JassModule:

/**
 * 负责安装全局的JAAS(Java Authentication and Authorization Service)配置。
 *
 * <p>安装的配置结合了以下因素的登录模块:
 * - 用户提供的JAAS配置文件(如果有)
 * - Kerberos keytab(如果配置了)
 * - 当前环境中的任何缓存的Kerberos凭证
 *
 * <p>该模块还安装了一个默认的JAAS配置文件(如果需要),以确保与ZooKeeper和Kafka的兼容性。注意,JRE实际上会查找多个文件位置。
 * 参考链接:
 * https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
 * https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
 *
 * @author : YangLinWei
 * @createTime: 2023/9/7 17:10
 * @version: 1.0.0
 */
@Internal
public class JaasModule implements SecurityModule {
   
   

    private static final Logger LOG = LoggerFactory.getLogger(JaasModule.class);

    static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";

    static final String JAAS_CONF_RESOURCE_NAME = "flink-jaas.conf";

    private final SecurityConfiguration securityConfig; // Flink安全配置

    private String priorConfigFile;  // 先前的JAAS配置文件
    private javax.security.auth.login.Configuration priorConfig;  // 先前的JAAS配置

    private DynamicConfiguration currentConfig; // 当前的JAAS配置

    /** JAAS文件将安装到的工作目录。 */
    private final String workingDir; // JAAS文件的工作目录

    public JaasModule(SecurityConfiguration securityConfig) {
   
   
        this.securityConfig = checkNotNull(securityConfig);
        String[] dirs = splitPaths(securityConfig.getFlinkConfig().getString(CoreOptions.TMP_DIRS));
        /// 至少应该有一个目录。
        checkState(dirs.length > 0);
        this.workingDir = dirs[0];
    }

    @Override
    public void install() {
   
   

        // 确保始终定义了配置文件,以与ZooKeeper和Kafka兼容,它们检查系统属性和文件的存在性。
        priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
        if (priorConfigFile == null) {
   
   
            File configFile = generateDefaultConfigFile(workingDir);
            System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
            LOG.info("Jaas file will be created as {}.", configFile);
        }

        // 读取JAAS配置文件
        priorConfig = javax.security.auth.login.Configuration.getConfiguration();

        // 构建动态的JAAS配置
        currentConfig = new DynamicConfiguration(priorConfig);

        // 将配置的JAAS登录上下文与krb5条目关联起来
        AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
        if (krb5Entries != null) {
   
   
            for (String app : securityConfig.getLoginContextNames()) {
   
   
                currentConfig.addAppConfigurationEntry(app, krb5Entries);
            }
        }

        javax.security.auth.login.Configuration.setConfiguration(currentConfig);
    }

    @Override
    public void uninstall() throws SecurityInstallException {
   
   
        if (priorConfigFile != null) {
   
   
            System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
        } else {
   
   
            System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
        }
        javax.security.auth.login.Configuration.setConfiguration(priorConfig);
    }

    public DynamicConfiguration getCurrentConfiguration() {
   
   
        return currentConfig;
    }

    private static AppConfigurationEntry[] getAppConfigurationEntries(
            SecurityConfiguration securityConfig) {
   
   

        AppConfigurationEntry userKerberosAce = null;
        if (securityConfig.useTicketCache()) {
   
   
            userKerberosAce = KerberosUtils.ticketCacheEntry();
        }
        AppConfigurationEntry keytabKerberosAce = null;
        if (securityConfig.getKeytab() != null) {
   
   
            keytabKerberosAce =
                    KerberosUtils.keytabEntry(
                            securityConfig.getKeytab(), securityConfig.getPrincipal());
        }

        AppConfigurationEntry[] appConfigurationEntry;
        if (userKerberosAce != null && keytabKerberosAce != null) {
   
   
            appConfigurationEntry =
                    new AppConfigurationEntry[]{
   
   keytabKerberosAce, userKerberosAce};
        } else if (keytabKerberosAce != null) {
   
   
            appConfigurationEntry = new AppConfigurationEntry[]{
   
   keytabKerberosAce};
        } else if (userKerberosAce != null) {
   
   
            appConfigurationEntry = new AppConfigurationEntry[]{
   
   userKerberosAce};
        } else {
   
   
            return null;
        }

        return appConfigurationEntry;
    }

    /** 生成默认的JAAS配置文件。 */
    private static File generateDefaultConfigFile(String workingDir) {
   
   
        checkArgument(workingDir != null, "working directory should not be null.");
        final File jaasConfFile;
        try {
   
   
            Path path = Paths.get(workingDir);
            if (Files.notExists(path)) {
   
   
                // We intentionally favored Path.toRealPath over Files.readSymbolicLinks as the
                // latter one might return a
                // relative path if the symbolic link refers to it. Path.toRealPath resolves the
                // relative path instead.
                Path parent = path.getParent().toRealPath();
                Path resolvedPath = Paths.get(parent.toString(), path.getFileName().toString());

                path = Files.createDirectories(resolvedPath);
            }
            Path jaasConfPath = Files.createTempFile(path, "jaas-", ".conf");
            try (InputStream resourceStream =
                         JaasModule.class
                                 .getClassLoader()
                                 .getResourceAsStream(JAAS_CONF_RESOURCE_NAME)) {
   
   
                Files.copy(resourceStream, jaasConfPath, StandardCopyOption.REPLACE_EXISTING);
            }
            jaasConfFile = new File(workingDir, jaasConfPath.getFileName().toString());
            jaasConfFile.deleteOnExit();
        } catch (IOException e) {
   
   
            throw new RuntimeException("unable to generate a JAAS configuration file", e);
        }
        return jaasConfFile;
    }
}

最后是ZookeeperModule:

/**
 * 这个类负责处理ZooKeeper的安全配置,安装全局的ZooKeeper安全配置以支持SASL认证。
 * 它会根据安全配置的设置,设置相应的系统属性,包括是否启用SASL、ZooKeeper服务名称以及登录上下文名称等
 *
 * @author : YangLinWei
 * @createTime: 2023/9/7 17:15
 * @version: 1.0.0
 */
public class ZooKeeperModule implements SecurityModule {
   
   

    private static final String ZOOKEEPER_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";

    /** 用于设置ZooKeeper是否使用SASL的系统属性。 */
    private static final String ZK_ENABLE_CLIENT_SASL = "zookeeper.sasl.client";

    /** 用于设置期望的ZooKeeper服务名称的系统属性。 */
    private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";

    /** 用于设置要使用的登录上下文名称的系统属性。 */
    private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";

    private final SecurityConfiguration securityConfig; // Flink安全配置

    private String priorSaslEnable; // 先前的ZooKeeper SASL启用状态

    private String priorServiceName; // 先前的ZooKeeper服务名称

    private String priorLoginContextName; // 先前的登录上下文名称

    public ZooKeeperModule(SecurityConfiguration securityConfig) {
   
   
        this.securityConfig = checkNotNull(securityConfig);
    }

    @Override
    public void install() throws SecurityInstallException {
   
   

        priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
        System.setProperty(
                ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

        priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
        if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
   
   
            System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
        }

        priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
        if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
   
   
            System.setProperty(
                    ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
        }
    }

    @Override
    public void uninstall() throws SecurityInstallException {
   
   
        if (priorSaslEnable != null) {
   
   
            System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
        } else {
   
   
            System.clearProperty(ZK_ENABLE_CLIENT_SASL);
        }
        if (priorServiceName != null) {
   
   
            System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
        } else {
   
   
            System.clearProperty(ZK_SASL_CLIENT_USERNAME);
        }
        if (priorLoginContextName != null) {
   
   
            System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
        } else {
   
   
            System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
        }
    }
}

04 文末

本文主要讲解了flink kerberos的认证原理以及代码流程分析,希望能帮助到大家,谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7月前
|
SQL 算法 API
读Flink源码谈设计:图的抽象与分层
前阵子组里的小伙伴问我“为什么Flink从我们的代码到真正可执行的状态,要经过这么多个graph转换?这样做有什么好处嘛?”我早期看到这里的设计时的确有过相同的疑惑,当时由于手里还在看别的东西,查阅过一些资料后就翻页了。如今又碰到了这样的问题,不妨就在这篇文章中好好搞清楚。
562 0
读Flink源码谈设计:图的抽象与分层
|
7月前
|
存储 消息中间件 缓存
读Flink源码谈设计:有效管理内存之道
在最初接触到Flink时,是来自于业界里一些头部玩家的分享——大家会用其来处理海量数据。在这种场景下,`如何避免JVM GC带来StopTheWorld带来的副作用`这样的问题一直盘绕在我心头。直到用了Flink以后,阅读了相关的源码(以1.14.0为基准),终于有了一些答案。在这篇文章里也是会分享给大家。
585 1
|
7月前
|
流计算
Flink源码解析
Flink源码解析
106 0
|
4月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何使用Flink SQL连接带有Kerberos认证的Hive
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
Oracle 关系型数据库 Java
实时计算 Flink版产品使用问题之源码 deploy,生成带有时间戳的jar包,如何修改配置信息
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
存储 SQL API
读Flink源码谈设计:流批一体的实现与现状
在Dataflow相关的论文发表前,大家都往往认为需要两套API来实现流计算和批计算,典型的实现便是Lambda架构。
629 0
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1060 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
23天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
139 56