大数据中必须要掌握的 Flink SQL 详细剖析 (二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

另外,你需要为 Flink 的 Scala 批处理或流式 API 添加依赖项。对于批量查询,您需要添加:


<dependency>
  <groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>


2. Flink SQL 实战案例



1) 批数据 SQL


用法:


  1. 构建 Table 运行环境
  2. 将 DataSet 注册为一张表
  3. 使用 Table 运行环境的 sqlQuery 方法来执行 SQL 语句


示例:使用 Flink SQL 统计用户消费订单的总金额、最大金额、最小金额、订单总数。


订单 id 用户名 订单日期 消费金额
1 Zhangsan 2018-10-20 15:30 358.5


测试数据(订单 ID、用户名、订单日期、订单金额):


Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)


步骤:


  1. 获取一个批处理运行环境
  2. 获取一个 Table 运行环境
  3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  4. 基于本地 Order 集合创建一个 DataSet source
  5. 使用 Table 运行环境将 DataSet 注册为一张表
  6. 使用 SQL 语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
  7. 使用 TableEnv.toDataSet 将 Table 转换为 DataSet
  8. 打印测试


示例代码


import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
/**
 * 使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。
 */
object BatchFlinkSqlDemo {
  //3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  case class Order(id:Int, userName:String, createTime:String, money:Double)
  def main(args: Array[String]): Unit = {
    /**
     * 实现思路:
     * 1. 获取一个批处理运行环境
     * 2. 获取一个Table运行环境
     * 3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
     * 4. 基于本地 Order 集合创建一个DataSet source
     * 5. 使用Table运行环境将DataSet注册为一张表
     * 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
     * 7. 使用TableEnv.toDataSet将Table转换为DataSet
     * 8. 打印测试
     */
    //1. 获取一个批处理运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //2. 获取一个Table运行环境
    val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 基于本地 Order 集合创建一个DataSet source
    val orderDataSet: DataSet[Order] = env.fromElements(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    )
    //5. 使用Table运行环境将DataSet注册为一张表
    tabEnv.registerDataSet("t_order", orderDataSet)
    //6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    //用户消费订单的总金额、最大金额、最小金额、订单总数。
    val sql =
      """
        | select
        |   userName,
        |   sum(money) totalMoney,
        |   max(money) maxMoney,
        |   min(money) minMoney,
        |   count(1) totalCount
        |  from t_order
        |  group by userName
        |""".stripMargin  //在scala中stripMargin默认是“|”作为多行连接符
    //7. 使用TableEnv.toDataSet将Table转换为DataSet
    val table: Table = tabEnv.sqlQuery(sql)
    table.printSchema()
    tabEnv.toDataSet[Row](table).print()
  }
}


2) 流数据 SQL


流处理中也可以支持 SQL。但是需要注意以下几点:


  1. 要使用流处理的 SQL,必须要添加水印时间
  2. 使用 registerDataStream 注册表的时候,使用 ' 来指定字段
  3. 注册表的时候,必须要指定一个 rowtime,否则无法在 SQL 中使用窗口
  4. 必须要导入 import org.apache.flink.table.api.scala._ 隐式参数
  5. SQL 中使用 trumble(时间列名, interval '时间' sencond) 来进行定义窗口


示例:使用 Flink SQL 来统计 5 秒内 用户的 订单总数、订单的最大金额、订单的最小金额。


