大数据中必须要掌握的 Flink SQL 详细剖析 (二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

另外,你需要为 Flink 的 Scala 批处理或流式 API 添加依赖项。对于批量查询,您需要添加:


<dependency>
  <groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>


2. Flink SQL 实战案例



1) 批数据 SQL


用法:


  1. 构建 Table 运行环境
  2. 将 DataSet 注册为一张表
  3. 使用 Table 运行环境的 sqlQuery 方法来执行 SQL 语句


示例:使用 Flink SQL 统计用户消费订单的总金额、最大金额、最小金额、订单总数。


订单 id 用户名 订单日期 消费金额
1 Zhangsan 2018-10-20 15:30 358.5


测试数据(订单 ID、用户名、订单日期、订单金额):


Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)


步骤:


  1. 获取一个批处理运行环境
  2. 获取一个 Table 运行环境
  3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  4. 基于本地 Order 集合创建一个 DataSet source
  5. 使用 Table 运行环境将 DataSet 注册为一张表
  6. 使用 SQL 语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
  7. 使用 TableEnv.toDataSet 将 Table 转换为 DataSet
  8. 打印测试


示例代码


import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
/**
 * 使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。
 */
object BatchFlinkSqlDemo {
  //3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
  case class Order(id:Int, userName:String, createTime:String, money:Double)
  def main(args: Array[String]): Unit = {
    /**
     * 实现思路:
     * 1. 获取一个批处理运行环境
     * 2. 获取一个Table运行环境
     * 3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
     * 4. 基于本地 Order 集合创建一个DataSet source
     * 5. 使用Table运行环境将DataSet注册为一张表
     * 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
     * 7. 使用TableEnv.toDataSet将Table转换为DataSet
     * 8. 打印测试
     */
    //1. 获取一个批处理运行环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //2. 获取一个Table运行环境
    val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 基于本地 Order 集合创建一个DataSet source
    val orderDataSet: DataSet[Order] = env.fromElements(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    )
    //5. 使用Table运行环境将DataSet注册为一张表
    tabEnv.registerDataSet("t_order", orderDataSet)
    //6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
    //用户消费订单的总金额、最大金额、最小金额、订单总数。
    val sql =
      """
        | select
        |   userName,
        |   sum(money) totalMoney,
        |   max(money) maxMoney,
        |   min(money) minMoney,
        |   count(1) totalCount
        |  from t_order
        |  group by userName
        |""".stripMargin  //在scala中stripMargin默认是“|”作为多行连接符
    //7. 使用TableEnv.toDataSet将Table转换为DataSet
    val table: Table = tabEnv.sqlQuery(sql)
    table.printSchema()
    tabEnv.toDataSet[Row](table).print()
  }
}


2) 流数据 SQL


流处理中也可以支持 SQL。但是需要注意以下几点:


  1. 要使用流处理的 SQL,必须要添加水印时间
  2. 使用 registerDataStream 注册表的时候,使用 ' 来指定字段
  3. 注册表的时候,必须要指定一个 rowtime,否则无法在 SQL 中使用窗口
  4. 必须要导入 import org.apache.flink.table.api.scala._ 隐式参数
  5. SQL 中使用 trumble(时间列名, interval '时间' sencond) 来进行定义窗口


示例:使用 Flink SQL 来统计 5 秒内 用户的 订单总数、订单的最大金额、订单的最小金额。


步骤


  1. 获取流处理运行环境
  2. 获取 Table 运行环境
  3. 设置处理时间为 EventTime
  4. 创建一个订单样例类 Order ,包含四个字段(订单 ID、用户 ID、订单金额、时间戳)
  5. 创建一个自定义数据源
  • 使用 for 循环生成 1000 个订单
  • 随机生成订单 ID(UUID)
  • 随机生成用户 ID(0-2)
  • 随机生成订单金额(0-100)
  • 时间戳为当前系统时间
  • 每隔 1 秒生成一个订单
  1. 添加水印,允许延迟 2 秒
  2. 导入 import org.apache.flink.table.api.scala._ 隐式参数
  3. 使用 registerDataStream 注册表,并分别指定字段,还要指定 rowtime 字段
  4. 编写 SQL 语句统计用户订单总数、最大金额、最小金额
    分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
  5. 使用 tableEnv.sqlQuery 执行 sql 语句
  6. 将 SQL 的执行结果转换成 DataStream 再打印出来
  7. 启动流处理程序


