3.7 connect
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_Connect { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.转换成样例类 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) // 4. 从集合中再读取一条流 val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6)) val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS) // coMap表示连接流调用的map,各自都需要一个 function resultCS.map( sensor=>sensor.id, num=>num+1 ).print() // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.8 union
对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream
connect与 union 区别:
- union之前两个流的类型必须是一样,connect可以不一样
- connect只能操作两个流,union可以操作多个。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:FlatMap */ object Transform_Union { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2. 从集合中读取流 val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4)) val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10)) val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110)) // TODO union 真正将多条流合并成一条流 // 合并的流,类型必须一致 // 可以合并多条流,只要类型一致 num1DS.union(num2DS).union(num3DS) .print() // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.9 Operator
Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析
3.9.1 滚动聚合算子(Rolling Aggregation)
这些算子可以针对KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
sum()
3.9.2 reduce
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:Reduce */ object Transform_Reduce { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.转换成样例类 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) // 输入的类型一样,输出类型和输出类型也要一样 // 组内的第一条数据,不进入reduce计算 val reduceDS: DataStream[WaterSensor] = sensorKS.reduce( (ws1, ws2) => { println(ws1 + "<===>" + ws2) WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc) } ) reduceDS.print("reduce") // 4. 执行 env.execute() } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.9.3process
Flink在数据流通过keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现 1)继承KeyedProcessFunction抽象类,并定义泛型:
[KEY, IN, OUT]
class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{}
- 重写方法
// 自定义KeyedProcessFunction,是一个特殊的富函数 // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型 // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑 /** * 处理逻辑:来一条处理一条 * * @param value 一条数据 * @param ctx 上下文对象 * @param out 采集器:收集数据,并输出 */ override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = { out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value) // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple }
完整代码:
import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 阳斌 * 邮箱:1692207904@qq.com * 类的说明:Reduce */ object Transform_Process { def main(args: Array[String]): Unit = { // 1.创建执行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.读取数据 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.转换成样例类 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) //按照ID 进行分组 val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) sensorKS.process(new MyKeyedProcessFunction) // 4. 执行 env.execute() } // 自定义KeyedProcessFunction,是一个特殊的富函数 // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型 // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑 class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] { /** * 处理逻辑:来一条处理一条 * * @param value 一条数据 * @param ctx 上下文对象 * @param out 采集器:收集数据,并输出 */ override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = { out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value) // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple } } /** * 定义样例类:水位传感器:用于接收空高数据 * * @param id 传感器编号 * @param ts 时间戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
4.Sink
Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作
之前我们一直在使用的print方法其实就是一种Sink。
@PublicEvolving public DataStreamSink<T> print(String sinkIdentifier) { PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false); return this.addSink(printFunction).name("Print to Std. Out"); }
官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink