
Flink SQL join with a Hive dimension table

When I try to join a Kafka stream against a Hive dimension table, Flink loads the Hive table once at job startup, but rows inserted into the Hive table afterwards never show up in the join.

The official docs and the Chinese community both point to the lookup.join.cache.ttl option. I tried setting it on the TableEnvironment config, and also as a table hint after the table name in the query, but neither had any effect.
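The two placements I tried look roughly like this (trimmed down from my job; dim_table is a placeholder name):

    // Attempt 1: set the TTL globally on the TableEnvironment configuration
    streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl", "10s")

    // Attempt 2: a table hint on the dimension table inside the lookup join
    streamTableEnv.sqlQuery(
      "select * from user_log a " +
      "LEFT JOIN dim_table /*+ OPTIONS('lookup.join.cache.ttl' = '10s') */ " +
      "FOR SYSTEM_TIME AS OF a.userProctime as b on a.age = b.dwid")

Neither of these refreshed the cached dimension data.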

Has anyone gotten this to work, or is there a working example somewhere? Many thanks.


小草依然1 2020-11-04 11:26:23
1 answer
  • OK — right after asking, I worked it out. The Hive table has to be created through Flink, with the lookup.join.cache.ttl parameter specified at table creation time.
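    The gist of the fix, as a minimal extract of the full job below: create the table under the Hive dialect and put the TTL into tblproperties.

    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
    streamTableEnv.executeSql(
      "create table TestJoin5(dwid String, dwmc String, name2 String) " +
      "stored as parquet tblproperties ('lookup.join.cache.ttl' = '10s')")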

    package Test.Flink

    import org.apache.flink.connectors.hive.{HiveTableFactory, HiveTableSource}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    import org.apache.flink.table.factories.TableSourceFactory
    import org.apache.flink.types.Row
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.config.TableConfigOptions

    /**
      * Created by dzm on 2020/7/21.
      */
    object TestHiveJoin {
      def main(args: Array[String]): Unit = {

    System.setProperty("HADOOP_USER_NAME","root")
    // Create the stream TableEnvironment with the Blink planner
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val streamTableEnv = StreamTableEnvironment.create(env,settings)
    
    // Setting the TTL globally on the TableEnvironment config alone did NOT work for me;
    // the setting that actually took effect is the tblproperties on the Hive table below
    streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl","10s")
    
    //    RetryingMetaStoreClient
    val catalog = new HiveCatalog("myHiveCatalog","default","D:\\ideaProject\\hualu\\TestFlinkSQL\\src\\main\\resources\\","1.2.1")
    //    catalog.getTable(new ObjectPath("",""))
    streamTableEnv.registerCatalog("myHiveCatalog",catalog)
    streamTableEnv.useCatalog("myHiveCatalog")
    // Enable /*+ OPTIONS(...) */ table hints in SQL
    streamTableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,true)
    //    TableSourceFactory
    //    streamTableEnv.getConfig.getConfiguration.setString("table.sql-dialect","default")
    //   streamTableEnv.getConfig.getConfiguration.setString("table.dynamic-table-options.enabled","true")
    //    streamTableEnv.executeSql("drop table TestJoin9")
    streamTableEnv.executeSql("drop table if exists user_log")
    streamTableEnv.executeSql("drop table if exists TestJoin5")
    
    streamTableEnv.executeSql("drop table if exists flink_sql_sink1")
    // Register the Kafka source table user_log (DDL kept in FlinkSQLUtils, not shown here)
    streamTableEnv.executeSql(FlinkSQLUtils.kafkaSqlLocal)
    
    // Switch to the Hive dialect so the CREATE TABLE below is parsed as Hive DDL
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT,"hive")
    

    // HiveTableSource
    // Create the Hive table through Flink; lookup.join.cache.ttl goes into tblproperties
    streamTableEnv.executeSql("" +
      "create table TestJoin5(" +
      " dwid String," +
      " dwmc String," +
      " name2 String" +
      ") stored as parquet tblproperties (" +
      " 'lookup.join.cache.ttl' = '10s'" +
      ")")

    // val fac = new HiveTableFactory()

    // Streaming read of the Hive table: monitor for new files/partitions every 15 s
    val aa = streamTableEnv.sqlQuery("select * from TestJoin5 /*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.monitor-interval' = '15 s')*/")
    streamTableEnv.toAppendStream[Row](aa).print().setParallelism(1)
    
    // Switch back to the default dialect for the regular Flink SQL below
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT,"default")
    //    HBaseConfiguration
    // streamTableEnv.executeSql(FlinkSQLUtils.mysqlSinkSql1)
    
    //    streamTableEnv.executeSql("insert into TestJoin1 select * from TestJoin")
    
    //    val tableTestJoin = streamTableEnv.sqlQuery("select * from TestJoin")
    //    val ssss = streamTableEnv.toAppendStream[(String,String,String)](tableTestJoin)
    //    ssss.print().setParallelism(1)
    //    val table = streamTableEnv.sqlQuery("select * from TestJoin")
    //    val stream1 = streamTableEnv.toAppendStream[(String,String,String)](table)
    //    val procTable = stream1.toTable(streamTableEnv,'_1 as "dwid",'_2 as "dwmc",'_3 as "name2",'procTime.proctime())
    //    streamTableEnv.createTemporaryView("orcTableWithProcessTime", procTable)
    //
    //    val temFunc = procTable.createTemporalTableFunction('procTime,'dwid)
    //    streamTableEnv.registerFunction("Rates",temFunc)
    // Full hint combination tried while experimenting:
    // /*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.monitor-interval' = '15 s','lookup.join.cache.ttl'='15 s')*/
    // https://baijiahao.baidu.com/s?id=1678429138706485764&wfr=spider&for=pc
    try{
      // Lookup join: each user_log row is joined against the Hive dimension table
      // as of its processing time, with a per-join cache TTL set via a table hint.
      // (An "insert into flink_sql_sink1 ..." variant was also tried; the sink DDL
      // is commented out above, so the result is just printed here.)
      val ssss = streamTableEnv.sqlQuery(
        "select a.name,CAST(a.id as INT),CAST(a.age as INT),a.behavior,b.dwmc," +
        "CAST('1' as INT),b.name2,CAST(a.userProctime as BIGINT) " +
        "from user_log a LEFT JOIN TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
        "FOR SYSTEM_TIME AS OF a.userProctime as b " +
        "on a.age = b.dwid where b.dwmc is not null")

      streamTableEnv.toAppendStream[Row](ssss).print().setParallelism(1)
    }catch {
      case e: Exception => e.printStackTrace()
    }
    
    //    stream1.print().setParallelism(1)
    
    env.execute("cccc")
    

      }
    }
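    FlinkSQLUtils.kafkaSqlLocal is not shown above. A Kafka source DDL matching the columns the join uses might look like the sketch below; the topic, broker address, and format are placeholders, and the important part is the PROCTIME() computed column that FOR SYSTEM_TIME AS OF needs:

    // Hypothetical DDL for the user_log Kafka source (connector options are placeholders)
    val kafkaSqlLocal: String =
      "create table user_log(" +
      "  name String," +
      "  id String," +
      "  age String," +
      "  behavior String," +
      "  userProctime as PROCTIME()" + // processing-time attribute for the lookup join
      ") with (" +
      "  'connector' = 'kafka'," +
      "  'topic' = 'user_log'," +
      "  'properties.bootstrap.servers' = 'localhost:9092'," +
      "  'format' = 'json'," +
      "  'scan.startup.mode' = 'latest-offset'" +
      ")"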

    2020-11-06 10:13:57