Hi everyone! Could any of the experts here advise me: how can I use the Stream API to count, every five minutes, the total number of a certain message received so far that day? Thanks! *From the volunteer-organized Flink mailing list archive
Here are some thoughts on how to handle this:
1) By default, Flink Stream processes data incrementally, analyzing the data that arrives within a specified time interval or count.
2) You can implement a custom ProcessAllWindowFunction: for all the data in one window you write the processing logic yourself, and the elements passed in have already been processed by the operators upstream of the window.
3) For your case, you also need to persist the result of each computation and update it in an external store (such as Redis) so that every subsequent computation can read it back; a sketch of that bookkeeping follows the full example at the end of this post.
4) Right after this list is a minimal sketch of the five-minute daily count itself, and below that a ProcessAllWindowFunction example for reference (it implements a WordCount program that emits the incremental counts sorted by word in ascending order).
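Before the full example, here is a minimal, self-contained sketch of the five-minute daily count itself. The socket source, the filter predicate selecting the particular message the question asks about, and all names are placeholder assumptions for illustration, not part of the original thread. The idea: keep one tumbling processing-time window per day and attach a ContinuousProcessingTimeTrigger, so the running total of the current day is re-emitted every five minutes without any external store.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object DailyCountEveryFiveMinutes {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source: in practice this would be your Kafka / MQ / socket stream of messages
    val messages: DataStream[String] = env.socketTextStream("localhost", 1234)

    messages
      .filter(_.contains("TARGET"))            // placeholder predicate selecting the message to count
      .map(_ => ("target-message", 1L))        // one (tag, 1) record per matching message
      // one window per calendar day; the -8 h offset aligns the UTC-based window to UTC+8 midnight
      .windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      // re-fire the window every five minutes, so the running daily total is emitted repeatedly
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.minutes(5)))
      .reduce((a, b) => (a._1, a._2 + b._2))   // incremental sum, kept as window state
      .print()

    env.execute("daily count every five minutes")
  }
}

The trade-off versus the Redis idea in point 3 is that the running total lives only in window state: once the day's window closes it is gone, so if later jobs need the historical totals you still have to write each emitted value to an external store.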
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort
import java.time.ZoneId
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector
import scala.collection.mutable
/**
  * Feed input data with: nc -lk 1234
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {

  def getConfiguration(isDebug: Boolean = false): Configuration = {
    val configuration: Configuration = new Configuration()

    if (isDebug) {
      // Relax Akka / heartbeat timeouts so that debugging does not kill the local cluster
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }

    configuration
  }
  def main(args: Array[String]): Unit = {

    val port = 1234

    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val configuration: Configuration = getConfiguration(true)
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap(w => w.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      // Send all records of the current window to the ProcessAllWindowFunction below
      // (there they can be sorted, and records with the same key can be merged).
      // Drawback: when the window holds a large amount of data, this can easily run out of memory.
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[WordWithCount, WordWithCount, TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          // Merge the counts of identical words within this window
          val set = new mutable.HashSet[WordWithCount]()
          for (wordCount <- elements) {
            set.find(_.word == wordCount.word) match {
              case Some(existing) =>
                set.remove(existing)
                set.add(WordWithCount(existing.word, existing.count + wordCount.count))
              case None =>
                set.add(wordCount)
            }
          }

          // Emit the merged counts sorted by word in ascending order
          val sortSet = set.toList.sortWith((a, b) => a.word.compareTo(b.word) < 0)
          for (wordCount <- sortSet) out.collect(wordCount)
        }
      })
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)

    //textResult.print().setParallelism(1)

    // The output base path is a placeholder; point it at your HDFS or local directory
    val bucketingSink = new BucketingSink[WordWithCount]("/tmp/flink/wordcount")
    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(100)
    bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
    bucketingSink.setBatchRolloverInterval(2 * 1000) // 2 seconds
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    // Controls how often data is flushed to the sink; writing on every single record would waste resources
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)

    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)
    if (args == null || args.size == 0) {
      env.execute("default job")

      //execution plan
      //println(env.getExecutionPlan)
      //StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON)

      //JsonPlanGenerator.generatePlan(jobGraph)
    } else {
      env.execute(args(0))
    }

    println("done")
  }
  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  /*
  abstract private class OrderWindowFunction extends ProcessWindowFunction[WordWithCount, WordWithCount, WordWithCount, TimeWindow] {
  }
  */
}
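To make point 3 above concrete: if you instead use short windows (say five minutes) and keep the day's running total in an external store, each window's count can simply be added onto a per-day Redis key. The sketch below is a hypothetical variant; the Jedis client, the Redis address, and the key format are my assumptions, not part of the original post.

import java.time.LocalDate
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import redis.clients.jedis.Jedis

// Hypothetical sink: adds each five-minute window count onto a per-day Redis counter,
// so the value stored under today's key is always the running total for the current day.
class DailyTotalRedisSink extends RichSinkFunction[Long] {

  @transient private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("localhost", 6379) // placeholder Redis address
  }

  override def invoke(windowCount: Long): Unit = {
    val key = s"msg-count:${LocalDate.now()}" // one key per day, e.g. msg-count:2021-06-01
    jedis.incrBy(key, windowCount)            // INCRBY accumulates the daily total
  }

  override def close(): Unit = {
    if (jedis != null) jedis.close()
  }
}

Wired up as, for example, messages.filter(...).map(_ => 1L).windowAll(TumblingProcessingTimeWindows.of(Time.minutes(5))).reduce(_ + _).addSink(new DailyTotalRedisSink), anything that reads the Redis key sees a daily total that is at most five minutes behind.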