开发者社区> 问答> 正文

在执行flink sql的时候报错,有见过的吗 Flink1.9的 #Flink

在执行flink sql的时候报错,有见过的吗 Flink1.9的 org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 121 to line 1, column 157: Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<VARCHAR(65536)>, )'. Supported form(s): 'TUMBLE( , <DATETIME_INTERVAL>)' 'TUMBLE( , <DATETIME_INTERVAL>, )' 我指定了时间戳也不行, #Flink

展开
收起
黄一刀 2020-06-15 20:17:58 2927 0
2 条回答
写回答
取消 提交回答
  • Checkpoint介绍

    checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。

    每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。

    1. CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。

    2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理

    3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。

    1. 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。

    2. 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败

    如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示

    两个输入源 checkpoint 过程

    1. 假设算子C有A和B两个输入源

    2. 在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。

    3. 当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。

    当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。

    持久化存储

    目前,Checkpoint持久化存储可以使用如下三种:

    MemStateBackend

    该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及

    快照的数据量非常小时使用,并不推荐用作大规模商业部署。

    FsStateBackend

    该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。

    RocksDBStateBackend

    RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

    如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。

    语法

    ​ val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    ​ 例子

    需求

    假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理

    数据规划

    使用自定义算子每秒钟产生大约10000条数据。 
 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 
 数据经统计后,统计结果打印到终端输出。 
 打印输出的结果为Long类型的数据。 
 开发思路 


    1. source算子每隔1秒钟发送10000条数据,并注入到Window算子中。

    2. window算子每隔1秒钟统计一次最近4秒钟内数据数量。

    3. 每隔1秒钟将统计结果打印到终端

    4. 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。

    //发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int)

    class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } }

    /** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } }

    //该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count }

    //该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L

    // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 从自定义快照中恢复状态 override def restoreState(state: util.List[UDFState]): Unit = { val udfState = state.get(0) total = udfState.getState }

    // 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } }

    flink-SQL

    Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

    org.apache.flink flink-table_2.11 1.5.0 另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

    org.apache.flink flink-scala_2.11 1.5.0

    Table API和SQL程序的结构

    Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;

    所以我们只需要使用一种来演示即可

    要想执行flink的SQL语句,首先需要获取SQL的执行环境:

    两种方式(batch和streaming):

    // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

    // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

    在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

    在内部目录中注册一个表

    TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。

    输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统

    输入表可以从各种来源注册:

    现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。

    注册一个表

    // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable)

    // Table is the result of a simple projection query val projTable: Table = tableEnv.scan("projectedTable ").select(...) 注册一个tableSource

    TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)

    // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource) 注册一个tableSink

    注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)

    // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env)

    // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

    // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

    // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 例子

    //创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path("data1.csv")//文件路径 .field("id" , Types.INT)//第一列数据 .field("name" , Types.STRING)//第二列数据 .field("age" , Types.INT)//第三列数据 .fieldDelimiter(",")//列分隔符,默认是"," .lineDelimiter("\n")//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource("tableA" , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'") //将数据写出去 table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2") //// //将数据写出去 // sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成

    1:将DataStream或DataSet转换为表格

    在上面的例子讲解中,直接使用的是:registerTableSource注册表

    对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。

    然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据

    语法:

    // get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env)

    val stream: DataStream[(Long, String)] = ...

    // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) 例子

    object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    //构造数据
    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))
    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))
    // 根据数据注册表
    tEnv.registerDataStream("OrderA", orderA)
    tEnv.registerDataStream("OrderB", orderB)
    // union the two tables
    val result = tEnv.sqlQuery(
      "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
        "SELECT * FROM OrderB WHERE amount < 2")
    result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
    env.execute()
    

    } } case class Order(user: Long, product: String, amount: Int)

    将表转换为DataStream或DataSet

    A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序

    1:将表转换为DataStream

    有两种模式可以将 Table转换为DataStream:

    1:Append Mode

    将一个表附加到流上

    2:Retract Mode

    将表转换为流

    语法格式:

    // get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Table with two fields (String name, Integer age) val table: Table = ...

    // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow

    // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int)

    // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子:

    object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World"))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val stream = env.fromCollection(data)
    val table: Table = tEnv.fromDataStream(stream)
    //TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode
    val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table)
    //TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息
    val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table)
    retractStream.print()
    env.execute()
    

    } }

    case class Peoject(user: Long, index: Int, content: String)

    将表转换为DataSet

    语法格式

    // get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Table with two fields (String name, Integer age) val table: Table = ...

    // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow

    // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子:

    case class Peoject(user: Long, index: Int, content: String)

    object TableTODataSet{ def main(args: Array[String]): Unit = {

    //构造数据,转换为table
    val data = List(
      Peoject(1L, 1, "Hello"),
      Peoject(2L, 2, "Hello"),
      Peoject(3L, 3, "Hello"),
      Peoject(4L, 4, "Hello"),
      Peoject(5L, 5, "Hello"),
      Peoject(6L, 6, "Hello"),
      Peoject(7L, 7, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(8L, 8, "Hello World"),
      Peoject(20L, 20, "Hello World"))
    //初始化环境,加载table数据
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    val collection: DataSet[Peoject] = env.fromCollection(data)
    val table: Table = tableEnvironment.fromDataSet(collection)
    //TODO 将table转换为dataSet
    val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table)
    toDataSet.print()
    

    // env.execute() } }

    2020-06-16 19:23:12
    赞同 展开评论 打赏
  • 你的时间戳参数不是 TIMESTAMP ,而是 STRING,window 的参数必须是 TIMESTAMP。

    2020-06-15 20:18:47
    赞同 1 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载