As we know, a Flink job's configuration is usually passed in as parameters when the job is started, or read from a configuration file. Once the job has started and finished initializing, there are generally three ways to update its configuration:
1. Change the startup parameters or the configuration file and restart the job, so it picks up the modified configuration.
2. Read the configuration from a config stream (this requires a custom Source that reads the configuration) and connect that stream with the data stream (see the sketch right after this list).
3. Integrate a configuration center.
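To make the second approach more concrete, below is a minimal sketch of a config stream connected to the data stream via Flink's broadcast state. It is not taken from the examples that follow: the two socket sources, the "prefix" key and the class name ConfigBroadcastSketch are placeholders for illustration; in practice the config stream would come from a custom SourceFunction that polls your config store.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ConfigBroadcastSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Business stream and config stream; both socket sources are placeholders.
        DataStream<String> events = env.socketTextStream("localhost", 9999);
        DataStream<String> configStream = env.socketTextStream("localhost", 9998);

        // Broadcast the config stream so every parallel instance sees the latest value.
        MapStateDescriptor<String, String> configDescriptor = new MapStateDescriptor<>(
                "config", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> broadcastConfig = configStream.broadcast(configDescriptor);

        events.connect(broadcastConfig)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Read the latest config value from broadcast state for every data record.
                        String prefix = ctx.getBroadcastState(configDescriptor).get("prefix");
                        out.collect((prefix == null ? "" : prefix) + value);
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        // Treat each record on the config stream as the new value of the "prefix" setting.
                        ctx.getBroadcastState(configDescriptor).put("prefix", value);
                    }
                })
                .print();

        env.execute("config stream sketch");
    }
}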
Integrating Nacos to dynamically update the checkpoint configuration
1: pom dependencies
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-core</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-common</artifactId>
    <version>1.1.4</version>
</dependency>
2: Flink + Nacos integration code
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;
import java.util.concurrent.Executor;

/**
 * Desc: Test whether Flink picks up and applies a checkpoint configuration that is changed dynamically in Nacos.
 *
 * Conclusion: it does not take effect, because Flink uses lazy evaluation. When the program's main method runs,
 * the data source and the transformation operators are not executed immediately; they are only created and added
 * to the program's execution plan. The actual work starts only when env.execute() is explicitly triggered, so
 * modifying the env configuration after the initial setup, once execute() has been called, has no effect.
 */
public class FlinkNacosTest2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);

        // Read the initial value (the checkpoint interval) from Nacos.
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        final String[] content = {configService.getConfig(dataId, group, 5000)};

        // The listener fires whenever the config changes in Nacos, but changing the CheckpointConfig
        // here does not affect the already submitted job.
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("===============");
                content[0] = configInfo;
                env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
                System.out.println("----------");
                System.out.println(env.getCheckpointConfig().getCheckpointInterval());
            }
        });

        System.out.println(content[0]);

        env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(content[0]));
        env.getCheckpointConfig().setCheckpointTimeout(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // Simple test source that emits a tuple every 800 ms.
        env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("xxxx", System.currentTimeMillis()));
                    Thread.sleep(800);
                }
            }

            @Override
            public void cancel() {
            }
        }).print();

        env.execute();
    }
}
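As the comment above explains, changing the CheckpointConfig after execute() does not work. A pattern that does pick up Nacos changes at runtime is to read the value inside a RichSourceFunction, the same idea the Apollo example in the next section uses. Below is a minimal sketch under that assumption; the class name FlinkNacosRuntimeConfigSketch is hypothetical, and it reuses only the serverAddr, dataId and group values and the Nacos client calls from the example above.

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.Properties;
import java.util.concurrent.Executor;

public class FlinkNacosRuntimeConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new RichSourceFunction<String>() {

            private transient ConfigService configService;
            private volatile String content;
            private volatile boolean running = true;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // open() runs on the TaskManager, so the Nacos client lives next to the running task.
                Properties properties = new Properties();
                properties.put("serverAddr", "localhost");
                configService = NacosFactory.createConfigService(properties);
                content = configService.getConfig("test", "DEFAULT_GROUP", 5000);
                // Refresh the cached value whenever the config changes in Nacos.
                configService.addListener("test", "DEFAULT_GROUP", new Listener() {
                    @Override
                    public Executor getExecutor() {
                        return null;
                    }

                    @Override
                    public void receiveConfigInfo(String configInfo) {
                        content = configInfo;
                    }
                });
            }

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit the currently cached config value once a second.
                while (running) {
                    ctx.collect(content);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).print();

        env.execute("flink nacos runtime config sketch");
    }
}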
Integrating Apollo with Flink to dynamically update the job
1: pom dependency
<dependency>
    <groupId>com.ctrip.framework.apollo</groupId>
    <artifactId>apollo-client</artifactId>
    <version>1.5.1</version>
</dependency>
2: Flink + Apollo integration code
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

@Slf4j
public class FlinkApolloTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
        env.setParallelism(1);

        env.addSource(new RichSourceFunction<String>() {

            private Config config;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // The Apollo Config is obtained in open(), i.e. on the TaskManager at runtime,
                // so property reads in run() always see the latest value.
                config = ConfigService.getAppConfig();
                config.addChangeListener(new ConfigChangeListener() {
                    @Override
                    public void onChange(ConfigChangeEvent configChangeEvent) {
                        // Log every key that changed when the configuration is updated in Apollo.
                        for (String key : configChangeEvent.changedKeys()) {
                            ConfigChange change = configChangeEvent.getChange(key);
                            log.info("Change - key: {}, oldValue: {}, newValue: {}, changeType: {}",
                                    change.getPropertyName(), change.getOldValue(),
                                    change.getNewValue(), change.getChangeType());
                        }
                    }
                });
            }

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                // Emit the current value of the "name" property (default "zs") every 3 seconds.
                while (true) {
                    ctx.collect(config.getProperty("name", "zs"));
                    Thread.sleep(3000);
                }
            }

            @Override
            public void cancel() {
            }
        }).print();

        env.execute("flink Apollo");
    }
}
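A short usage note, not covered by the code above: the Apollo client has to know the application id and the address of the Apollo meta/config service before ConfigService.getAppConfig() returns anything useful. One common way to supply them, assuming a local Apollo config service on its default port (YOUR-APP-ID is a placeholder), is:

# src/main/resources/META-INF/app.properties
app.id=YOUR-APP-ID

# JVM option when launching the job (IDE run configuration or flink run)
-Dapollo.meta=http://localhost:8080

With that in place, changing the "name" property in the Apollo portal should show up in the printed output within a few seconds, because the value is read inside run() on the TaskManager instead of being fixed when the job graph is built.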