问题一:Flink SQL GROUP BY后写入postgresql数据库主键如何实现业务场景需求呢?
SQL中对时间窗口和PRODUCT_ID进行了Group By聚合操作,PG数据表中的主键须设置为WINDOW_START /WINDOW_END和PRODUCT_ID,否则无法以upinsert方式写出数据,但是这样却无法满足业务场景的需求,业务上应以RANK_ID +WINDOW_START /WINDOW_END为主键标识,请问Flink 中该如何实现这个需求?*来自志愿者整理的flink邮件归档
参考答案:
这个当前确实支持不了,不过这个问题将在 FLIP-95 提供的新 TableSink 接口后解决,有望在 1.11 中解决。来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370046?spm=a2c6h.13066369.question.58.33bf585frGVY3q
问题二:修改topic名称后从Savepoint重启会怎么消费Kafka?
Hi,大佬们 突然有一个疑问点,Flink消费kafka的Job保存了SP之后停止,修改代码中的topic名称之后重启。 会从新topic的哪里开始消费?与startup-mode一致,还是从最新呢? 可以手动控制么? *来自志愿者整理的flink邮件归档
参考答案:
可以手动控制的,如果不设置默认是从最新位置开始消费,否则按照你的startup-mode来 *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370093?spm=a2c6h.13066369.question.57.33bf585fknvNHu
问题三:大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据匹配
大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么? *来自志愿者整理的flink邮件归档
参考答案:
Collect 函数返回 Multiset 类型 ,可以使用 Map<T, Integer> 试试 *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370094?spm=a2c6h.13066369.question.58.33bf585flp3p0Y
问题四:【Flink SQL】无法启动 env.yaml怎么处理?
在服务器上试用sql-client时,启动指令如下:
./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e /root/flink-sql-client/sql-client-demo.yml
配置如下:
定义表
tables: - name: SourceTable type: source-table update-mode: append connector: type: datagen rows-per-second: 5 fields: f_sequence: kind: sequence start: 1 end: 1000 f_random: min: 1 max: 1000 f_random_str: length: 10 schema: - name: f_sequence data-type: INT - name: f_random data-type: INT - name: f_random_str data-type: STRING
遇到了如下报错:
Reading default environment from: file:/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.
The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Missing properties: format.type=csv Mismatched properties: 'connector.type' expects 'filesystem', but is 'datagen'
The following properties are requested: connector.fields.f_random.max=1000 connector.fields.f_random.min=1 connector.fields.f_random_str.length=10 connector.fields.f_sequence.end=1000 connector.fields.f_sequence.kind=sequence connector.fields.f_sequence.start=1 connector.rows-per-second=5 connector.type=datagen schema.0.data-type=INT schema.0.name=f_sequence schema.1.data-type=INT schema.1.name=f_random schema.2.data-type=STRING schema.2.name=f_random_str update-mode=append
The following factories have been considered: org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.filesystem.FileSystemTableFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113) at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384) at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638) at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636) at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:183) at org.apache.flink.table.client.gateway.local.ExecutionContext. (ExecutionContext.java:136) at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859) ... 3 more
看描述是有包找不到,到我看官网上说 json 的解析 jar 在 sql-client 中包含啊,试用 sql-client 也需要自己导包的么?哪里有更详细的资料,求指点,谢谢
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370095?spm=a2c6h.13066369.question.59.33bf585fTzCWiM
问题五:使用flink-sql解析debezium采集的mysql timestamp字段报错
flink-sql-client执行建表:
CREATE TABLE source_xxx ( id INT, ctime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'earliest-offset', 'debezium-json.schema-include' = 'false', 'debezium-json.ignore-parse-errors' = 'false' );
查询: select * from source_xxx; [ERROR] Could not execute SQL statement. Reason: java.time.format.DateTimeParseException: Text '2018-07-10T23:47:35Z' could not be parsed at index 10
mysql源表中ctime字段为timestamp类型,增加'debezium-json.timestamp-format.standard' = 'ISO-8601'配置依然报错,结尾多了个Z。 想咨询一下,这块儿是flink-sql和debezium采集的timestamp格式不兼容么?还是我debezium的配置,或者使用的flink-sql类型有问题?*来自志愿者整理的flink邮件归档
参考答案:
引用 Jark 对邮件列表中另一个相关的问题的回答,详情可查看[1]。 希望对你有帮助。
[1] http://apache-flink.147419.n8.nabble.com/flink-sql-td8884.html#a8888*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370096?spm=a2c6h.13066369.question.62.33bf585f50r5Fo