
PyFlink 1.12: error when connecting to MySQL: Missing required options

I defined two source-table DDLs in my script, but the second one always fails with a missing-option error. I'm new to PyFlink; can anyone help?

# DDL definition
source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
symbol_id VARCHAR,biz_date VARCHAR,\
ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
is_valid DECIMAL,time_mark TIMESTAMP) WITH (
'connector' = 'jdbc',
'connector.url' = 'jdbc:mysql://ip:port/db_base',
'connector.table' = 'ts_pf_sec_yldrate',
'table-name' = 'ts_pf_sec_yldrate',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'xxx',
'connector.password' = 'xxx')

"""

Error message:

Traceback (most recent call last):
  File "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py", line 67, in <module>
    print(join.to_pandas().head(6))
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py", line 807, in to_pandas
    .collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.

Table options are: 

'connector'='jdbc'
'connector.driver'='com.mysql.jdbc.Driver'
'connector.password'='xxx'
'connector.table'='ts_pf_sec_yldrate'
'connector.url'='jdbc:mysql://ip:port/db_base'
'connector.username'='xxx'
'table-name'='ts_pf_sec_yldrate'
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
    at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
    at org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72)
    at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:68)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:202)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
    at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:570)
    at org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame(ArrowUtils.java:630)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are: 

url
    at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:303)
    at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:280)
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:553)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.createDynamicTableSource(JdbcDynamicTableFactory.java:179)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
    ... 58 more

*From the Flink mailing list archive, compiled by volunteers

Asked by 又出bug了, 2021-12-02 11:46:19
1 answer
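  • As the error message says, the WITH clause requires a "url" option. With 'connector' = 'jdbc', options are validated under their unprefixed names, so 'connector.url' is not recognized as the URL. Try renaming connector.url to url and check whether the error goes away. *From the Flink mailing list archive, compiled by volunteers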

    2021-12-02 14:32:57
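
To make the suggested fix concrete, here is a minimal sketch of how the WITH options from the question could be renamed. It assumes, to my understanding of the Flink 1.12 JDBC connector, that 'connector' = 'jdbc' expects the unprefixed keys url, table-name, driver, username and password, and that connector, url and table-name are the required ones. The names LEGACY_TO_NEW, REQUIRED, modernize_jdbc_options, missing_required and render_with_clause are hypothetical helpers written for this illustration; they are not part of PyFlink.

```python
# Hypothetical mapping from the 'connector.*' keys used in the question to
# the unprefixed keys assumed to be expected by 'connector' = 'jdbc'.
LEGACY_TO_NEW = {
    "connector.url": "url",
    "connector.table": "table-name",
    "connector.driver": "driver",
    "connector.username": "username",
    "connector.password": "password",
}

# Options assumed to be required by the JDBC connector factory.
REQUIRED = {"connector", "url", "table-name"}


def modernize_jdbc_options(options):
    """Return a copy of `options` with 'connector.*' keys renamed."""
    return {LEGACY_TO_NEW.get(key, key): value for key, value in options.items()}


def missing_required(options):
    """List the required option names that are absent from `options`."""
    return sorted(REQUIRED - set(options))


def render_with_clause(options):
    """Render an options dict as a SQL WITH (...) clause."""
    body = ",\n".join(f"    '{key}' = '{value}'" for key, value in options.items())
    return f"WITH (\n{body}\n)"


# The options exactly as written in the question.
original = {
    "connector": "jdbc",
    "connector.url": "jdbc:mysql://ip:port/db_base",
    "connector.table": "ts_pf_sec_yldrate",
    "table-name": "ts_pf_sec_yldrate",
    "connector.driver": "com.mysql.jdbc.Driver",
    "connector.username": "xxx",
    "connector.password": "xxx",
}
fixed = modernize_jdbc_options(original)

source_ddl2 = f"""CREATE TABLE ts_pf_sec_yldrate (
    id DECIMAL, pf_id VARCHAR, symbol_id VARCHAR, biz_date VARCHAR,
    ccy_type VARCHAR, cur_id_d VARCHAR, yldrate DECIMAL,
    is_valid DECIMAL, time_mark TIMESTAMP
) {render_with_clause(fixed)}"""

print(missing_required(original))  # the option the error complains about
print(missing_required(fixed))
print(source_ddl2)
# Assuming an existing TableEnvironment named t_env:
# t_env.execute_sql(source_ddl2)
```

The check on `original` reports the same missing option as the stack trace, which suggests the failure comes from mixing the two key styles rather than from the first DDL or the join itself.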