小草依然1's Profile Page


Personal Introduction

No personal introduction yet.

Skills

General technical capabilities:

None yet.

Cloud product technical capabilities:

None yet.

Alibaba Cloud Skill Certification

No details available.

November 2020

  • Answered a question 2020-11-06

    Flink SQL join with a Hive dimension table

    OK, I had just asked this and then worked it out myself: the Hive table has to be created through Flink, with the lookup.join.cache.ttl parameter set in the table properties at creation time.

    package Test.Flink

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.catalog.hive.HiveCatalog
    import org.apache.flink.types.Row
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.config.TableConfigOptions

    /**
      * Created by dzm on 2020/7/21.
      */
    object TestHiveJoin {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
        // Create the streaming table environment using the Blink planner
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
        val streamTableEnv = StreamTableEnvironment.create(env, settings)
        streamTableEnv.getConfig.getConfiguration.setString("lookup.join.cache.ttl", "10s")

        // Register and use a Hive catalog (hive-site.xml lives under the resources path)
        val catalog = new HiveCatalog("myHiveCatalog", "default",
          "D:\\ideaProject\\hualu\\TestFlinkSQL\\src\\main\\resources\\", "1.2.1")
        streamTableEnv.registerCatalog("myHiveCatalog", catalog)
        streamTableEnv.useCatalog("myHiveCatalog")
        // Needed so the /*+ OPTIONS(...) */ hints below are honored
        streamTableEnv.getConfig.getConfiguration
          .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

        streamTableEnv.executeSql("drop table if exists user_log")
        streamTableEnv.executeSql("drop table if exists TestJoin5")
        streamTableEnv.executeSql("drop table if exists flink_sql_sink1")
        // Kafka source DDL for user_log (helper from this project)
        streamTableEnv.executeSql(FlinkSQLUtils.kafkaSqlLocal)

        // Create the Hive dimension table through Flink, with the lookup
        // cache TTL in its table properties
        streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
        streamTableEnv.executeSql(
          "create table TestJoin5(" +
          "  dwid String," +
          "  dwmc String," +
          "  name2 String" +
          ") stored as parquet tblproperties (" +
          "  'lookup.join.cache.ttl' = '10s'" +
          ")")

        val aa = streamTableEnv.sqlQuery("select * from TestJoin5 " +
          "/*+ OPTIONS('streaming-source.enable' = 'true','streaming-source.monitor-interval' = '15 s') */")
        streamTableEnv.toAppendStream[Row](aa).print().setParallelism(1)

        // Switch back to the default dialect for the lookup join itself
        streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "default")

        // https://baijiahao.baidu.com/s?id=1678429138706485764&wfr=spider&for=pc
        try {
          val ssss = streamTableEnv.sqlQuery(
            "select a.name,CAST(a.id as INT),CAST(a.age as INT),a.behavior,b.dwmc," +
            "CAST('1' as INT),b.name2,CAST(a.userProctime as BIGINT) " +
            "from user_log a LEFT JOIN TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
            "FOR SYSTEM_TIME AS OF a.userProctime as b " +
            "on a.age = b.dwid where b.dwmc is not null")
          streamTableEnv.toAppendStream[Row](ssss).print().setParallelism(1)
        } catch {
          case e: Exception => e.printStackTrace()
        }

        env.execute("cccc")
      }
    }
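    Distilled from the full program above, the three pieces that actually make the lookup join work are shown below. This is a minimal sketch, not a standalone program: it reuses the streamTableEnv, user_log, and TestJoin5 names from the code above rather than repeating the setup.

    // 1. Let /*+ OPTIONS(...) */ hints override table properties per query.
    streamTableEnv.getConfig.getConfiguration
      .setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

    // 2. Create the dimension table through Flink in the Hive dialect, with
    //    the lookup cache TTL in its table properties.
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive")
    streamTableEnv.executeSql(
      "create table TestJoin5(dwid String, dwmc String, name2 String) " +
      "stored as parquet tblproperties ('lookup.join.cache.ttl' = '10s')")

    // 3. Back in the default dialect, join the stream against the Hive table
    //    as of the stream's processing-time attribute; the hint overrides the
    //    TTL for this one query.
    streamTableEnv.getConfig.getConfiguration.set(TableConfigOptions.TABLE_SQL_DIALECT, "default")
    val joined = streamTableEnv.sqlQuery(
      "select a.name, b.dwmc from user_log a " +
      "left join TestJoin5 /*+ OPTIONS('lookup.join.cache.ttl' = '15 s') */ " +
      "for system_time as of a.userProctime as b on a.age = b.dwid")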
  • Submitted a question 2020-11-04

    Flink SQL join with a Hive dimension table

  • Answered a question 2019-07-17

    Has anyone used Flink's monitoring REST API? Every time I cancel a job I get 404 Not Found. The Flink docs say to use DELETE.

    Use /jobs/90667d4593028d33935129aff/yarn-cancel. You're running the Hadoop (YARN) build of Flink, right? You can see the running jobs in the web UI on port 8081; open the browser dev tools (F12) and capture the request the cancel button sends to see exactly what it calls.
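    To reproduce that call outside the browser, here is a minimal sketch using only the JDK's HTTP client. It assumes an older YARN build of Flink whose web UI listens on port 8081; host and job ID are placeholders (the ID is copied from the answer above). The yarn-cancel endpoint was the GET-based alternative those versions exposed because the YARN web proxy did not forward the documented DELETE verb.

    import java.net.{HttpURLConnection, URL}

    object CancelJob {
      def main(args: Array[String]): Unit = {
        val jobId = "90667d4593028d33935129aff" // placeholder job ID from the answer above
        // yarn-cancel is the legacy GET workaround for the DELETE the docs describe
        val url = new URL(s"http://localhost:8081/jobs/$jobId/yarn-cancel")
        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("GET")
        println(s"HTTP ${conn.getResponseCode}") // expect a 2xx status on success
        conn.disconnect()
      }
    }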