问题一:flink 1.11.2 on yarn 可用slot始终为0,job无法提交如何处理?
http://apache-flink.147419.n8.nabble.com/file/t1162/Screenshot_2020-12-09_153858.png
启动命令: ./bin/flink run-application -t yarn-application -Djobmanager.memory.process.size=2048m -Dtaskmanager.memory.process.size=2048m -Dyarn.application.name="Test Job" -c com.jacob.Main /opt/app/test.jar
Hadoop集群 资源充足。flink无法为job分配slot。 *来自志愿者整理的flink邮件归档
参考答案:
jobmanager 的日志方便发下吗?
另外,可以看下 yarn 是否分配了 taskmanager 的 container,如果有的话通过 yarn 获取以下 taskmanager
的日志。
Thank you~*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370052?spm=a2c6h.13066369.question.16.33bf585fottvLO
问题二:FlinkSQL如何定义JsonObject数据的字段类型
flink version: 1.11.2 api: flink-sql
场景:使用flink sql定义了一张kafka的source表,kafka中数据为json格式的字符串。 其中context是json的一个键,其值为jsonObject,数据示例如下: { “id”: 1, "context”: { … (这里的数据为jsonObject,具体schema不确定, 由各个业务方自行确定,可能嵌套,也可能不嵌套,完全不可控) } } 建表语句为: CREATE TABLE json_source ( id bigint, context ) WITH ( 'connector' = 'kafka’, 'format' = 'json’ );
问题: 该使用什么数据类型来指定类型 呢?从目前的flink sql 的 data type 里感觉没有很合适的匹配项,不管是ROW,或者MAP都不太合适。 *来自志愿者整理的flink邮件归档
参考答案:
可以用string*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370109?spm=a2c6h.13066369.question.19.33bf585f6mhu88
问题三:flink sql 1.11 kafka cdc与holo sink有一些问题
flink sql 1.11 创建kafka source 表 ,kafka数据是canal采集的mysql 信息,'format' = 'canal-json', 问题是 1,在source表中只能有与msyql对应的schema信息么,(也就是data[{}]里面的字段)能不能获取table,ts这种字段的值? 2,对于一个topic中有多张mysql binlog信息的表,kafka source表是如何区分的,依赖于schema的不同吗? 3,这种source表,与holo sink 表结合使用,遇到delete类型的数据会在holo中删除该条数据吗?'ignoreDelete' = 'false' *来自志愿者整理的flink邮件归档
参考答案:
- 目前不支持。 已有 issue 跟进支持 https://issues.apache.org/jira/browse/FLINK-20385
- 配上 canal-json.table.include = 't1' 来过滤表。暂不支持正则过滤。
- 会*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370054?spm=a2c6h.13066369.question.18.33bf585fcvaO9V
问题四:flink11 SQL 如何支持双引号字符串?
我在迁移hive sql 到 flink引擎。原来的很多 hive sql 中
字符串都是用双引号表示,例如 select * from table1 where column1 =
"word"。我如何在不修改SQL的前提下,使flink SQL 支持 双引号字符串。
ps:我看到flink SQL中字符串都必须用 单引号,例如 select * from table1 where column1 =
'word' 。如何使 字符串 既可以是单引号 也可以是 双引号呢*来自志愿者整理的flink邮件归档
参考答案:
是跟这个 Issue 有关吗?https://issues.apache.org/jira/browse/FLINK-20537*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370055?spm=a2c6h.13066369.question.19.33bf585fYvBtt6
问题五:关于flink sql往postgres写数据遇到的timestamp问题
项目需求要向 postgres 中插入数据,用 catalog 之后,插入数据貌似需要和数据库表定义完全一致,而且没找到只插入部分字段的写法 在时间转 TIMESTAMP(6) WITH LOCAL TIME ZONE 时报了错,这个格式是 postgres 中的时间戳定义 select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE); 有没有什么转换方法?或者只插入部分数据的方法? 报错信息: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE NOT NULL' (conversion class: java.time.Instant) to type information. Only data types that originated from type information fully support a reverse conversion. at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546) at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438) at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329) at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237) at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49) at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271) at org.apache.flink.table.client.gateway.local.result.CollectStreamResult. (CollectStreamResult.java:71) at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult. (MaterializedCollectStreamResult.java:101) at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult. (MaterializedCollectStreamResult.java:129) at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465) at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555) at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)*来自志愿者整理的flink邮件归档
参考答案:
看报错信息,你并没有 insert into postgres,而是只是 select 了 query,这个会将运行结果显式在 sql client 界面上,而不会插入到 postgres 中。
你的 query 中有 timestamp with local time zone, 而 sql client 的 query运行结果的显式 还不支持这个类型。
这个问题的解决可以关注下这个 issue:https://issues.apache.org/jira/browse/FLINK-17948 *来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370056?spm=a2c6h.13066369.question.22.33bf585fLoUq0n