在执行flink sql的时候报错,有见过的吗 Flink1.9的 org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 121 to line 1, column 157: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<VARCHAR(65536)>, )'. Supported form(s): 'TUMBLE( , <DATETIME_INTERVAL>)' 'TUMBLE( , <DATETIME_INTERVAL>, )' 我指定了时间戳也不行, #Flink
Checkpoint介绍
checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。
每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理
3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。
每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。
当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败
如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示
两个输入源 checkpoint 过程
假设算子C有A和B两个输入源
在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。
当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。
当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。
持久化存储
目前,Checkpoint持久化存储可以使用如下三种:
MemStateBackend
该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及
快照的数据量非常小时使用,并不推荐用作大规模商业部署。
FsStateBackend
该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。
RocksDBStateBackend
RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。
如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。
语法
val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
例子
需求
假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理
数据规划
使用自定义算子每秒钟产生大约10000条数据。 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 数据经统计后,统计结果打印到终端输出。 打印输出的结果为Long类型的数据。 开发思路
source算子每隔1秒钟发送10000条数据,并注入到Window算子中。
window算子每隔1秒钟统计一次最近4秒钟内数据数量。
每隔1秒钟将统计结果打印到终端
每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。
//发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int)
class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } }
/** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }
//该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }
//该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L
// window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 从自定义快照中恢复状态 override def restoreState(state: util.List[UDFState]): Unit = { val udfState = state.get(0) total = udfState.getState }
// 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } }
flink-SQL
Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:
org.apache.flink flink-table_2.11 1.5.0 另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:
org.apache.flink flink-scala_2.11 1.5.0
Table API和SQL程序的结构
Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;
所以我们只需要使用一种来演示即可
要想执行flink的SQL语句,首先需要获取SQL的执行环境:
两种方式(batch和streaming):
// *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:
在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment
在内部目录中注册一个表
TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。
输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统
输入表可以从各种来源注册:
现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。
注册一个表
// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable)
// Table is the result of a simple projection query val projTable: Table = tableEnv.scan("projectedTable ").select(...) 注册一个tableSource
TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)
// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource) 注册一个tableSink
注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)
// get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 例子
//创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path("data1.csv")//文件路径 .field("id" , Types.INT)//第一列数据 .field("name" , Types.STRING)//第二列数据 .field("age" , Types.INT)//第三列数据 .fieldDelimiter(",")//列分隔符,默认是"," .lineDelimiter("\n")//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource("tableA" , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'") //将数据写出去 table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2") //// //将数据写出去 // sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成
1:将DataStream或DataSet转换为表格
在上面的例子讲解中,直接使用的是:registerTableSource注册表
对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。
然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据
语法:
// get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) 例子
object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = {
// set up execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
//构造数据
val orderA: DataStream[Order] = env.fromCollection(Seq(
Order(1L, "beer", 3),
Order(1L, "diaper", 4),
Order(3L, "rubber", 2)))
val orderB: DataStream[Order] = env.fromCollection(Seq(
Order(2L, "pen", 3),
Order(2L, "rubber", 3),
Order(4L, "beer", 1)))
// 根据数据注册表
tEnv.registerDataStream("OrderA", orderA)
tEnv.registerDataStream("OrderB", orderB)
// union the two tables
val result = tEnv.sqlQuery(
"SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
"SELECT * FROM OrderB WHERE amount < 2")
result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
env.execute()
} } case class Order(user: Long, product: String, amount: Int)
将表转换为DataStream或DataSet
A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序
1:将表转换为DataStream
有两种模式可以将 Table转换为DataStream:
1:Append Mode
将一个表附加到流上
2:Retract Mode
将表转换为流
语法格式:
// get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age) val table: Table = ...
// convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow
// convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int)
// convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子:
object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World"))
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
val stream = env.fromCollection(data)
val table: Table = tEnv.fromDataStream(stream)
//TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode
val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)
//TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息
val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table)
retractStream.print()
env.execute()
} }
case class Peoject(user: Long, index: Int, content: String)
将表转换为DataSet
语法格式
// get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age) val table: Table = ...
// convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow
// convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子:
case class Peoject(user: Long, index: Int, content: String)
object TableTODataSet{ def main(args: Array[String]): Unit = {
//构造数据,转换为table
val data = List(
Peoject(1L, 1, "Hello"),
Peoject(2L, 2, "Hello"),
Peoject(3L, 3, "Hello"),
Peoject(4L, 4, "Hello"),
Peoject(5L, 5, "Hello"),
Peoject(6L, 6, "Hello"),
Peoject(7L, 7, "Hello World"),
Peoject(8L, 8, "Hello World"),
Peoject(8L, 8, "Hello World"),
Peoject(20L, 20, "Hello World"))
//初始化环境,加载table数据
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnvironment = TableEnvironment.getTableEnvironment(env)
val collection: DataSet[Peoject] = env.fromCollection(data)
val table: Table = tableEnvironment.fromDataSet(collection)
//TODO 将table转换为dataSet
val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table)
toDataSet.print()
// env.execute() } }
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。