
How do I specify the time field with Flink's Scala API?

The code is as follows:

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeFilter
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSource
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.wmstrategies.BoundedOutOfOrderTimestamps

// Use org.apache.flink.table.api.Types here, not the Java org.apache.flink.api.common.typeinfo.Types
//import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.types.Row



object RowTest1 extends App {

  class MySource extends SourceFunction[Row] {

    val data = Array[String](
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984311000}""",
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984315000}""",
      """{"rider_id":10,"rider_name":"hb","city":"hangzhou","rowtime":1555984343000}"""
    )

    override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
      for (i <- data) {

        val r1 = JSON.parseObject(i)

        val rider_id = r1.getObject("rider_id", classOf[Int])
        val rider_name = r1.getObject("rider_name", classOf[String])
        val rowTime = r1.getObject("rowtime", classOf[java.sql.Timestamp])
        //println(rider_id, rider_name, rowTime)

        val row = Row.of(rider_id.asInstanceOf[Object], rider_name.asInstanceOf[Object], rowTime.asInstanceOf[Object])
        ctx.collect(row)
        Thread.sleep(1000)
      }
    }

    override def cancel(): Unit = {}
  }


  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val fieldNames = Array[String]("rider_id", "rider_name", "mytime.ROWTIME")

  val types = Array[TypeInformation[_]](Types.INT, Types.STRING, Types.SQL_TIMESTAMP)

  val rowSource = env.addSource(new MySource)(Types.ROW(types: _*))
  //rowSource.print()
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Row](Time.seconds(10)) {
    override def extractTimestamp(element: Row): Long = element.getField(2).asInstanceOf[java.sql.Timestamp].getTime
  })

  val table1 = rowSource.toTable(tEnv).as('rider_id, 'rider_name, 'mytime)

  table1.printSchema()

  tEnv.registerTable("t1", table1)

  tEnv.sqlQuery(
    """
      | select
      |   rider_id,
      |   count(*) as cnt
      | from t1
      | group by rider_id, TUMBLE(mytime, INTERVAL  '10' SECOND)
      |
    """.stripMargin).toAppendStream[Row].print()


  env.execute()
}

Running it throws:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.

My question:
When converting a DataStream[Row] to a Table, how do I specify the time (event-time) field?
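
For reference, a minimal sketch of one commonly suggested fix: declare the field as a rowtime attribute with a .rowtime suffix while converting the stream, instead of renaming plain fields afterwards with .as(...). This assumes the same Flink version and field names as the code above, so treat it as a sketch rather than a verified answer:

// Sketch only: mark 'mytime as the event-time attribute during conversion.
// Assumes timestamps/watermarks were already assigned on rowSource (as above),
// so the rowtime attribute replaces the third physical field of the Row.
val table1 = rowSource.toTable(tEnv, 'rider_id, 'rider_name, 'mytime.rowtime)

tEnv.registerTable("t1", table1)
// TUMBLE(mytime, INTERVAL '10' SECOND) should now operate on a proper time attribute.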

冷丰 2019-04-25 19:17:46
1 answer
  • Required pom dependencies:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    Consume data from Kafka, convert it to a Table, and then query it with SQL.
    When developing in Scala, make sure none of the required imports are missing, otherwise you will run into problems.

    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    The complete code is below:
    package com.ddxygq.bigdata.flink.sql

    import java.util.Properties

    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    /**
      * @author keguang
      * @date 2019/2/22 16:13
      * @version v1.0.0
      * @description
      */

    // Case class used by demo2; its definition was omitted in the original post
    // and is inferred here from the field names 'word and 'frequency.
    case class WC(word: String, frequency: Long)

    object TableDemo {
      def main(args: Array[String]): Unit = {
        demo
      }

      def demo2(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        val input: DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
        val input2: DataSet[WC] = env.fromElements(WC("hello", 1), WC("hello", 1))
        val table = input.toTable(tEnv, 'word, 'frequency)
        val table2 = input2.toTable(tEnv, 'word2, 'frequency2)
        // The join condition must use === (Table API expression equality), not ==
        val result = table.join(table2).where('word === 'word2).select('word, 'frequency)
        result.toDataSet[(String, Long)].print()
      }

      def demo: Unit = {
        val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
        val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

        // Connect to Kafka
        val ZOOKEEPER_HOST = "qcloud-test-hadoop01:2181"
        val KAFKA_BROKERS = "qcloud-test-hadoop01:9092,qcloud-test-hadoop02:9092,qcloud-test-hadoop03:9092"
        val TRANSACTION_GROUP = "transaction"
        val kafkaProps = new Properties()
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS)
        kafkaProps.setProperty("group.id", TRANSACTION_GROUP)

        val input = sEnv.addSource(
          new FlinkKafkaConsumer08[String]("flink-test", new SimpleStringSchema(), kafkaProps)
        )
          .flatMap(x => x.split(" "))
          .map(x => (x, 1L))

        // registerDataStream returns Unit, so there is nothing to assign here
        sTableEnv.registerDataStream("Words", input, 'word, 'frequency)

        val result = sTableEnv
          .scan("Words")
          .groupBy("word")
          .select('word, 'frequency.sum as 'cnt)
        sTableEnv.toRetractStream[(String, Long)](result).print()

        sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print()

        sEnv.execute("TableDemo")
      }
    }

    There are two things to note here:
    1. The example uses both Table API operators and standard SQL syntax, in order to demonstrate the basic usage of the Table API.

    val result = sTableEnv
      .scan("Words")
      .groupBy("word")
      .select('word, 'frequency.sum as 'cnt)

    This group-by aggregation can actually be replaced with:

    val result = sTableEnv.sqlQuery("select word,sum(frequency) as cnt from Words group by word")

    // Print to the console
    sTableEnv.toRetractStream[(String, Long)](result).print()
    So how does this differ from the result of the query below?

    sTableEnv.sqlQuery("select * from Words").toAppendStream[(String, Long)].print()

    The difference is clear: we are consuming real-time data from Kafka, so the Words table is a dynamic streaming table whose rows keep being appended. The first query is a group-by aggregation, so its result has to be continuously updated; if the current result is (hello,4) and another "hello" arrives, the result needs to be updated to (hello,5), and when a new word shows up a new row needs to be inserted. The second query, select * from Words, simply appends new results.

    So the two only differ in how the result is printed to the console: the former calls toRetractStream, while the latter calls toAppendStream.
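
    To make that concrete, here is a small sketch (not from the original answer) of what the retract stream carries, assuming the result table defined above: toRetractStream wraps every row in a (Boolean, row) tuple, where true marks an accumulate (insert) message and false marks the retraction of a previously emitted result.

    // Sketch: consuming the retract stream produced by the group-by query above.
    // Each element is (flag, row): flag == true  -> add this result row,
    //                              flag == false -> retract a previously emitted row.
    val retractStream = sTableEnv.toRetractStream[(String, Long)](result)

    // One common pattern is to keep only the accumulate messages when printing:
    retractStream
      .filter(_._1)   // drop the retraction messages
      .map(_._2)      // unwrap the (word, cnt) row
      .print()

    Filtering on the flag like this is just a convenient way to print the latest results; a real sink would normally handle the retraction messages explicitly.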

    2019-07-17 23:34:17