![个人头像照片](https://ucc.alicdn.com/avatar/avatar3.jpg)
暂无个人介绍
2020年10月
2020年07月
2020年05月
2019年12月
2019年11月
菜鸟实验室
前几天看书,可以通过执行计划判断是否走了多进程查询
flink的Table&SQL API 建表都会存在对应的 catalog 的 database 中。默认情况下,使用的是 org.apache.flink.table.catalog.GenericInMemoryCatalog 你注册的souce表也好,sink表也好,中间创建的table或者view也好,都会注册在这个catalog中,也就是内存中,任务结束,信息不会被保存
如果你想跨flink session使用表。 比如我提前创建好source表的表结构,如kafakSource,然后其他任务就不用再很麻烦的重新注册这个kafkasource表的话。 需要自己实现一个 catalog 类,将注册的表信息写道mysql中或者其他持久化存储中。 这样任何的flink任务,只要使用你自己实现的catalog,就可以获取定义好的表。
可以看下我这个flink与hive集成时的demo,flink自带的hive的catalog就是这么做的,只不过更复杂 https://github.com/935205406/flink-hive-integration-demo/blob/master/src/main/java/com/example/flink/HiveWriteDemo.java
我们也在研究这块、打算用groovy加aviatror的方案动态修改cep的规则、但是也要改flink源码
HBase也可以,kv格式都可以。
可以keyby keyby之后还 是数据倾斜的话现在解决的方法一般是LocalGlobal
可以看一下 相当于MapReduce的Combine+Reduce https://help.aliyun.com/document_detail/98981.html?spm=a2c4g.11186623.6.614.86286d163E6NmE
一般是 java 和 scala,现在社区也在发展 python。后期应该会有更好的支持
map的话每个消息都需要有输出,flatMap的话过滤掉的不需要转换,flatMap相当于filter+map
距离可以在where中做下判断吧,如果关闭定位数据不在上传,那么整个模式匹配失败,之前的事件就会被清理掉
感觉你这种场景不适合双流jion,用户表可以认为维表吧 放在数据库里 行为数据来的时候再查询关联
这种不可累加型的指标,全天数据几分钟产出一次,确实很烦人,而且state还得手动清,那个ttl不支持event time,说实话感觉用外部存储,比如redis 的hyperloglog会更好一些,uv这种不可累加的指标,放在state里面确实不太好维护
如果上游source组件支持重放,下游sink组件支持事务就可以实现端到端
定位瓶颈在哪里,应该是磁盘io是瓶颈,可以关闭 local recovery,并且把 本地的 rocksdb 目录指定到多个磁盘
一条一条提交,Kafka的压力肯定山大。。。如果不想丢数据,只能尽量把checkpoint的周期设短一点
https://mp.weixin.qq.com/s?__biz=MzU5Mzk3MDA3Mw==&mid=2247483866&idx=2&sn=6a3b458caf5bebf0171f9fbd834b7517&chksm=fe09172cc97e9e3a590f5ea2978d078b1b46d94f86bd344173fa69c1d63790b09d2fe173bffb&token=1856795336&lang=zh_CN#rd
参考下
可以看下 flink_taskmanager_job_task_buffers_outPoolUsage 这个指标
窗口自带trigger是在窗口结束时触发,如果想提早触发可以提供一个额外trigger,例如stream.tumblingWindow(...).trigger(new ContinuousTrigger(...)),如果需要每来条消息就触发就换用CountTrigger
这个可能是数据格式的问题
应该是你yarn配置container最大使用内存的原因
shell 命令查看连接数撒,大概率是跟redis mysql 这类代码没释放连接有关系,取消不了任务 应该是临时文件呗自动删除了 设置修改一个参数路径应该就好了