方式一:直接调用原生的map算子
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);
//方式一:调用原生自带的map算子 SingleOutputStreamOperator wordSource = socketTextStream.map(word -> word.toUpperCase());
wordSource.print(); env.execute(); 方式二:调用底层的transform算子重定义实现
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888); //方式二: SingleOutputStreamOperator wordSource = socketTextStream.transform("MyMap", TypeInformation.of(String.class), new StreamMap<>(String::toUpperCase));
wordSource.print(); env.execute(); 方式三:继承实现类自定义实现
/** * 类似于StreamMap操作 */ static class MyStreamMap extends AbstractStreamOperator implements OneInputStreamOperator<String, String> { @Override public void processElement(StreamRecord element) throws Exception { String elementValue = element.getValue(); output.collect(element.replace(elementValue.toUpperCase())); } }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator wordSource = socketTextStream.transform("MyStreamMap", TypeInformation.of(String.class), new MyStreamMap());
wordSource.print(); env.execute(); 方式四:实现RichMapFunction类
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource socketTextStream = env.socketTextStream("localhost", 8888);
SingleOutputStreamOperator wordSource = socketTextStream.map(new RichMapFunction<String, String>() { /** * 在构造对象之后,执行map方法之前执行一次 * 通常用于初始化工作,例如连接创建等 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); }
/**
* 在关闭subtask之前执行一次,例如做一些释放资源的工作
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
}
@Override
public String map(String s) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
return indexOfThisSubtask + ":" + s.toUpperCase();
}
});
wordSource.print(); env.execute();
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。