开发者社区> 问答> 正文

[flink] 请求一个flink实时计算的代码例子

已解决

flink实时流中消息队列作为source,需求是每来一条消息数据,只筛选两个字段(一个叫id,另一个叫time)出来作为后面计算用,然后keyBy字段id,再对 每个id 求汇总从历史到现在的time的sum。请问代码怎么实现?

展开
收起
太初123 2018-11-20 19:33:16 4997 0
2 条回答
写回答
取消 提交回答
  • 采纳回答

    如果使用DataStream API可参见flink的WordCount(org.apache.flink.streaming.examples.wordcount.WordCount)示例代码:

    public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************
    
    public static void main(String[] args) throws Exception {
    
        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
    
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);
    
        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }
    
        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0).sum(1);
    
        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    
        // execute program
        env.execute("Streaming WordCount");
    }
    
    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************
    
    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
    
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
    

    }

    2019-07-17 23:15:36
    赞同 展开评论 打赏
  • 多谢!

    2019-07-17 23:15:36
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载