允许延迟(Allowed Lateness)
将迟到的数据放入侧输出流
Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了、本该被丢弃的数据。
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。
sideOutputLateData() 方法,传入一个输出标签,用来标记分治的迟到数据流
DataStream<Event> stream = env.addSource(...); OutputTag<Event> outputTag = new OutputTag<Event>("late") {}; stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...) .window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag) .aggregate(new MyAggregateFunction()) DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。
Flink关联维表实战
在Flink实际开发过程中,可能会遇到source 进来的数据,需要连接数据库里面的字段,再做后面的处理,比如,想要通过id获取对应的地区名字,这时候需要通过id查询地区维度表,获取具体的地区名。
对于不同的应用场景,关联维度表的方式不同
- 场景1:维度表信息基本不发生改变,或者发生改变的频率很低。
实现方案:采用Flink提供的CachedFile。
Flink提供了一个分布式缓存(CachedFile),类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在TaskManager节点中,防止task重复拉取。 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。 当程序执行,Flink自动将文件或者目录复制到所有TaskManager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从TaskManager节点的本地文件系统访问它。
val env = StreamExecutionEnvironment.getExecutionEnvironment env.registerCachedFile("/root/id2city","id2city") val socketStream = env.socketTextStream("node01",8888) val stream = socketStream.map(_.toInt) stream.map(new RichMapFunction[Int,String] { private val id2CityMap = new mutable.HashMap[Int,String]() override def open(parameters: Configuration): Unit = { val file = getRuntimeContext().getDistributedCache().getFile("id2city") val str = FileUtils.readFileUtf8(file) val strings = str.split("\r\n") for(str <- strings){ val splits = str.split(" ") val id = splits(0).toInt val city = splits(1) id2CityMap.put(id,city) } } override def map(value: Int): String = { id2CityMap.getOrElse(value,"not found city") } }).print() env.execute()
- 在集群中查看对应TaskManager的log日志,发现注册的file会被拉取到各个TaskManager的工作目录区。
- 场景2:对于维度表更新频率比较高并且对于查询维度表的实时性要求比较高
实现方案:使用定时器,定时加载外部配置文件或者数据库
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.socketTextStream("node01",8888) stream.map(new RichMapFunction[String,String] { private val map = new mutable.HashMap[String,String]() override def open(parameters: Configuration): Unit = { println("init data ...") query() val timer = new Timer(true) timer.schedule(new TimerTask { override def run(): Unit = { query() } //1s后,每隔2s执行一次 },1000,2000) } def query()={ val source = Source.fromFile("D:\\code\\StudyFlink\\data\\id2city","UTF-8") val iterator = source.getLines() for (elem <- iterator) { val vs = elem.split(" ") map.put(vs(0),vs(1)) } } override def map(key: String): String = { map.getOrElse(key,"not found city") } }).print() env.execute()
- 场景3:对于维度表更新频率高并且对于查询维度表的实时性要求高
实现方案:将更改的信息同步值Kafka配置Topic中,然后将kafka的配置流信息变成广播流,广播到业务流的各个线程中。
val env = StreamExecutionEnvironment.getExecutionEnvironment //设置连接kafka的配置信息 val props = new Properties() //注意 sparkstreaming + kafka(0.10之前版本) receiver模式 zookeeper url(元数据) props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092") props.setProperty("group.id","flink-kafka-001") props.setProperty("key.deserializer",classOf[StringSerializer].getName) props.setProperty("value.deserializer",classOf[StringSerializer].getName) val consumer = new FlinkKafkaConsumer[String]("configure",new SimpleStringSchema(),props) //从topic最开始的数据读取 // consumer.setStartFromEarliest() //从最新的数据开始读取 consumer.setStartFromLatest() //动态配置信息流 val configureStream = env.addSource(consumer) //业务流 val busStream = env.socketTextStream("node01",8888) val descriptor = new MapStateDescriptor[String, String]("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) //设置广播流的数据描述信息 val broadcastStream = configureStream.broadcast(descriptor) //connect关联业务流与配置信息流,broadcastStream流中的数据会广播到下游的各个线程中 busStream.connect(broadcastStream) .process(new BroadcastProcessFunction[String,String,String] { override def processElement(line: String, ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = { val broadcast = ctx.getBroadcastState(descriptor) val city = broadcast.get(line) if(city == null){ out.collect("not found city") }else{ out.collect(city) } } //kafka中配置流信息,写入到广播流中 override def processBroadcastElement(line: String, ctx: BroadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = { val broadcast = ctx.getBroadcastState(descriptor) //kafka中的数据 val elems = line.split(" ") broadcast.put(elems(0),elems(1)) } }).print() env.execute()
Table API & Flink SQL
在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一
在 Flink 1.8 架构里,如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。 Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,阿里巴巴的 Blink 团队在这方面做了大量的工作,已经实现了 Table API & SQL 层的流批统一。阿里巴巴已经将 Blink 开源回馈给 Flink 社区。
开发环境构建
在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能,取名叫: Blink Planner。在使用Table API和SQL开发Flink应用之前,通过添加Maven的依赖配置到项目中,在本地工程中引入相应的依赖库,库中包含了Table API和SQL接口。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.9.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.9.1</version> </dependency>
Table Environment
和DataStream API一样,Table API和SQL中具有相同的基本编程模型。首先需要构建对应的TableEnviroment创建关系型编程环境,才能够在程序中使用Table API和SQL来编写应用程序,另外Table API和SQL接口可以在应用中同时使用,Flink SQL基于Apache Calcite框架实现了SQL标准协议,是构建在Table API之上的更高级接口。
首先需要在环境中创建TableEnvironment对象,TableEnvironment中提供了注册内部表、执行Flink SQL语句、注册自定义函数等功能。根据应用类型的不同,TableEnvironment创建方式也有所不同,但是都是通过调用create()方法创建
流计算环境下创建TableEnviroment:
//创建流式计算的上下文环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //创建Table API的上下文环境 val tableEvn =StreamTableEnvironment.create(env)
Table API
Table API 顾名思义,就是基于“表”(Table)的一套 API,专门为处理表而设计的,它提供了关系型编程模型,可以用来处理结构化数据,支持表和视图的概念。在此基础上,Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来,我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了,非常实用。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从CSV文件中读取数据,然后执行简单的查询并将结果写入到另一个CSV文件中。
首先我们需要导入maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> </dependency>
代码示例如下:
import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; public class TableAPIExample { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); DataSet<Tuple2<String, Integer>> data = env.readCsvFile("input.csv") .includeFields("11") .types(String.class, Integer.class); Table table = tableEnv.fromDataSet(data, "name, age"); tableEnv.createTemporaryView("people", table); Table result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 30"); DataSet<Tuple2<String, Integer>> output = tableEnv.toDataSet(result, Tuple2.class); output.writeAsCsv("output.csv"); env.execute(); } }
在这个例子中,使用readCsvFile
方法从CSV文件中读取数据,并使用includeFields
和types
方法指定要包含的字段和字段类型。接下来,使用fromDataSet
方法将数据集转换为表,并使用createTemporaryView
方法创建一个临时视图。然后,使用sqlQuery
方法执行SQL查询,并使用toDataSet
方法将结果转换为数据集。最后,使用writeAsCsv
方法将结果写入到CSV文件中,并使用execute
方法启动执行。
除了上面这种写法外,我们还有下面2种写法:
//这里每个方法的参数都是一个“表达式”(Expression),用方法调用的形式直观地说明 //“$”符号用来指定表中的一个字段。代码和直接执行SQL是等效的。 Table maryClickTable = eventTable.where($("user").isEqual("Alice")).select($("url"),$("user")) //这其实是一种简略的写法,我们将 Table 对象名 eventTable 直接以字符串拼接的形式添加到 SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤。 Table clickTable = tableEnvironment.sqlQuery("select url, user from " +eventTable);
Virtual Tables(虚拟表)
在环境中注册之后,我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
得到的 newTable 是一个中间转换结果,如果之后又希望直接使用这个表执行 SQL,又该怎么做呢?由于 newTable 是一个 Table 对象,并没有在表环境中注册;所以我们还需要将这个中间结果表注册到环境中,才能在 SQL 中使用:
tableEnv.createTemporaryView("NewTable", newTable);
这里的注册其实是创建了一个“虚拟表”(Virtual Table)。这个概念与 SQL 语法中的视图(View)非常类似,所以调用的方法也叫作创建“虚拟视图” (createTemporaryView)。
表流互转
// 将表转换成数据流,并打印 tableEnv.toDataStream(aliceVisitTable).print(); // 将数据流转换成表。 // 另外,我们还可以在 fromDataStream()方法中增加参数,用来指定提取哪些属性作为表中的字段名,并可以任意指定位置: Table eventTable2 = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),$("url"));
动态表和持续查询
在Flink中,动态表(Dynamic Tables)是一种特殊的表,它可以随时间变化。它们通常用于表示无限流数据,例如事件流或服务器日志。与静态表不同,动态表可以在运行时插入、更新和删除行。
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作持续查询(Continuous Query)。
下面是一个简单的例子,它使用Java编写了一个Flink程序,该程序使用Table API从Kafka主题中读取数据,然后执行持续查询并将结果写入到另一个Kafka主题中。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class DynamicTableExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); tableEnv.executeSql("CREATE TABLE input (" + " name STRING," + " age INT" + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'input-topic'," + " 'properties.bootstrap.servers' = 'localhost:9092'," + " 'format' = 'json'" + ")"); tableEnv.executeSql("CREATE TABLE output (" + " name STRING," + " age INT" + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'output-topic'," + " 'properties.bootstrap.servers' = 'localhost:9092'," + " 'format' = 'json'" + ")"); Table result = tableEnv.sqlQuery("SELECT name, age FROM input WHERE age > 30"); tableEnv.toAppendStream(result, Row.class).print(); result.executeInsert("output"); env.execute(); } }
在这个例子中,首先创建了一个StreamExecutionEnvironment
来设置执行环境,并使用StreamTableEnvironment.create
方法创建了一个StreamTableEnvironment
。然后,使用executeSql
方法创建了两个Kafka表:一个用于读取输入数据,另一个用于写入输出数据。接下来,使用sqlQuery
方法执行持续查询,并使用toAppendStream
方法将结果转换为数据流。最后,使用executeInsert
方法将结果写入到输出表中,并使用execute
方法启动执行。