开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

win10 pyflink1.12 本机env.sql_query()执行 表关联sql, 时间过长

win10 使用pyflink1.12 利用DDL在流环境中定义两张源表:一个1千万行,一个7千行。利用env.sql_query执行表关联操作获取所需的250条数据。 问题: 获取数据的速度很慢,本机执行需要9mins, 再加上稍复杂一些的udaf函数,就会跑30分钟以上。 请问本地利用python执行pyflink时,是需要配置python的相关执行参数么?本人小白,看了官网的开发文档也没找到相关的指导?求路过的大佬们,指导一下,拜托啦!!!小白先谢过啦。

(另:pyflink1.11中进行过同样测试,由于pyflink1.11中支持connector read.query参数,sql交由数据库部分执行,所以获取数据速度很快。)

附代码: from pyflink.table import StreamTableEnvironment, EnvironmentSettings from pyflink.datastream import StreamExecutionEnvironment

  1. s_env = StreamExecutionEnvironment.get_execution_environment()
  2. s_env.set_parallelism(8)
  3. env = StreamTableEnvironment.create(s_env, environment_settings=EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build())
  4. env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
  5. (700行表table1) source_ddl1_2 = """CREATE TABLE table1 (DAY_ID VARCHAR(8),IS_EXCH_DAY DECIMAL) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/db?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'table1') """
  6. (1千万行表table2) source_ddl2_2 = """CREATE TABLE table2 (PF_ID VARCHAR(10),SYMBOL_ID VARCHAR(20),BIZ_DATE VARCHAR(8),CCY_TYPE VARCHAR(10),YLDRATE DECIMAL(18,12)) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://ip:port/yss_datamiddle_newrisk?useSSL=False', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'xxx', 'table-name' = 'table2') """
  7. env.execute_sql(source_ddl1_2)
  8. env.execute_sql(source_ddl2_2)
  9. sql = "SELECT YLDRATE, PF_ID, SYMBOL_ID FROM table1 JOIN table2 ON DAY_ID = BIZ_DATE WHERE PF_ID = '123' AND SYMBOL_ID = '456' AND CCY_TYPE = 'AC' AND BIZ_DATE BETWEEN '20160701' AND '20170307'"

  10. query_table = env.sql_query(sql)

  11. query_table.to_pandas()

展开
收起
游客oaxmz3gavjlqk 2021-03-01 09:41:41 1317 0
1 条回答
写回答
取消 提交回答
  • Hi, 1. 关于sql_query的话本质都是跑的也是java代码,你这边在没有使用的Python udaf跑更慢的原因是你用了to_pandas这个sink,to_pandas的一般是用来本地调试用的,在性能上是不行的。 2. 关于用了udaf的30分钟的问题,我在社区邮件也回复你了,我猜测是你pandas udaf的实现上不够高效导致的。 http://apache-flink.147419.n8.nabble.com/flink1-12-sql-query-pyflink-9min-java-3s-td10994.html

    2021-03-09 10:51:44
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载