win10 pyflink1.12 本机env.sql_query()执行 表关联sql, 时间过长-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

win10 pyflink1.12 本机env.sql_query()执行 表关联sql, 时间过长

游客oaxmz3gavjlqk 2021-03-01 09:41:41 558

win10 使用pyflink1.12 利用DDL在流环境中定义两张源表:一个1千万行,一个7千行。利用env.sql_query执行表关联操作获取所需的250条数据。 问题: 获取数据的速度很慢,本机执行需要9mins, 再加上稍复杂一些的udaf函数,就会跑30分钟以上。 请问本地利用python执行pyflink时,是需要配置python的相关执行参数么?本人小白,看了官网的开发文档也没找到相关的指导?求路过的大佬们,指导一下,拜托啦!!!小白先谢过啦。

(另:pyflink1.11中进行过同样测试,由于pyflink1.11中支持connector read.query参数,sql交由数据库部分执行,所以获取数据速度很快。)

附代码: from pyflink.table import StreamTableEnvironment, EnvironmentSettings from pyflink.datastream import StreamExecutionEnvironment

  1. s_env = StreamExecutionEnvironment.get_execution_environment()
  2. s_env.set_parallelism(8)
  3. env = StreamTableEnvironment.create(s_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
  4. env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
  5. (700行表table1) source_ddl1_2 = """CREATE TABLE table1 (DAY_ID VARCHAR(8),IS_EXCH_DAY DECIMAL) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'table1') """
  6. (1千万行表table2) source_ddl2_2 = """CREATE TABLE table2 (PF_ID VARCHAR(10),SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/yss_datamiddle_newrisk?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'table2') """
  7. env.execute_sql(source_ddl1_2)
  8. env.execute_sql(source_ddl2_2)
  9. sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM table1 JOIN table2 ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = '456' AND CCY_TYPE = 'AC' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"

  10. query_table = env.sql_query(sql)

  11. query_table.to_pandas()

query执行 sql关联表 env执行 sql关联 link执行
分享到
取消 提交回答
全部回答(1)
  • 开心老黄
    2021-03-09 10:51:44

    Hi, 1. 关于sql_query的话本质都是跑的也是java代码,你这边在没有使用的Python udaf跑更慢的原因是你用了to_pandas这个sink,to_pandas的一般是用来本地调试用的,在性能上是不行的。 2. 关于用了udaf的30分钟的问题,我在社区邮件也回复你了,我猜测是你pandas udaf的实现上不够高效导致的。 http://apache-flink.147419.n8.nabble.com/flink1-12-sql-query-pyflink-9min-java-3s-td10994.html

    0 0

一套基于Apache Flink构建的一站式、高性能实时大数据处理平台,广泛适用于流式数据处理、离线数据处理、DataLake计算等场景。

推荐文章
相似问题
最新问题
链接