Flink从入门到入土(上)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink从入门到入土(上)

和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分

image.png

1.Environment

image.png

Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单

// 批处理环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 流式数据处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

2.Source

image.png

Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源.


2.1.从集合读取数据


一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从集合读取数据
 */
object SourceList {
  def main(args: Array[String]): Unit = {
      //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从集合中读取数据
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )
    //3.打印
    sensorDS.print()
    //4.执行
    env.execute("sensor")
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


2.2从文件中读取数据


通常情况下,我们会从存储介质中获取数据,比较常见的就是将日志文件作为数据源


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从文件读取数据
 */
object SourceFile {
  def main(args: Array[String]): Unit = {
    //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从指定路径获取数据
    val fileDS: DataStream[String] = env.readTextFile("input/data.log")
    //3.打印
    fileDS.print()
    //4.执行
    env.execute("sensor")
  }
}
/**
 * 在读取文件时,文件路径可以是目录也可以是单一文件。如果采用相对文件路径,会从当前系统参数user.dir中获取路径
 * System.getProperty("user.dir")
 */
/**
 * 如果在IDEA中执行代码,那么系统参数user.dir自动指向项目根目录,
 * 如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,
 * 也可以将路径设置为分布式文件系统路径,如HDFS
 val fileDS: DataStream[String] =
 env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
 */

image.png


如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,也可以将路径设置为分布式文件系统路径,如HDFS


val fileDS: DataStream[String] =
env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")


默认读取时,flink的依赖关系中是不包含Hadoop依赖关系的,所以执行上面代码时,会出现错误。


image.png


解决方法就是增加相关依赖jar包就可以了


image.png


2.3 kafka读取数据


Kafka作为消息传输队列,是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统。在现今企业级开发中,Kafka 和 Flink成为构建一个实时的数据处理系统的首选


2.3.1 引入kafka连接器的依赖


<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>


2.3.2 代码实现参考


import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从kafka读取数据
 */
object SourceKafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop02:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
    val kafkaDS: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](
        "sensor",
        new SimpleStringSchema(),
        properties)
    )
    kafkaDS.print()
    env.execute("sensor")
  }
}


2.4 自定义数据源


大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式


2.4.1  创建自定义数据源


import com.atyang.day01.Source.SourceList.WaterSensor
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
/**
 * description: ss 
 * date: 2020/8/28 20:36 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:自定义数据源
 */
class MySensorSource extends SourceFunction[WaterSensor] {
  var flg = true
  override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
    while ( flg ) {
      // 采集数据
      ctx.collect(
        WaterSensor(
          "sensor_" +new Random().nextInt(3),
          1577844001,
          new Random().nextInt(5)+40
        )
      )
      Thread.sleep(100)
    }
  }
  override def cancel(): Unit = {
    flg = false;
  }
}


image.png


3.Transform

image.png


在Spark中,算子分为转换算子和行动算子,转换算子的作用可以通过算子方法的调用将一个RDD转换另外一个RDD,Flink中也存在同样的操作,可以将一个数据流转换为其他的数据流。


转换过程中,数据流的类型也会发生变化,那么到底Flink支持什么样的数据类型呢,其实我们常用的数据类型,Flink都是支持的。比如:Long, String, Integer, Int, 元组,样例类,List, Map等。


3.1 map


  • 映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素


  • 参数:Scala匿名函数或MapFunction


  • 返回:DataStream


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从集合读取数据
 */
object Transfrom_map {
  def main(args: Array[String]): Unit = {
      //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从集合中读取数据
    val sensorDS: DataStream[WaterSensor] = env.fromCollection(
      // List(1,2,3,4,5)
      List(
        WaterSensor("ws_001", 1577844001, 45.0),
        WaterSensor("ws_002", 1577844015, 43.0),
        WaterSensor("ws_003", 1577844020, 42.0)
      )
    )
    val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1))
    //3.打印
    sensorDSMap.print()
    //4.执行
    env.execute("sensor")
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


3.1.1 MapFunction


Flink为每一个算子的参数都至少提供了Scala匿名函数和函数类两种的方式,其中如果使用函数类作为参数的话,需要让自定义函数继承指定的父类或实现特定的接口。例如:MapFunction


sensor-data.log 文件数据


sensor_1,1549044122,10
sensor_1,1549044123,20
sensor_1,1549044124,30
sensor_2,1549044125,40
sensor_1,1549044126,50
sensor_2,1549044127,60
sensor_1,1549044128,70
sensor_3,1549044129,80
sensor_3,1549044130,90
sensor_3,1549044130,100


import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从文件读取数据
 */
object SourceFileMap {
  def main(args: Array[String]): Unit = {
    //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从指定路径获取数据
    val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
    val MapDS = fileDS.map(
      lines => {
        //更加逗号切割 获取每个元素
        val datas: Array[String] = lines.split(",")
        WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
      }
    )
    //3.打印
    MapDS.print()
    //4.执行
    env.execute("map")
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._
/**
 * description: SourceList 
 * date: 2020/8/28 19:02 
 * version: 1.0
 *
 * @author 阳斌
 *         邮箱:1692207904@qq.com
 *         类的说明:从文件读取数据
 */
object Transform_MapFunction {
  def main(args: Array[String]): Unit = {
    //1.创建执行的环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //2.从指定路径获取数据
    val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
     sensorDS.map()
    //3.打印
  //  MapDS.print()
    //4.执行
    env.execute("map")
  }
  /**
   * 自定义继承 MapFunction
   * MapFunction[T,O]
   * 自定义输入和输出
   *
   */
  class MyMapFunction extends MapFunction[String,WaterSensor]{
    override def map(t: String): WaterSensor = {
      val datas: Array[String] = t.split(",")
      WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)
    }
  }
  /**
   * 定义样例类:水位传感器:用于接收空高数据
   *
   * @param id 传感器编号
   * @param ts 时间戳
   * @param vc 空高
   */
  case class WaterSensor(id: String, ts: Long, vc: Double)
}


image.png




相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 流计算
Flink从入门到入土(下)
Flink从入门到入土(下)
Flink从入门到入土(下)
|
传感器 分布式计算 Scala
Flink从入门到入土(中)
Flink从入门到入土(中)
Flink从入门到入土(中)
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
839 7
阿里云实时计算Flink在多行业的应用和实践
|
26天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
846 17
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
4月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
23天前
|
SQL 运维 数据可视化
阿里云实时计算Flink版产品体验测评
阿里云实时计算Flink基于Apache Flink构建,提供一站式实时大数据分析平台,支持端到端亚秒级实时数据分析,适用于实时大屏、实时报表、实时ETL和风控监测等场景,具备高性价比、开发效率、运维管理和企业安全等优势。
zdl
|
14天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
50 0
|
2月前
|
运维 搜索推荐 数据安全/隐私保护
阿里云实时计算Flink版测评报告
阿里云实时计算Flink版在用户行为分析与标签画像场景中表现出色,通过实时处理电商平台用户行为数据,生成用户兴趣偏好和标签,提升推荐系统效率。该服务具备高稳定性、低延迟、高吞吐量,支持按需计费,显著降低运维成本,提高开发效率。
70 1
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版