[flink] Request for a Flink real-time computation code example

太初123 2018-11-20 19:33:16 1363

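In a Flink real-time stream, a message queue is the source. For every incoming message, I only need to extract two fields (one called id, the other called time) for the downstream computation. Then I want to keyBy the id field and, for each id, compute the sum of time over all messages from the beginning of history up to now. How can this be implemented in code?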
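To pin down what "the sum of time from history to now" per id means: each message updates a running total for its id, and the updated total is emitted once per message. The plain-Java model below (no Flink involved) shows that behavior. It assumes, hypothetically, that each message is a text line whose first two comma-separated fields are id and time; the class `RunningSumPerId` and method `onMessage` are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Flink) of the requested semantics: for every message,
// keep only id and time, then emit the running total of time for that id.
class RunningSumPerId {

    // Per-id running totals; in a Flink job this role is played by keyed state.
    private final Map<String, Long> totals = new HashMap<>();

    // Processes one message in the assumed (hypothetical) "id,time" format
    // and returns the updated total for that id as "id,total".
    String onMessage(String message) {
        String[] fields = message.split(",");
        String id = fields[0].trim();
        long time = Long.parseLong(fields[1].trim());
        long total = totals.merge(id, time, Long::sum); // add to the previous total, or start at time
        return id + "," + total;
    }

    public static void main(String[] args) {
        RunningSumPerId model = new RunningSumPerId();
        List<String> out = new ArrayList<>();
        for (String msg : new String[] {"a,1", "b,2", "a,3"}) {
            out.add(model.onMessage(msg));
        }
        System.out.println(out); // prints [a,1, b,2, a,4]
    }
}
```

This per-key running total, updated and emitted on every element, is what Flink's rolling aggregation `keyBy(...).sum(...)` computes.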

Message middleware, Stream computing
All answers (2)
  • 倪完
    2019-07-17 23:15:36
    Accepted

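    If you are using the DataStream API, see Flink's WordCount example (org.apache.flink.streaming.examples.wordcount.WordCount), shown below. Its pipeline has the same shape as your requirement: swap the text source for your message-queue source, make the user function emit (id, time) instead of (word, 1), then keyBy the id field and sum the time field.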

    package org.apache.flink.streaming.examples.wordcount;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
    import org.apache.flink.util.Collector;

    public class WordCount {

        // *************************************************************************
        // PROGRAM
        // *************************************************************************

        public static void main(String[] args) throws Exception {

            // parse command-line parameters (--input, --output)
            final ParameterTool params = ParameterTool.fromArgs(args);

            // set up the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);

            // get input data
            DataStream<String> text;
            if (params.has("input")) {
                // read the text file from the given input path
                text = env.readTextFile(params.get("input"));
            } else {
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                // get default test text data
                text = env.fromElements(WordCountData.WORDS);
            }

            DataStream<Tuple2<String, Integer>> counts =
                // split up the lines into pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by tuple field 0 (the word) and sum up tuple field 1 (the count);
                // positional keyBy(int) is deprecated in later Flink 1.x releases in favor of a KeySelector
                .keyBy(0).sum(1);

            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }

            // execute program
            env.execute("Streaming WordCount");
        }

        // *************************************************************************
        // USER FUNCTIONS
        // *************************************************************************

        /**
         * Implements the string tokenizer that splits sentences into words as a
         * user-defined FlatMapFunction. The function takes a line (String) and
         * splits it into multiple pairs in the form of "(word,1)"
         * ({@code Tuple2<String, Integer>}).
         */
        public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");

                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
    }
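Adapting the WordCount pattern above to the question gives roughly the following job. It is a minimal sketch assuming the Flink 1.x DataStream API and a hypothetical "id,time" text message format. `env.fromElements(...)` is only a stand-in for the real message-queue source (for Kafka, Flink's Kafka connector would supply it), and the class names `IdTimeSum`, `ParseIdTime` and `IdKey` are hypothetical. A `map` replaces the Tokenizer and keeps only (id, time), a `KeySelector` keys the stream by id, and `sum(1)` keeps a rolling total of time per id.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch (Flink 1.x DataStream API): running sum of "time" per "id".
class IdTimeSum {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source (assumption): replace with the message queue's Flink
        // connector, producing one String per message.
        DataStream<String> messages = env.fromElements("a,1", "b,2", "a,3");

        DataStream<Tuple2<String, Long>> sums = messages
            .map(new ParseIdTime()) // keep only (id, time)
            .keyBy(new IdKey())     // partition the stream by id
            .sum(1);                // rolling sum of field 1 (time) per id

        sums.print();
        env.execute("Running time sum per id");
    }

    // Extracts (id, time) from a message in the hypothetical "id,time" format.
    public static final class ParseIdTime implements MapFunction<String, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> map(String value) {
            String[] fields = value.split(",");
            return new Tuple2<>(fields[0].trim(), Long.parseLong(fields[1].trim()));
        }
    }

    // Uses the id field (f0) as the key.
    public static final class IdKey implements KeySelector<Tuple2<String, Long>, String> {
        @Override
        public String getKey(Tuple2<String, Long> value) {
            return value.f0;
        }
    }
}
```

Each incoming message produces one updated (id, total) record; for id a, the inputs a,1 and a,3 yield (a,1) and then (a,4). The totals live in Flink's keyed state, so they cover everything this job has consumed since it started, and they survive failures or restarts only if checkpointing is enabled or the job is restored from a savepoint.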

  • 太初123
    2019-07-17 23:15:36

    Thanks a lot!
