开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大佬们,flink cdc如何限制拉取的数量?flink内存不多

有条数、大小可以限制吗

展开
收起
雪哥哥 2022-11-13 20:12:47 1993 0
6 条回答
写回答
取消 提交回答
  • 学无止境!

    CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,

    用户可以在以下的场景下使用CDC:

    使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。 可以在源数据库上实时的物化一个聚合视图 因为只是增量同步,所以可以实时的低延迟的同步数据 使用EventTime join 一个temporal表以便可以获取准确的结果 flink 1.11 将这些changelog提取并转化为table apa和sql,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。


    FlinkCDC-Springboot拉取数据写入Kafka https://blog.51cto.com/u_15127572/3652096

    2022-11-30 11:41:07
    赞同 展开评论 打赏
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    算一算每条数据的大小有多大,然后根据flink内存调整数量,确保最终的每个批次的总大小要小于flink内存

    2022-11-28 10:18:55
    赞同 展开评论 打赏
  • 十年摸盘键,代码未曾试。 今日码示君,谁有上云事。

    目前Flink1.1版本是没有简单的参数可以用来实现限流的功能的。可以通过如下三种方案来实现flink端限流: 设置任务的并行度; 改造kafka source; 调用 FlinkKafkaConsumer010#setRateLimiter,其中可以使用 GuavaFlinkConnectorRateLimiter 类。

    2022-11-24 11:42:26
    赞同 展开评论 打赏
  • Flink 会根据默认值或其他配置参数自动调整剩余内存部分的大小。 关于各内存部分的更多细节,请分别参考 TaskManager 和 JobManager 的相关文档。 对于独立部署模式(Standalone Deployment),如果你希望指定由 Flink 应用本身使用的内存大小,最好选择配置 Flink 总内存。 Flink 总内存会进一步划分为 JVM 堆内存和堆外内存。 更多详情请参考如何为独立部署模式配置内存。 通过配置进程总内存可以指定由 Flink JVM 进程使用的总内存大小。 对于容器化部署模式(Containerized Deployment),这相当于申请的容器(Container)大小,详情请参考如何配置容器内存(Kubernetes、Yarn 或 Mesos)。

    此外,还可以通过设置 Flink 总内存的特定内部组成部分的方式来进行内存配置。 不同进程需要设置的内存组成部分是不一样的。 详情请分别参考 TaskManager 和 JobManager 的相关文档。

    2022-11-23 17:40:58
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    更改配置文件postgresql.conf

    更改wal日志方式为logical

    wal_level = logical # minimal, replica, or logical

    更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots

    max_replication_slots = 20 # max number of replication slots

    更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样

    max_wal_senders = 20 # max number of walsender processes

    中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)

    wal_sender_timeout = 180s # in milliseconds; 0 disable

    wal_level是必须更改的,其它参数选着性更改,如果同步表数量超过10张建议修改为合适的值

    更改配置文件postgresql.conf完成,需要重启pg服务生效,所以一般是在业务低峰期更改 

    2022-11-23 15:02:04
    赞同 展开评论 打赏
  • 网站:http://ixiancheng.cn/ 微信订阅号:小马哥学JAVA

    解决办法,可以使用flink task的failover次数,修改单词的task checkpoint的时长短一些。

    2022-11-22 19:55:53
    赞同 展开评论 打赏
滑动查看更多

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载