开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

实时计算Flink如何将字符转换为大写字母函数

已解决

实时计算Flink如何将字符转换为大写字母函数

展开
收起
阿里云服务支持 2022-12-12 10:22:34 213 0
2 条回答
写回答
取消 提交回答
  • 官方回答

    Flink将字符转换为大写字母,需使用 UPPER函数,详情请参考官方文档

    2022-12-12 10:28:01
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    方式一:直接调用原生的map算子

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    //方式一:调用原生自带的map算子 SingleOutputStreamOperator wordSource = socketTextStream.map(word -> word.toUpperCase());

    wordSource.print(); env.execute(); 方式二:调用底层的transform算子重定义实现

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888); //方式二: SingleOutputStreamOperator wordSource = socketTextStream.transform("MyMap", TypeInformation.of(String.class), new StreamMap<>(String::toUpperCase));

    wordSource.print(); env.execute(); 方式三:继承实现类自定义实现

    /** * 类似于StreamMap操作 */ static class MyStreamMap extends AbstractStreamOperator implements OneInputStreamOperator<String, String> { @Override public void processElement(StreamRecord element) throws Exception { String elementValue = element.getValue(); output.collect(element.replace(elementValue.toUpperCase())); } }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    SingleOutputStreamOperator wordSource = socketTextStream.transform("MyStreamMap", TypeInformation.of(String.class), new MyStreamMap());

    wordSource.print(); env.execute(); 方式四:实现RichMapFunction类

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    SingleOutputStreamOperator wordSource = socketTextStream.map(new RichMapFunction<String, String>() { /** * 在构造对象之后,执行map方法之前执行一次 * 通常用于初始化工作,例如连接创建等 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); }

            /**
             * 在关闭subtask之前执行一次,例如做一些释放资源的工作
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                super.close();
            }
    
            @Override
            public String map(String s) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
    
                return indexOfThisSubtask + ":" + s.toUpperCase();
            }
        });
    

    wordSource.print(); env.execute();

    2022-12-12 10:27:06
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载