步骤


  1. 获取流处理运行环境
  2. 获取 Table 运行环境
  3. 设置处理时间为 EventTime
  4. 创建一个订单样例类 Order ,包含四个字段(订单 ID、用户 ID、订单金额、时间戳)
  5. 创建一个自定义数据源
  • 使用 for 循环生成 1000 个订单
  • 随机生成订单 ID(UUID)
  • 随机生成用户 ID(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间
  • 每隔 1 秒生成一个订单
  1. 添加水印,允许延迟 2 秒
  2. 导入 import org.apache.flink.table.api.scala._ 隐式参数
  3. 使用 registerDataStream 注册表,并分别指定字段,还要指定 rowtime 字段
  4. 编写 SQL 语句统计用户订单总数、最大金额、最小金额
    分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
  5. 使用 tableEnv.sqlQuery 执行 sql 语句
  6. 将 SQL 的执行结果转换成 DataStream 再打印出来
  7. 启动流处理程序


示例代码


import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row
import scala.util.Random
/**
 * 需求:
 *  使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额
 *
 *  timestamp是关键字不能作为字段的名字(关键字不能作为字段名字)
 */
object StreamFlinkSqlDemo {
    /**
     *  1. 获取流处理运行环境
     * 2. 获取Table运行环境
     * 3. 设置处理时间为 EventTime
     * 4. 创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳)
     * 5. 创建一个自定义数据源
     *    使用for循环生成1000个订单
     *    随机生成订单ID(UUID)
     *    随机生成用户ID(0-2)
     *    随机生成订单金额(0-100)
     *    时间戳为当前系统时间
     *    每隔1秒生成一个订单
     * 6. 添加水印,允许延迟2秒
     * 7. 导入 import org.apache.flink.table.api.scala._ 隐式参数
     * 8. 使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
     * 9. 编写SQL语句统计用户订单总数、最大金额、最小金额
     * 分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
     * 10. 使用 tableEnv.sqlQuery 执行sql语句
     * 11. 将SQL的执行结果转换成DataStream再打印出来
     * 12. 启动流处理程序
     */
    // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
    def main(args: Array[String]): Unit = {
      // 1. 创建流处理运行环境
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // 2. 设置处理时间为`EventTime`
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      //获取table的运行环境
      val tableEnv = TableEnvironment.getTableEnvironment(env)
      // 4. 创建一个自定义数据源
      val orderDataStream = env.addSource(new RichSourceFunction[Order] {
        var isRunning = true
        override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
          // - 随机生成订单ID(UUID)
          // - 随机生成用户ID(0-2)
          // - 随机生成订单金额(0-100)
          // - 时间戳为当前系统时间
          // - 每隔1秒生成一个订单
          for (i <- 0 until 1000 if isRunning) {
            val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
              System.currentTimeMillis())
            TimeUnit.SECONDS.sleep(1)
            ctx.collect(order)
          }
        }
        override def cancel(): Unit = { isRunning = false }
      })
      // 5. 添加水印,允许延迟2秒
      val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
          override def extractTimestamp(element: Order): Long = {
            val eventTime = element.createTime
            eventTime
          }
        }
      )
      // 6. 导入`import org.apache.flink.table.api.scala._`隐式参数
      // 7. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段
      import org.apache.flink.table.api.scala._
      tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
      // 8. 编写SQL语句统计用户订单总数、最大金额、最小金额
      // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口
      val sql =
      """
        |select
        | userId,
        | count(1) as totalCount,
        | max(money) as maxMoney,
        | min(money) as minMoney
        | from
        | t_order
        | group by
        | tumble(createTime, interval '5' second),
        | userId
      """.stripMargin
      // 9. 使用`tableEnv.sqlQuery`执行sql语句
      val table: Table = tableEnv.sqlQuery(sql)
      // 10. 将SQL的执行结果转换成DataStream再打印出来
      table.toRetractStream[Row].print()
      env.execute("StreamSQLApp")
    }
}
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
26天前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
90 15
|
28天前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
56 2
|
28天前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
31 1
|
1月前
|
SQL JSON 分布式计算
ODPS SQL ——列转行、行转列这回让我玩明白了!
本文详细介绍了在MaxCompute中如何使用TRANS_ARRAY和LATERAL VIEW EXPLODE函数来实现列转行的功能。
|
27天前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
26 0
|
2月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
45 0
|
2月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
42 0
|
大数据 流计算
大数据—Flink深入学习
1.容错机制
134 0
|
11天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
下一篇
无影云桌面