![个人头像照片](https://ucc.alicdn.com/avatar/0f6e75b628d247738bf704a300799c8b.jpg)
暂无个人介绍
2021年12月
pushdown逻辑是批流复用的,应该work的很愉快。*来自志愿者整理的FLINK邮件归档
建议升级到1.10.0版本,该版本默认对RocksDB backend的内存使用会有限制,更多资料请参考官方文档 [1]。*来自志愿者整理的FLINK邮件归档
你应该是要把作业提交到 yarn 上吧。这个错误应该没有正确的加载 FlinkYarnSessionCli 导致的,这些日志不是失败的根因。可以多提供一些日志看看。*来自志愿者整理的FLINK邮件归档
A1会继续处理。如果是 exactly-once 模式,taskB 不会处理 taskA传递给taskB的cp2的数据。所以,如果 A2 非常非常慢,最终 taskB 会反压到 A1,导致 A1也无法继续处理数据。*来自志愿者整理的FLINK邮件归档
看到异常信息 - Closing TaskExecutor connection container_1578492316659_0830_01_000006 because: Container [pid=30031,containerID=container_1578492316659_0830_01_000006] is running beyond physical memory limits. Current usage: 10.0 GB of 10 GB physical memory used; 11.8 GB of 21 GB virtual memory used. Killing container.
应该是超内存了,容器被 kill 了*来自志愿者整理的FLINK邮件归档
可以看一下 PctrLogJoin -> (Sink: hdfsSink, Sink:> kafkaSink) (8/36) 这个的 tm log 看看具体是什么原因导致的 checkpoint 失败*来自志愿者整理的FLINK邮件归档
当前 batch 模式还不支持 UpsertTableSink,不过已经有 PR 在支持中了: https://issues.apache.org/jira/browse/FLINK-15579*来自志愿者整理的FLINK邮件归档
你可以在这个 ReduceFunction 的 reduce 打印一下 o1 和 o2,看看 o1.getAct() + o2.getAct()
后是啥*来自志愿者整理的FLINK邮件归档
1、首先定位产生反压的位置(可以在 Flink UI 上查看或者根据 Flink 的 Metric 定位)
2、定位到了反压源之后,处理反压可以先从系统资源/垃圾收集(GC)/线程竞争/负载不平衡 等基本原因去分析
更详细的可以看下之前写的一篇文章
https://gitbook.cn/gitchat/column/5dad4a20669f843a1a37cb4f/topic/5db6bed1f6a6211cb9616645*来自志愿者整理的FLINK邮件归档
hi, 你看到的 select count(distinct a, b) from mytable 单元测试能通过,应该是只测试 logical
plan,当前在生成 physical plan的时候,显示的禁用了多个字段*来自志愿者整理的FLINK邮件归档
Hi
你的单节点rocksDB state size多大呢?(可以通过打开相关metrics [1] 或者登录到RocksDB所在机器观察一下RocksDB目录的size)
造成反压是如何确定一定是rocksDB 状态大导致的呢?看你的IO情况绝对值很大,但是百分比倒不是很高。是否用jstack观察过TM的进程,看一下是不是task主线程很容易打在RocksDB的get等读操作上。
RocksDB本质上还是面向磁盘的kv存储,如果是每次读写都更新的话,block cache发挥的作用会很有限。如果达到磁盘瓶颈,需要考虑提高磁盘性能或者想办法降低单个rocksDB实例的state大小。
最后,1.10 默认开启了RocksDB的内存限制功能,你这里提到的性能反压包括在之前版本上试验过么?
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#state-backend-rocksdb-metrics-total-sst-files-size*来自志愿者整理的FLINK邮件归档
建议检查下Watermark,打印出来看看是不是合法的。btw,这代码缩紧有点尴尬。*来自志愿者整理的FLINK邮件归档
应该是所有operator完成各自的pre-commit,它们会发起一个commit操作,然后才会flush*来自志愿者整理的FLINK邮件归档
我在配置flink连接hive时,由于集群开启了Kerberos认证,经过一番探索,异常没有了。但是现在连接的时候需要我输入Kerberos用户名和密码。我理解指定了keytab文件路径后,应该不需要用户名和密码了吧?请教各位大神可能的配置问题。
security.kerberos.login.use-ticker-cache: false security.kerberos.login.keytab: /app/flink/flink-1.10.10/kerberos/flink_test.keytab security.kerberos.login.principal: flink_test@HADOOP.HTSC.COM*来自志愿者整理的FLINK邮件归档
user-zh我就说中文啦. 你需要设置成bigint. 具体报什么错?*来自志愿者整理的FLINK邮件归档
FLINK TM 中是用到了大量的堆外内存的,除了通常意义的 JVM 的栈空间、方法区等堆外开销外,还包括网络 buffer、batch
缓存、RocksDB等。
默认配置是相对保守,为了保证大多数情况下预留出足够的堆外内存。具体是否设置过大了,要看具体运行的作业的情况。可以尝试通过配置'containerized.heap-cutoff-ratio'进行调整。
另外,即将发布的flink 1.10版本中对TM的内存计算进行了优化,不再采用cutoff而是根据用途列出了更具体的配置项,欢迎试用
*来自志愿者整理的FLINK邮件归档
看起来你只能改下connector代码才能支持压缩了: ParquetAvroWriters.createAvroParquetWriter里:设置AvroParquetWriter.Builder的压缩格式。*来自志愿者整理的FLINK邮件归档
flink-hbase_2.11-1.9.0.jar 只包括了flink对hbase读写的封装的类,并没有提供hbase client的类,你需要把hbaes client等相关的jar包提供出来放到 lib包里面。*来自志愿者整理的FLINK邮件归档
Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention policy.
kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 [1],在Flink-1.9 版本下可以忽略这些异常。
[1] https://issues.apache.org/jira/browse/FLINK-12147*来自志愿者整理的FLINK邮件归档
邮件列表里不支持直接发送图片,你可以用一些图床工具来发送图片。
根据你的描述,我猜测你应该是join维表的语法写的不对,写成了普通的join的方式。这种情况下,会把jdbc的表解析成JDBCInputFormat
,一次性读取全部数据。
维表join的SQL写法如下所示:
SELECT
o.amout, o.currency, r.rate, o.amount * r.rateFROM
Orders AS o* JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
详细内容可以参考文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#joins*来自志愿者整理的FLINK邮件归档