Flink从入门到入土(下)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink从入门到入土(下)

3.7 connect


在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。


Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。


image.png


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_Connect {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    // 3.转换成样例类
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    // 4. 从集合中再读取一条流
    val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))
    val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS)
    // coMap表示连接流调用的map,各自都需要一个 function
    resultCS.map(
      sensor=>sensor.id,
      num=>num+1
    ).print()
    // 4. 执行
    env.execute()
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


3.8 union


对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream


image.png


connect与 union 区别:


  1. union之前两个流的类型必须是一样,connect可以不一样


  1. connect只能操作两个流,union可以操作多个。


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:FlatMap
 */
object Transform_Union {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2. 从集合中读取流
    val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4))
    val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10))
    val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110))
    // TODO union 真正将多条流合并成一条流
    // 合并的流,类型必须一致
    // 可以合并多条流,只要类型一致
    num1DS.union(num2DS).union(num3DS)
      .print()
    // 4. 执行
    env.execute()
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


3.9 Operator


Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析


3.9.1 滚动聚合算子(Rolling Aggregation)


这些算子可以针对KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream


sum()


image.png


3.9.2 reduce


一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。


image.png


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:Reduce
 */
object Transform_Reduce {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    // 3.转换成样例类
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    // 输入的类型一样,输出类型和输出类型也要一样
    // 组内的第一条数据,不进入reduce计算
    val reduceDS: DataStream[WaterSensor] = sensorKS.reduce(
      (ws1, ws2) => {
        println(ws1 + "<===>" + ws2)
        WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc)
      }
    )
    reduceDS.print("reduce")
    // 4. 执行
    env.execute()
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


3.9.3process


Flink在数据流通过keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现 1)继承KeyedProcessFunction抽象类,并定义泛型:


[KEY, IN, OUT]


class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{}


  1. 重写方法


// 自定义KeyedProcessFunction,是一个特殊的富函数
  // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型
  // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑
/**
      * 处理逻辑:来一条处理一条
      *
      * @param value 一条数据
      * @param ctx   上下文对象
      * @param out   采集器:收集数据,并输出
      */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value)
      // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple
//      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple
    }


完整代码:



import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
 * description: SourceList
 * date: 2020/8/28 19:02
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:Reduce
 */
object Transform_Process {
  def main(args: Array[String]): Unit = {
    // 1.创建执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 2.读取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    // 3.转换成样例类
    val mapDS: DataStream[WaterSensor] = sensorDS.map(
      lines => {
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    //按照ID  进行分组
    val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
    sensorKS.process(new MyKeyedProcessFunction)
    // 4. 执行
    env.execute()
  }
  // 自定义KeyedProcessFunction,是一个特殊的富函数
  // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型
  // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑
  class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] {
    /**
     * 处理逻辑:来一条处理一条
     *
     * @param value 一条数据
     * @param ctx   上下文对象
     * @param out   采集器:收集数据,并输出
     */
    override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
      out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value)
      // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple
      //      ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple
    }
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


4.Sink


image.png


Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作


之前我们一直在使用的print方法其实就是一种Sink。


@PublicEvolving
    public DataStreamSink<T> print(String sinkIdentifier) {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
        return this.addSink(printFunction).name("Print to Std. Out");
    }


官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink


image.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
传感器 分布式计算 Scala
Flink从入门到入土(中)
Flink从入门到入土(中)
Flink从入门到入土(中)
|
消息中间件 存储 分布式计算
Flink从入门到入土(上)
Flink从入门到入土(上)
Flink从入门到入土(上)
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
802 7
阿里云实时计算Flink在多行业的应用和实践
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
运维 搜索推荐 数据安全/隐私保护
阿里云实时计算Flink版测评报告
阿里云实时计算Flink版在用户行为分析与标签画像场景中表现出色,通过实时处理电商平台用户行为数据,生成用户兴趣偏好和标签,提升推荐系统效率。该服务具备高稳定性、低延迟、高吞吐量,支持按需计费,显著降低运维成本,提高开发效率。
32 1
|
14天前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
2月前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
56 15
|
13天前
|
运维 监控 Serverless
阿里云实时计算Flink版评测报告
阿里云实时计算Flink版是一款全托管的Serverless实时流处理服务,基于Apache Flink构建,提供企业级增值功能。本文从稳定性、性能、开发运维、安全性和成本效益等方面全面评测该产品,展示其在实时数据处理中的卓越表现和高投资回报率。
|
13天前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
26 0