
How to count the day's total for a certain message every five minutes

Hi everyone! Could anyone advise how to use the Stream API to count, every five minutes, the total number of a certain message for the current day? Thanks! *From the volunteer-curated Flink mailing-list archive

毛毛虫雨 2021-12-07 13:58:18
1 answer
  • I have some thoughts on this problem:

    1) Flink streaming processes data incrementally by default, analyzing the data that falls within a given time interval or count (a minimal sketch follows this list).
    2) You can implement a custom ProcessAllWindowFunction: for one window's data you write the processing logic yourself, and its input is what the operators upstream of the window have already produced.
    3) In your case, you need to store each computed result and update it in an external store (such as Redis) for the next computation to use; see the sink sketch after the example below.
    4) Below is a ProcessAllWindowFunction example for reference. (What it implements: a WordCount program that incrementally sorts by word in ascending order.)
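    Before the archived example, here is a minimal sketch of the most direct reading of the question: keep one day-long processing-time window per message type, and let a trigger fire every five minutes so the running total for the current day is emitted repeatedly. The socket source, the message format (one type per line), and the Asia/Shanghai day alignment are assumptions for illustration, not part of the original answer.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow

    object FiveMinuteDailyCount {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Assumed source: one message type per line, e.g. fed by nc -lk 1234
        val messages = env.socketTextStream("localhost", 1234)

        messages
          .map(msgType => (msgType, 1L))
          .keyBy(_._1)
          // One window per calendar day; the -8h offset aligns the window
          // boundary to midnight Asia/Shanghai (UTC+8)
          .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
          // Fire every five minutes without purging, so each firing emits
          // the running total for the day so far
          .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.minutes(5)))
          .sum(1)
          .print()

        env.execute("five-minute daily running count")
      }
    }

    Because ContinuousProcessingTimeTrigger fires without purging the window state, every five-minute output already contains the full total for the day so far, and Flink discards the state when the daily window closes.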

    package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

    import java.time.ZoneId

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
    import org.apache.flink.util.Collector

    import scala.collection.mutable

    /**
     * Feed input data with: nc -lk 1234
     */
    object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {

      def getConfiguration(isDebug: Boolean = false): Configuration = {

        val configuration: Configuration = new Configuration()

        // Relax Akka and heartbeat timeouts so local breakpoint debugging
        // does not kill the job
        if (isDebug) {
          val timeout = "100000 s"
          val timeoutHeartbeatPause = "1000000 s"
          configuration.setString("akka.ask.timeout", timeout)
          configuration.setString("akka.lookup.timeout", timeout)
          configuration.setString("akka.tcp.timeout", timeout)
          configuration.setString("akka.transport.heartbeat.interval", timeout)
          configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
          configuration.setString("akka.watch.heartbeat.pause", timeout)
          configuration.setInteger("heartbeat.interval", 10000000)
          configuration.setInteger("heartbeat.timeout", 50000000)
        }

        configuration
      }

      def main(args: Array[String]): Unit = {

        val port = 1234
        // get the execution environment
        // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val configuration: Configuration = getConfiguration(true)
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

        // get input data by connecting to the socket
        val dataStream = env.socketTextStream("localhost", port, '\n')

        import org.apache.flink.streaming.api.scala._
        val dataStreamDeal = dataStream
          .flatMap(w => w.split("\\s"))
          .map(w => WordWithCount(w, 1))
          .keyBy("word")
          // Send all records of the current window to the ProcessAllWindowFunction
          // (it can sort them and merge records with the same key).
          // Drawback: with a large amount of data in the window, this can
          // easily run out of memory.
          .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {
            override def process(context: Context, elements: Iterable[WordWithCount],
                                 out: Collector[WordWithCount]): Unit = {
              val set = new mutable.HashSet[WordWithCount]()

              for (wordCount <- elements) {
                if (set.contains(wordCount)) {
                  set.remove(wordCount)
                  set.add(new WordWithCount(wordCount.word, wordCount.count + 1))
                } else {
                  set.add(wordCount)
                }
              }

              // Sort by word, ascending
              val sortSet = set.toList.sortWith((a, b) => a.word.compareTo(b.word) < 0)

              for (wordCount <- sortSet) out.collect(wordCount)
            }
          })
          //.countWindow(3)
          //.countWindow(3, 1)
          //.countWindowAll(3)

        //textResult.print().setParallelism(1)

        // The base path was lost in the archive; "/tmp/flink/wordcount" is a placeholder
        val bucketingSink = new BucketingSink[WordWithCount]("/tmp/flink/wordcount")
        bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
        //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
        //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
        //bucketingSink.setBatchSize(100)
        bucketingSink.setBatchSize(1024 * 1024 * 400) // 400 MB
        //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // 20 minutes
        bucketingSink.setBatchRolloverInterval(2 * 1000) // 2 seconds
        //setInactiveBucketCheckInterval
        //setInactiveBucketThreshold
        // Controls how often data is written to the sink; writing on every
        // single record would waste resources.
        bucketingSink.setInactiveBucketThreshold(2 * 1000)
        bucketingSink.setAsyncTimeout(1 * 1000)

        dataStreamDeal.setParallelism(1)
          .addSink(bucketingSink)

        if (args == null || args.size == 0) {
          env.execute("default job")

          //execution plan
          //println(env.getExecutionPlan)
          //StreamGraph
          //println(env.getStreamGraph.getStreamingPlanAsJSON)
          //JsonPlanGenerator.generatePlan(jobGraph)
        } else {
          env.execute(args(0))
        }

        println("done")
      }

      // Data type for words with count
      case class WordWithCount(word: String, count: Long)

      /*
      abstract private class OrderWindowFunction
        extends ProcessWindowFunction[WordWithCount, WordWithCount, WordWithCount, TimeWindow] {
      }
      */
    }

    *From the volunteer-curated Flink mailing-list archive
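    Picking up idea 3) above: with plain five-minute tumbling windows, each window emits only its own count, so the day's running total has to be accumulated in an external store. Below is a minimal, hypothetical sketch of such a sink using Jedis; the class name, Redis key layout, and connection parameters are assumptions, not code from the answer.

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import redis.clients.jedis.Jedis

    import java.time.{LocalDate, ZoneId}

    // Accumulates each five-minute window count into a per-day Redis counter,
    // e.g. INCRBY count:orderCreated:2021-12-07 42
    class RedisDailyCountSink(host: String, port: Int)
      extends RichSinkFunction[(String, Long)] {

      @transient private var jedis: Jedis = _

      override def open(parameters: Configuration): Unit = {
        jedis = new Jedis(host, port)
      }

      override def invoke(value: (String, Long)): Unit = {
        val (msgType, windowCount) = value
        val day = LocalDate.now(ZoneId.of("Asia/Shanghai"))
        // Atomic add, so the key always holds the day's total so far
        jedis.incrBy(s"count:$msgType:$day", windowCount)
      }

      override def close(): Unit = {
        if (jedis != null) jedis.close()
      }
    }

    A pipeline would attach it with something like .window(TumblingProcessingTimeWindows.of(Time.minutes(5))), .sum(1), and .addSink(new RedisDailyCountSink("localhost", 6379)); reading the key at any moment then yields the day's total so far.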

    2021-12-07 15:26:59