Flink Window 排序

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: ## 概述 - 对增量Window进行输出排序 - WordCount增量(按单词名称排序) - WordCount增量(按单词个数,再单词名称排序)

Flink Window 排序

概述

  • 对增量Window进行输出排序
  • WordCount增量(按单词名称排序)
  • WordCount增量(按单词个数,再单词名称排序)

源码

源码分析

WordCount 程序(增量按单词升序排序)

  • DataStream.windowAll 说明是window中的所有Key返回AllWindowedStream
  • AllWindowedStream.process(ProcessAllWindowFunction),ProcessAllWindowFunction数定义整个Window的所有数据传过来,进行处理
    可以进行按key合并,按单词排序,按单词个数排序
  • BucketingSink指定文件输出目录

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
  * nc -lk 1234  输入数据
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {


  def getConfiguration(isDebug:Boolean = false):Configuration={

    val configuration : Configuration = new Configuration()

    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }


    configuration
  }

  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = getConfiguration(true)

    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)






    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')



    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      //将当前window中所有的行记录,发送过来ProcessAllWindowFunction函数中去处理(可以排序,可以对相同key进行处理)
      //缺点,window中数据量大时,就容易内存溢出
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

      .process(new ProcessAllWindowFunction[WordWithCount,WordWithCount,TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          val set = new mutable.HashSet[WordWithCount]{}


          for(wordCount <- elements){
            if(set.contains(wordCount)){
              set.remove(wordCount)
              set.add(new WordWithCount(wordCount.word,wordCount.count + 1))
            }else{
              set.add(wordCount)
            }
          }

          val sortSet = set.toList.sortWith( (a,b) => a.word.compareTo(b.word)  < 0 )

          for(wordCount <- sortSet)  out.collect(wordCount)
        }

      })




      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)




    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")


    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //bucketingSink.setBatchSize(100 ) // this is 400 MB,
    bucketingSink.setBatchSize(1024 * 1024 * 400 ) // this is 400 MB,
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    bucketingSink.setBatchRolloverInterval( 2 * 1000); // this is 20 mins
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    //每间隔多久时间,往Sink中写数据,不是每天条数据就写,浪费资源
    bucketingSink.setInactiveBucketCheckInterval(2 * 1000)
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)


    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)




    if(args == null || args.size ==0){
      env.execute("默认作业")

      //执行计划
      //println(env.getExecutionPlan)
      //StreamGraph
     //println(env.getStreamGraph.getStreamingPlanAsJSON)



      //JsonPlanGenerator.generatePlan(jobGraph)

    }else{
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

/*  abstract private   class OrderWindowFunction extends ProcessWindowFunction<WordWithCount,WordWithCount,WordWithCount,TimeWindow> {

  }*/
}


输入数据

1 2 1 5 3

输出数据

WordWithCount(1,2)
WordWithCount(2,1)
WordWithCount(3,1)
WordWithCount(5,1)

WordCount 程序(增量,按单词个数排序,个数相同,再按单词排序)

  • DataStream.windowAll 说明是window中的所有Key返回AllWindowedStream
  • AllWindowedStream.process(ProcessAllWindowFunction),ProcessAllWindowFunction数定义整个Window的所有数据传过来,进行处理
    可以进行按key合并,按单词排序,按单词个数排序
  • BucketingSink指定文件输出目录
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
  * nc -lk 1234  输入数据
  */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSortedByCount {


  def getConfiguration(isDebug:Boolean = false):Configuration={

    val configuration : Configuration = new Configuration()

    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }


    configuration
  }

  def main(args: Array[String]): Unit = {


    val port = 1234
    // get the execution environment
   // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val configuration : Configuration = getConfiguration(true)

    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)






    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')



    import org.apache.flink.streaming.api.scala._
    val dataStreamDeal = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      .keyBy("word")
      //将当前window中所有的行记录,发送过来ProcessAllWindowFunction函数中去处理(可以排序,可以对相同key进行处理)
      //缺点,window中数据量大时,就容易内存溢出
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

      .process(new ProcessAllWindowFunction[WordWithCount,WordWithCount,TimeWindow] {
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          val set = new mutable.HashSet[WordWithCount]{}


          for(wordCount <- elements){
            if(set.contains(wordCount)){
              set.remove(wordCount)
              set.add(new WordWithCount(wordCount.word,wordCount.count + 1))
            }else{
              set.add(wordCount)
            }
          }

          val sortSet = set.toList.sortWith( (a,b) => {


            if(a.count == b.count){
              a.word.compareTo(b.word) < 0
            }else{
              a.count < b.count
            }

          } )

          for(wordCount <- sortSet)  out.collect(wordCount)
        }

      })




      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)




    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")


    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //bucketingSink.setBatchSize(100 ) // this is 400 MB,
    bucketingSink.setBatchSize(1024 * 1024 * 400 ) // this is 400 MB,
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    //bucketingSink.setBatchRolloverInterval( 2 * 1000); // this is 20 mins
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    //每间隔多久时间,往Sink中写数据,不是每天条数据就写,浪费资源
    bucketingSink.setBatchRolloverInterval(2 * 1000)
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    //bucketingSink.setAsyncTimeout(1 * 1000)


    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)




    if(args == null || args.size ==0){
      env.execute("默认作业")

      //执行计划
      //println(env.getExecutionPlan)
      //StreamGraph
     //println(env.getStreamGraph.getStreamingPlanAsJSON)



      //JsonPlanGenerator.generatePlan(jobGraph)

    }else{
      env.execute(args(0))
    }

    println("结束")

  }


  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

/*  abstract private   class OrderWindowFunction extends ProcessWindowFunction<WordWithCount,WordWithCount,WordWithCount,TimeWindow> {

  }*/
}


输入数据

1 1 2 4 4 3 2 1

输出数据


WordWithCount(3,1)
WordWithCount(2,2)
WordWithCount(4,2)
WordWithCount(1,3)
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10天前
|
数据安全/隐私保护 流计算
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
Flink的Interval Join是基于水印(Watermark)和时间窗口(Time Window)实现的
118 2
|
BI API 数据处理
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。
543 0
带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)
|
存储 Java Apache
Flink Window 、Time(二)| 学习笔记
快速学习 Flink Window 、Time 。
126 0
|
BI API 数据处理
【Flink】(四)详解 Flink 中的窗口(Window)
【Flink】(四)详解 Flink 中的窗口(Window)
762 0
【Flink】(四)详解 Flink 中的窗口(Window)
|
API 流计算 Windows
关于Flink框架窗口(window)函数最全解析
在真实的场景中数据流往往都是没有界限的,无休止的,就像是一个通道中水流持续不断地通过管道流向别处,这样显然是无法进行处理、计算的,那如何可以将没有界限的数据进行处理呢?我们可以将这些无界限的数据流进行切割、拆分,将其得到一个有界限的数据集合然后进行处理、计算就方便多了。Flink中窗口(Window)就是来处理无界限的数据流的,将无线的数据流切割成为有限流,然后将切割后的有限流数据分发到指定有限大小的桶中进行分析计算。
关于Flink框架窗口(window)函数最全解析
|
存储 缓存 数据处理
|
存储 分布式计算 测试技术
彻底搞清Flink中的Window
在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
442 0
彻底搞清Flink中的Window
|
SQL 消息中间件 监控