开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

咨询一下Flink,RichFunction函数里的open方法的参数 怎么传递?

咨询一下Flink,RichFunction函数里的open方法的参数Configuration, 怎么传递?
void open(Configuration parameters)
这么写也传递不过去
Configuration conf = new Configuration();
conf.setString("name", "wsy");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

展开
收起
真的很搞笑 2024-03-11 13:56:50 192 0
3 条回答
写回答
取消 提交回答
  • 5f849ad5c68a907d0e68a61ed2bf23fb.png
    在open()时, 传递进去的是一个空Configuration, 这样是拿不到的ff5d3c8bc77c1e58b5dcb00e5192f218.png
    放到全局参数中去拿 ,此回答整理自钉群“【②群】Apache Flink China社区”

    2024-03-11 14:50:53
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    在Flink中,open()方法的参数Configuration是通过RichFunction的子类构造函数传递的。您可以通过以下方式传递Configuration对象:

    public class MyRichFunction extends RichFunction {
        private Configuration conf;
    
        public MyRichFunction(Configuration conf) {
            this.conf = conf;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // 在这里可以使用conf对象进行配置操作
        }
    
        // 其他方法...
    }
    

    然后,在创建StreamExecutionEnvironment时,将Configuration对象传递给MyRichFunction的构造函数:

    Configuration conf = new Configuration();
    conf.setString("name", "wsy");
    
    MyRichFunction myRichFunction = new MyRichFunction(conf);
    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    

    这样,您就可以在open()方法中使用传递的Configuration对象进行配置操作了。

    2024-03-11 14:39:17
    赞同 展开评论 打赏
  • 在 Flink 中,open(Configuration parameters) 方法的 parameters 参数是由 Flink 系统自动填充的,用于传递给用户自定义函数运行时的配置信息。如果你想要自定义并传递额外的配置,应该通过 StreamExecutionEnvironment 设置全局配置或者通过 RuntimeContext 在运行时获取。

       // 设置全局配置(这些配置会被分发到各个 TaskManager 上,不一定能直接传递给 open 方法)
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.getConfig().setGlobalJobParameters(new Configuration());
       // 添加自定义参数
       env.getConfig().getConfiguration().setString("name", "wsy");
    
       // 在 RichFunction 中获取配置
       public class MyRichMapFunction extends RichMapFunction<String, String> {
           private transient String name;
    
           @Override
           public void open(Configuration parameters) throws Exception {
               super.open(parameters);
               // 从 RuntimeContext 获取全局配置
               Configuration globalConfig = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
               name = globalConfig.getString("name");
           }
    
           // ...
       }
    
    2024-03-11 14:26:43
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载