flink动态更新作业

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink动态更新作业

我们知道 Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,在作业启动后初始化了之后如果再想更新作业的配置一般有三种解决方法:

改变启动参数或者改变配置文件,重启作业,让作业能够读取到修改后的配置
通过读取配置流(需要自定义 Source 读取配置),然后流和流连接起来
整合配置中心

 

整合nacos动态更新checkoint


1:pom依赖

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-core</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-common</artifactId>
    <version>1.1.4</version>
</dependency>

2:flink整合nacos代码

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * Desc: 测试 nacos 动态更改 Checkpoint 配置后,Flink 是否可以获取到更改后的值,并生效?
 *
 * 结论是:不生效,因为 Flink 是 Lazy Evaluation(延迟执行),当程序的 main 方法执行时,数据源加载数据和数据转换等算子不会立马执行,
 * 这些操作会被创建并添加到程序的执行计划中去,只有当执行环境 env 的 execute 方法被显示地触发执行时,整个程序才开始执行实际的操作,所以
 * 在一开始初始化后等程序执行 execute 方法后再修改 env 的配置其实就不起作用了。
 *
 */
public class FlinkNacosTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        final String[] content = {configService.getConfig(dataId, group, 5000)};
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("===============");
                content[0] = configInfo;
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
                System.out.println("----------");
                System.out.println(env.getCheckpointConfig().getCheckpointInterval());
            }
        });
        System.out.println(content[0]);
        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
        env.getCheckpointConfig().setCheckpointTimeout(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("xxxx", System.currentTimeMillis()));
                    Thread.sleep(800);
                }
            }
            @Override
            public void cancel() {
            }
        }).print();
        env.execute();
    }
}

 


flink 整合apollo完成动态更新作业


1:pom依赖

<dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>1.5.1</version>
</dependency>

2:flink整合apollo代码

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
@Slf4j
public class FlinkApolloTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);
        env.addSource(new RichSourceFunction<String>() {
            private Config config;
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                config = ConfigService.getAppConfig();
                config.addChangeListener(new ConfigChangeListener() {
                    @Override
                    public void onChange(ConfigChangeEvent configChangeEvent) {
                        for (String key : configChangeEvent.changedKeys()) {
                            ConfigChange change = configChangeEvent.getChange(key);
                            log.info("Change - key: {}, oldValue: {}, newValue: {}, changeType: {}",
                                    change.getPropertyName(), change.getOldValue(), change.getNewValue(),
                                    change.getChangeType());
                        }
                    }
                });
            }
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect(config.getProperty("name", "zs"));
                    Thread.sleep(3000);
                }
            }
            @Override
            public void cancel() {
            }
        }).print();
        env.execute("flink Apollo");
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
消息中间件 分布式计算 大数据
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
大数据-123 - Flink 并行度 相关概念 全局、作业、算子、Slot并行度 Flink并行度设置与测试
179 0
|
3月前
|
消息中间件 分布式计算 大数据
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
大数据-128 - Flink 并行度设置 细节详解 全局、作业、算子、Slot
178 0
|
5月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何从savepoint重新启动作业
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
机器学习/深度学习 人工智能 运维
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
美团 Flink 大作业部署问题之Flink在生态技术演进上有什么主要方向
|
5月前
|
监控 Serverless Apache
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
美团 Flink 大作业部署问题之如何体现Flink在业界的影响力
|
5月前
|
监控 Serverless 数据库
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
美团 Flink 大作业部署问题之端云联调并将流量恢复到云端实例如何结束
|
5月前
|
监控 Java Serverless
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
美团 Flink 大作业部署问题之想在Serverless平台上实时查看Spring Boot应用的日志要怎么操作
|
5月前
|
Java 流计算
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
美团 Flink 大作业部署问题之files-to-delete 的执行为什么能够异步进行呢
|
5月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
5月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的