另外,你需要为 Flink 的 Scala 批处理或流式 API 添加依赖项。对于批量查询,您需要添加:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> </dependency>
2. Flink SQL 实战案例
1) 批数据 SQL
用法:
- 构建 Table 运行环境
- 将 DataSet 注册为一张表
- 使用 Table 运行环境的 sqlQuery 方法来执行 SQL 语句
示例:使用 Flink SQL 统计用户消费订单的总金额、最大金额、最小金额、订单总数。
订单 id | 用户名 | 订单日期 | 消费金额 |
1 | Zhangsan | 2018-10-20 15:30 | 358.5 |
测试数据(订单 ID、用户名、订单日期、订单金额):
Order(1, "zhangsan", "2018-10-20 15:30", 358.5), Order(2, "zhangsan", "2018-10-20 16:30", 131.5), Order(3, "lisi", "2018-10-20 16:30", 127.5), Order(4, "lisi", "2018-10-20 16:30", 328.5), Order(5, "lisi", "2018-10-20 16:30", 432.5), Order(6, "zhaoliu", "2018-10-20 22:30", 451.0), Order(7, "zhaoliu", "2018-10-20 22:30", 362.0), Order(8, "zhaoliu", "2018-10-20 22:30", 364.0), Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
步骤:
- 获取一个批处理运行环境
- 获取一个 Table 运行环境
- 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额)
- 基于本地 Order 集合创建一个 DataSet source
- 使用 Table 运行环境将 DataSet 注册为一张表
- 使用 SQL 语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数)
- 使用 TableEnv.toDataSet 将 Table 转换为 DataSet
- 打印测试
示例代码:
import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.table.api.scala.BatchTableEnvironment import org.apache.flink.api.scala._ import org.apache.flink.types.Row /** * 使用Flink SQL统计用户消费订单的总金额、最大金额、最小金额、订单总数。 */ object BatchFlinkSqlDemo { //3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额) case class Order(id:Int, userName:String, createTime:String, money:Double) def main(args: Array[String]): Unit = { /** * 实现思路: * 1. 获取一个批处理运行环境 * 2. 获取一个Table运行环境 * 3. 创建一个样例类 Order 用来映射数据(订单名、用户名、订单日期、订单金额) * 4. 基于本地 Order 集合创建一个DataSet source * 5. 使用Table运行环境将DataSet注册为一张表 * 6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数) * 7. 使用TableEnv.toDataSet将Table转换为DataSet * 8. 打印测试 */ //1. 获取一个批处理运行环境 val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment //2. 获取一个Table运行环境 val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env) //4. 基于本地 Order 集合创建一个DataSet source val orderDataSet: DataSet[Order] = env.fromElements( Order(1, "zhangsan", "2018-10-20 15:30", 358.5), Order(2, "zhangsan", "2018-10-20 16:30", 131.5), Order(3, "lisi", "2018-10-20 16:30", 127.5), Order(4, "lisi", "2018-10-20 16:30", 328.5), Order(5, "lisi", "2018-10-20 16:30", 432.5), Order(6, "zhaoliu", "2018-10-20 22:30", 451.0), Order(7, "zhaoliu", "2018-10-20 22:30", 362.0), Order(8, "zhaoliu", "2018-10-20 22:30", 364.0), Order(9, "zhaoliu", "2018-10-20 22:30", 341.0) ) //5. 使用Table运行环境将DataSet注册为一张表 tabEnv.registerDataSet("t_order", orderDataSet) //6. 使用SQL语句来操作数据(统计用户消费订单的总金额、最大金额、最小金额、订单总数) //用户消费订单的总金额、最大金额、最小金额、订单总数。 val sql = """ | select | userName, | sum(money) totalMoney, | max(money) maxMoney, | min(money) minMoney, | count(1) totalCount | from t_order | group by userName |""".stripMargin //在scala中stripMargin默认是“|”作为多行连接符 //7. 使用TableEnv.toDataSet将Table转换为DataSet val table: Table = tabEnv.sqlQuery(sql) table.printSchema() tabEnv.toDataSet[Row](table).print() } }
2) 流数据 SQL
流处理中也可以支持 SQL。但是需要注意以下几点:
- 要使用流处理的 SQL,必须要添加水印时间
- 使用 registerDataStream 注册表的时候,使用 ' 来指定字段
- 注册表的时候,必须要指定一个 rowtime,否则无法在 SQL 中使用窗口
- 必须要导入 import org.apache.flink.table.api.scala._ 隐式参数
- SQL 中使用 trumble(时间列名, interval '时间' sencond) 来进行定义窗口
示例:使用 Flink SQL 来统计 5 秒内 用户的 订单总数、订单的最大金额、订单的最小金额。
步骤
- 获取流处理运行环境
- 获取 Table 运行环境
- 设置处理时间为 EventTime
- 创建一个订单样例类 Order ,包含四个字段(订单 ID、用户 ID、订单金额、时间戳)
- 创建一个自定义数据源
- 使用 for 循环生成 1000 个订单
- 随机生成订单 ID(UUID)
- 随机生成用户 ID(0-2)
- 随机生成订单金额(0-100)
- 时间戳为当前系统时间
- 每隔 1 秒生成一个订单
- 添加水印,允许延迟 2 秒
- 导入 import org.apache.flink.table.api.scala._ 隐式参数
- 使用 registerDataStream 注册表,并分别指定字段,还要指定 rowtime 字段
- 编写 SQL 语句统计用户订单总数、最大金额、最小金额
分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口 - 使用 tableEnv.sqlQuery 执行 sql 语句
- 将 SQL 的执行结果转换成 DataStream 再打印出来
- 启动流处理程序
示例代码:
import java.util.UUID import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.{Table, TableEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.types.Row import scala.util.Random /** * 需求: * 使用Flink SQL来统计5秒内 用户的 订单总数、订单的最大金额、订单的最小金额 * * timestamp是关键字不能作为字段的名字(关键字不能作为字段名字) */ object StreamFlinkSqlDemo { /** * 1. 获取流处理运行环境 * 2. 获取Table运行环境 * 3. 设置处理时间为 EventTime * 4. 创建一个订单样例类 Order ,包含四个字段(订单ID、用户ID、订单金额、时间戳) * 5. 创建一个自定义数据源 * 使用for循环生成1000个订单 * 随机生成订单ID(UUID) * 随机生成用户ID(0-2) * 随机生成订单金额(0-100) * 时间戳为当前系统时间 * 每隔1秒生成一个订单 * 6. 添加水印,允许延迟2秒 * 7. 导入 import org.apache.flink.table.api.scala._ 隐式参数 * 8. 使用 registerDataStream 注册表,并分别指定字段,还要指定rowtime字段 * 9. 编写SQL语句统计用户订单总数、最大金额、最小金额 * 分组时要使用 tumble(时间列, interval '窗口时间' second) 来创建窗口 * 10. 使用 tableEnv.sqlQuery 执行sql语句 * 11. 将SQL的执行结果转换成DataStream再打印出来 * 12. 启动流处理程序 */ // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳) case class Order(orderId:String, userId:Int, money:Long, createTime:Long) def main(args: Array[String]): Unit = { // 1. 创建流处理运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 2. 设置处理时间为`EventTime` env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //获取table的运行环境 val tableEnv = TableEnvironment.getTableEnvironment(env) // 4. 创建一个自定义数据源 val orderDataStream = env.addSource(new RichSourceFunction[Order] { var isRunning = true override def run(ctx: SourceFunction.SourceContext[Order]): Unit = { // - 随机生成订单ID(UUID) // - 随机生成用户ID(0-2) // - 随机生成订单金额(0-100) // - 时间戳为当前系统时间 // - 每隔1秒生成一个订单 for (i <- 0 until 1000 if isRunning) { val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101), System.currentTimeMillis()) TimeUnit.SECONDS.sleep(1) ctx.collect(order) } } override def cancel(): Unit = { isRunning = false } }) // 5. 添加水印,允许延迟2秒 val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(2)) { override def extractTimestamp(element: Order): Long = { val eventTime = element.createTime eventTime } } ) // 6. 导入`import org.apache.flink.table.api.scala._`隐式参数 // 7. 使用`registerDataStream`注册表,并分别指定字段,还要指定rowtime字段 import org.apache.flink.table.api.scala._ tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId, 'userId, 'money,'createTime.rowtime) // 8. 编写SQL语句统计用户订单总数、最大金额、最小金额 // - 分组时要使用`tumble(时间列, interval '窗口时间' second)`来创建窗口 val sql = """ |select | userId, | count(1) as totalCount, | max(money) as maxMoney, | min(money) as minMoney | from | t_order | group by | tumble(createTime, interval '5' second), | userId """.stripMargin // 9. 使用`tableEnv.sqlQuery`执行sql语句 val table: Table = tableEnv.sqlQuery(sql) // 10. 将SQL的执行结果转换成DataStream再打印出来 table.toRetractStream[Row].print() env.execute("StreamSQLApp") } }