示例代码


import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row
import scala.util.Random
/**
 * 需求:
 *  使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额
 *
 *  timestamp是关键字不能作为字段的名字(关键字不能作为字段名字)
 */
object StreamFlinkSqlDemo {
    /**
     *  1. 获取流处理运行环境
     * 2. 获取Table运行环境
     * 3. 设置处理时间为 EventTime
     * 4. 创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳)
     * 5. 创建一个自定义数据源
     *    使用for循环生成1000个订单
     *    随机生成订单ID(UUID)
     *    随机生成用户ID(0-2)
     *    随机生成订单金额(0-100)
     *    时间戳为当前系统时间
     *    每隔1秒生成一个订单
     * 6. 添加水印,允许延迟2秒
     * 7. 导入 import org.apache.flink.table.api.scala._ 隐式参数
     * 8. 使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段
     * 9. 编写SQL语句统计用户订单总数、最大金额、最小金额
     * 分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口
     * 10. 使用 tableEnv.sqlQuery 执行sql语句
     * 11. 将SQL的执行结果转换成DataStream再打印出来
     * 12. 启动流处理程序
     */
    // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
    def main(args: Array[String]): Unit = {
      // 1. 创建流处理运行环境
      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // 2. 设置处理时间为`EventTime`
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      //获取table的运行环境
      val tableEnv = TableEnvironment.getTableEnvironment(env)
      // 4. 创建一个自定义数据源
      val orderDataStream = env.addSource(new RichSourceFunction[Order] {
        var isRunning = true
        override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
          // - 随机生成订单ID(UUID)
          // - 随机生成用户ID(0-2)
          // - 随机生成订单金额(0-100)
          // - 时间戳为当前系统时间
          // - 每隔1秒生成一个订单
          for (i <- 0 until 1000 if isRunning) {
            val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
              System.currentTimeMillis())
            TimeUnit.SECONDS.sleep(1)
            ctx.collect(order)
          }
        }
        override def cancel(): Unit = { isRunning = false }
      })
      // 5. 添加水印,允许延迟2秒
      val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) {
          override def extractTimestamp(element: Order): Long = {
            val eventTime = element.createTime
            eventTime
          }
        }
      )
      // 6. 导入`import org.apache.flink.table.api.scala._`隐式参数
      // 7. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段
      import org.apache.flink.table.api.scala._
      tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime)
      // 8. 编写SQL语句统计用户订单总数、最大金额、最小金额
      // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口
      val sql =
      """
        |select
        | userId,
        | count(1) as totalCount,
        | max(money) as maxMoney,
        | min(money) as minMoney
        | from
        | t_order
        | group by
        | tumble(createTime, interval '5' second),
        | userId
      """.stripMargin
      // 9. 使用`tableEnv.sqlQuery`执行sql语句
      val table: Table = tableEnv.sqlQuery(sql)
      // 10. 将SQL的执行结果转换成DataStream再打印出来
      table.toRetractStream[Row].print()
      env.execute("StreamSQLApp")
    }
}
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
5天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
27 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
6天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
32 1
|
27天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
51 1
|
25天前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
|
26天前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
105 0
|
26天前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
5天前
|
数据采集 监控 数据管理
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第26天】随着信息技术的发展,数据成为企业核心资源。本文探讨大数据平台的搭建与数据质量管理,包括选择合适架构、数据处理与分析能力、数据质量标准与监控机制、数据清洗与校验及元数据管理,为企业数据治理提供参考。
28 1
|
27天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
46 3
|
2天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
22 1
|
4天前
|
数据采集 分布式计算 大数据
数据治理之道:大数据平台的搭建与数据质量管理
【10月更文挑战第27天】在数字化时代,数据治理对于确保数据资产的保值增值至关重要。本文探讨了大数据平台的搭建和数据质量管理的重要性及实践方法。大数据平台应包括数据存储、处理、分析和展示等功能,常用工具如Hadoop、Apache Spark和Flink。数据质量管理则涉及数据的准确性、一致性和完整性,通过建立数据质量评估和监控体系,确保数据分析结果的可靠性。企业应设立数据治理委员会,投资相关工具和技术,提升数据治理的效率和效果。
22 2