暂无个人介绍
2021年12月
Flink does not directly support authenticating access to the web UI, but you can always put something like Apache's basic_auth in front of it.*来自志愿者整理的flink邮件归档
Hi! 感谢提出问题。方案一应该是最合适的,“算子名称长度超过限制而失败”不是期望行为,具体是什么样的错误栈?*来自志愿者整理的flink
Hi! scan.partition.lower-bound 和 scan.partition.upper-bound 都是一个 long 值(而不是一个 timestamp 字符串的形式)。它们将会转换成 where between and 的 SQL 语句通过 jdbc 获取数据。可以检查一下配置项的格式和值的范围是否符合期望。*来自志愿者整理的flink
Hi, 如果兩次 left join 的話是否滿足你的需求呢? 然後在取 temporal table 的字段時,用 IF 去判斷取值。參考 SQL 如下
SELECT c.mer_cust_id, IF(k.mer_cust_id IS NOT NULL AND a.mercust_id IS NOT NULL AND k.mer_cust_id <> '', k.update_time, NULL) AS update_time FROM charge_log as c LEFT JOIN ka_mer_info FOR SYSTEM_TIME AS OF c.proc_time AS k ON c.mer_cust_id = k.mer_cust_id LEFT JOIN adp_mer_user_info FOR SYSTEM_TIME AS OF c.proc_time AS a ON c.mer_cust_id = a.mer_cust_id
不過,這種寫法只能適用在兩張 MySQL 表都保證 mer_cust_id 是唯一主鍵的狀況下。如果 mer_cust_id 不是唯一的話,輸出的結果數量會跟原本提供的 SQL 期望的輸出不一致 比如說 ka_mer_info 有 0 筆數據, adp_mer_user_info 有 2 筆數據,原先的 SQL 會得到 1 筆 left join 沒成功的數據,上面提供的 SQL 則會輸出 2 筆。*来自志愿者整理的flink
Hi! 如果只是 bucket 不同的话,通过在 with 参数里指定 path 即可。
如果连 ak id 和 secret 都不同,可以考虑实现自己的 com.aliyun.oss.common.auth.CredentialsProvider 接口,并在 flink conf 中指定 fs.oss.credentials.provider 为对应的实现类。*来自志愿者整理的flink
这个是可以直接控制内部连边的方式,可以参考一下这个的Java doc。不过这个是一个内部接口,还是建议使用 env.setRuntimeMode(RuntimeExecutionMode.BATCH),这个可以参考一下这个文档: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/execution_mode/ 。
public enum GlobalStreamExchangeMode { /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */ ALL_EDGES_BLOCKING,
/** * Set job edges with {@link ForwardPartitioner} to be {@link * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link * ResultPartitionType#BLOCKING}. */ FORWARD_EDGES_PIPELINED,
/** * Set job edges with {@link ForwardPartitioner} or {@link RescalePartitioner} to be {@link * ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link * ResultPartitionType#BLOCKING}. */ POINTWISE_EDGES_PIPELINED,
/** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */ ALL_EDGES_PIPELINED,
/** Set all job edges {@link ResultPartitionType#PIPELINED_APPROXIMATE}. */ ALL_EDGES_PIPELINED_APPROXIMATE }*来自志愿者整理的flink
Hi!
邮件里看不到图片和附件,建议使用外部图床。
partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证 exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。*来自志愿者整理的flink
af里acc为个list,merge的时候合并,输出的时候 list拼成string即可*来自志愿者整理的flink
类似jdbc 里面同时进行scan,然后输出。再接流处理呢*来自志愿者整理的flink
Thanks for starting this discussion Yingjie,
How will our tests be affected by these changes? Will Flink require more resources and, thus, will it risk destabilizing our testing infrastructure?
I would propose to create a FLIP for these changes since you propose to change the default behaviour. It can be a very short one, though.*来自志愿者整理的flink
Hi!
正确。例如 HBaseRowDataAsyncLookupFunction 里就调用了 hbase 提供的 table#get 异步方法实现异步查询。
Async operator 就是为了提高无需保序的操作(例如很多 etl 查维表就不关心顺序)的效率才引入的,如果对顺序有强需求就不能用 async operator。*来自志愿者整理的flink
可能是 watermark 问题:并发多,测试数据少,有些并发没有数据,导致watermark没有增长。
如果是这个原因的话,有两种解决办法: 1)t_env.get_config().get_configuration().set_string("parallelism.default", "1") 2)t_env.get_config().get_configuration().set_string("table.exec.source.idle-timeout", "5000 ms")*来自志愿者整理的flink