问题一:flink任务提交方式
请问现在flink有没有像sparklauncher这种任务提交方式,在任务提交成功后返回对应的任务id(不管是onyarn还是standlone),我这面想用java代码提交任务并在提交后获取任务id,请问有没有对应功能或工具
参考回答:
1.10.x版本以后env.execute()是返回一个JobExecutionResult
对象的,这里面可以获取到job相关信息,比如你想要的jobid
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371667
问题二:flink的state过期设置
想咨询一下关于state的ttl问题;
想问一下 state设置的ttl,如果从checkpoints重启 ttl会不会失效;ttl针对的是process time,
比如我设置的7天过期,重新从checkpoints启动是第一次启动的时间算还是恢复时的新processtime算;他是state的一部分 还是怎么算;
或者要注册定时器来实现
参考回答:
TTL的时间戳实际是会存储在 state 里面 [1],与每个entry在一起,也就是说从Checkpoint恢复的话,数据里面的时间戳是当时插入时候的时间戳。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371666
问题三:UDTAGGs sql的查询怎么写
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/functions/udfs.html#table-aggregation-functions 请问下UDTAGGs支持sql的写法吗,怎么写?看官档上只有table api的示例。
参考回答:
因为UDTAGGs不属于标准SQL的语法,所以只有TableApi
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/371663
问题四:flink内存分配的问题
taskmanager的内存设置为15G但实际的heap只有10G
看了tm内存分配的源码1.计算cutoff(15GB * 0.25) 2.计算heap大小(heap计算的入参是15GB - cutoff大小) 3.计算offheap大小(offheap的大小等于15GB-heap大小)
offheap就是最终的-XX:MaxDirectMemorySize的大小
想请教下MaxDirectMemorySize的大小有必要设置这么大吗?
参考回答:
FLINK TM 中是用到了大量的堆外内存的,除了通常意义的 JVM 的栈空间、方法区等堆外开销外,还包括网络 buffer、batch
缓存、RocksDB等。
默认配置是相对保守,为了保证大多数情况下预留出足够的堆外内存。具体是否设置过大了,要看具体运行的作业的情况。可以尝试通过配置'containerized.heap-cutoff-ratio'进行调整。
另外,即将发布的flink 1.10版本中对TM的内存计算进行了优化,不再采用cutoff而是根据用途列出了更具体的配置项,欢迎试用
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/373569
问题五:StreamTableEnvironment.registerDatastream()
StreamTableEnvironment.registerDatastream() 开放用户自定义的schemaDescriptionh和DeserializationSchema StreamTableEnvironment.registerDatastream(),目前只支持pojo 或者 是public属性的对象,根据默认字段或者用户指定的字段注册table,但某些场景下没有固定的格式,比如使用JSONObject类型DataStream就无法通过这种方法注册成table,是否可以提供更底层的API来使table注册的灵活性更高。用户传入schema的描述和自定义的解析器DeserializationSchema.
参考回答:
你这种需求的一种解决思路,可以把 JSONObject类型定义成object类型,然后注册成table之后通过一个UDTF把JSONObject转换成特定的schema。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/373571