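In a Flink real-time stream with a message queue as the source, the requirement is: for every incoming message, select only two fields (one called id, the other called time) for later computation, then keyBy the id field, and for each id compute the running sum of time from the beginning of the stream up to now. How can this be implemented in code?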
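If you use the DataStream API, see Flink's WordCount example (org.apache.flink.streaming.examples.wordcount.WordCount). Its keyBy followed by sum is a rolling aggregation over keyed state: for every incoming record it emits the updated running total for that record's key, which is the same pattern as summing time per id. The example code: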
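import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCount {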
// *************************************************************************
// PROGRAM
// *************************************************************************
public static void main(String[] args) throws Exception {
    // Checking input parameters
    final ParameterTool params = ParameterTool.fromArgs(args);
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);
    // get input data
    DataStream<String> text;
    if (params.has("input")) {
        // read the text file from given input path
        text = env.readTextFile(params.get("input"));
    } else {
        System.out.println("Executing WordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements(WordCountData.WORDS);
    }
    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
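        // group by the word (tuple field f0) and keep a running sum of tuple field 1;
        // a key selector replaces the index-based keyBy(0), which is deprecated in newer Flink versions
        .keyBy(value -> value.f0).sum(1);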
    // emit result
    if (params.has("output")) {
        counts.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }
    // execute program
    env.execute("Streaming WordCount");
}
// *************************************************************************
// USER FUNCTIONS
// *************************************************************************
/**
 * Implements the string tokenizer that splits sentences into words as a
 * user-defined FlatMapFunction. The function takes a line (String) and
 * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
 * Integer>}).
 */
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");
        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
}
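To relate this back to the question, here is a minimal plain-Java sketch of what "select id and time, keyBy id, running sum of time" computes per message. It is not Flink code: the class RunningSumSketch, the Message type, and its payload field are hypothetical names made up for illustration, and the HashMap only stands in for the per-key state that Flink manages after keyBy.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of map -> keyBy(id) -> sum(time); illustrative only, not Flink code. */
final class RunningSumSketch {

    /** Hypothetical message shape: only id and time are used downstream. */
    static final class Message {
        final String id;
        final long time;
        final String payload; // stands for all the fields that get dropped

        Message(String id, long time, String payload) {
            this.id = id;
            this.time = time;
            this.payload = payload;
        }
    }

    /** Running total per id; plays the role of Flink's keyed state. */
    private final Map<String, Long> sumById = new HashMap<>();

    /** Handles one message: keeps only (id, time) and returns the updated sum for that id. */
    long process(Message msg) {
        // merge() inserts the value for a new id, or adds it to the existing total
        return sumById.merge(msg.id, msg.time, Long::sum);
    }

    public static void main(String[] args) {
        RunningSumSketch job = new RunningSumSketch();
        Message[] input = {
            new Message("a", 3L, "x"),
            new Message("b", 5L, "y"),
            new Message("a", 4L, "z"),
        };
        for (Message m : input) {
            // one output per input record, like a rolling aggregation
            System.out.println(m.id + " -> " + job.process(m));
        }
        // prints: a -> 3, b -> 5, a -> 7 (one per line)
    }
}
```

In an actual Flink job the same shape would look roughly like `messages.map(m -> Tuple2.of(m.getId(), m.getTime())).returns(Types.TUPLE(Types.STRING, Types.LONG)).keyBy(t -> t.f0).sum(1)`, where `messages`, `getId()` and `getTime()` are assumed names for the parsed source stream and its accessors. Flink then keeps the per-id total in keyed state, so no explicit map is needed.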