flink动态更新作业

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink动态更新作业

我们知道 Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,在作业启动后初始化了之后如果再想更新作业的配置一般有三种解决方法:

改变启动参数或者改变配置文件,重启作业,让作业能够读取到修改后的配置
通过读取配置流(需要自定义 Source 读取配置),然后流和流连接起来
整合配置中心

 

整合nacos动态更新checkoint


1:pom依赖

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-core</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-common</artifactId>
    <version>1.1.4</version>
</dependency>

2:flink整合nacos代码

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * Desc: 测试 nacos 动态更改 Checkpoint 配置后,Flink 是否可以获取到更改后的值,并生效?
 *
 * 结论是:不生效,因为 Flink 是 Lazy Evaluation(延迟执行),当程序的 main 方法执行时,数据源加载数据和数据转换等算子不会立马执行,
 * 这些操作会被创建并添加到程序的执行计划中去,只有当执行环境 env 的 execute 方法被显示地触发执行时,整个程序才开始执行实际的操作,所以
 * 在一开始初始化后等程序执行 execute 方法后再修改 env 的配置其实就不起作用了。
 *
 */
public class FlinkNacosTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        final String[] content = {configService.getConfig(dataId, group, 5000)};
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("===============");
                content[0] = configInfo;
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
                System.out.println("----------");
                System.out.println(env.getCheckpointConfig().getCheckpointInterval());
            }
        });
        System.out.println(content[0]);
        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
        env.getCheckpointConfig().setCheckpointTimeout(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("xxxx", System.currentTimeMillis()));
                    Thread.sleep(800);
                }
            }
            @Override
            public void cancel() {
            }
        }).print();
        env.execute();
    }
}

 


flink 整合apollo完成动态更新作业


1:pom依赖

<dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>1.5.1</version>
</dependency>

2:flink整合apollo代码

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@Slf4j
public class FlinkApolloTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);
        env.addSource(new RichSourceFunction<String>() {
            private Config config;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                config = ConfigService.getAppConfig();
                config.addChangeListener(new ConfigChangeListener() {
                    @Override
                    public void onChange(ConfigChangeEvent configChangeEvent) {
                        for (String key : configChangeEvent.changedKeys()) {
                            ConfigChange change = configChangeEvent.getChange(key);
                            log.info("Change - key: {}, oldValue: {}, newValue: {}, changeType: {}",
                                    change.getPropertyName(), change.getOldValue(), change.getNewValue(),
                                    change.getChangeType());
                        }
                    }
                });
            }
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect(config.getProperty("name", "zs"));
                    Thread.sleep(3000);
                }
            }
            @Override
            public void cancel() {
            }
        }).print();
        env.execute("flink Apollo");
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
资源调度 监控 关系型数据库
在Flink CDC作业提交过程中,出现超时问题可能与多种因素有关
【2月更文挑战第8天】在Flink CDC作业提交过程中,出现超时问题可能与多种因素有关
69 11
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
86 1
|
2月前
|
SQL 消息中间件 Kafka
Flink报错问题之SQL作业中调用UDTF报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 资源调度
Flink报错问题之flink 1.11 sql作业提交JM报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
存储 SQL 消息中间件
Flink作业问题之取消Flink Streaming作业失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Kafka
flink问题之作业执行异常如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
56 2
|
2月前
|
SQL 消息中间件 Kafka
Flink问题之取消正在运行的Flink Streaming作业如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
32 0
|
2月前
|
资源调度 Kubernetes Java
Flink--day02、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
Flink--day022、Flink部署(Yarn集群搭建下的会话模式部署、单作业模式部署、应用模式部署)
138 5
|
2月前
|
SQL 分布式数据库 Apache
Flink问题之提交作业失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
32 0
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
483 5