实时计算Flink如何将字符转换为大写字母函数

已解决

实时计算Flink如何将字符转换为大写字母函数

展开
收起
阿里云服务支持 2022-12-12 10:22:34 266 分享 版权
2 条回答
写回答
取消 提交回答
  • 官方回答

    Flink将字符转换为大写字母,需使用 UPPER函数,详情请参考官方文档

    2022-12-12 10:28:01
    赞同 展开评论
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    方式一:直接调用原生的map算子

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    //方式一:调用原生自带的map算子 SingleOutputStreamOperator wordSource = socketTextStream.map(word -> word.toUpperCase());

    wordSource.print(); env.execute(); 方式二:调用底层的transform算子重定义实现

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888); //方式二: SingleOutputStreamOperator wordSource = socketTextStream.transform("MyMap", TypeInformation.of(String.class), new StreamMap<>(String::toUpperCase));

    wordSource.print(); env.execute(); 方式三:继承实现类自定义实现

    /** * 类似于StreamMap操作 */ static class MyStreamMap extends AbstractStreamOperator implements OneInputStreamOperator<String, String> { @Override public void processElement(StreamRecord element) throws Exception { String elementValue = element.getValue(); output.collect(element.replace(elementValue.toUpperCase())); } }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    SingleOutputStreamOperator wordSource = socketTextStream.transform("MyStreamMap", TypeInformation.of(String.class), new MyStreamMap());

    wordSource.print(); env.execute(); 方式四:实现RichMapFunction类

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);

    SingleOutputStreamOperator wordSource = socketTextStream.map(new RichMapFunction<String, String>() { /** * 在构造对象之后,执行map方法之前执行一次 * 通常用于初始化工作,例如连接创建等 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); }

            /**
             * 在关闭subtask之前执行一次,例如做一些释放资源的工作
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                super.close();
            }
    
            @Override
            public String map(String s) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
    
                return indexOfThisSubtask + ":" + s.toUpperCase();
            }
        });
    

    wordSource.print(); env.execute();

    2022-12-12 10:27:06
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

收录在圈子:
作为全球云计算的领先者,阿里云为全球230万企业提供着云计算服务,服务范围覆盖200多个国家和地区。我们致力于为企业、政府等组织机构提供安全可靠的云计算服务,给用户带来极速愉悦的服务体验。
还有其他疑问?
咨询AI助理