Flink CDC中这种情况请问该如何优化?

问题1:Flink CDC中我这边规则维表有3万多记录,业务数据kafka 读取,lookup join 该规则维表,但关联字段不是规则维表的主键,关联后可能会输出多条记录
现在偶尔出现 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer 的报错
请问该如何优化?
规则维表用jdbc 读取,设置了参数
lookup.cache.max-rows'='10000'
'lookup.cache.ttl'='60s'
3万多记录会变,譬如新增规则,修改规则,就会变更,频率不高。
问题2:我用flink table 的jdbc 读取,joins时对维表 FOR SYSTEM_TIME AS OF t1.proctime

如果要LRU算法,是自定义source 吗?

展开
收起
十一0204 2023-07-26 08:07:01 175 分享 版权
3 条回答
写回答
取消 提交回答
  • 问题1:如果您的规则维表有3万多记录,并且关联字段不是主键,可能导致 Lookup Join 后输出多条记录。同时,偶尔出现 TIMED_WAITING 错误。针对这种情况,以下是几点优化建议:

    1. 调整缓存参数:适当调整 Lookup 的缓存参数可以改善性能。根据您的描述,设置了 'lookup.cache.max-rows'='10000''lookup.cache.ttl'='60s',可以尝试增加 lookup.cache.max-rows 的值来提高缓存大小,以适应更大的规则维表。同时,根据业务数据更新频率,合理调整 lookup.cache.ttl 的值。

    2. 考虑使用异步 I/O:Flink 1.14 引入了异步 I/O 功能,可以进一步提高 Lookup Join 的性能。通过异步请求数据库,可以在等待时间内并行地进行其他计算,从而提高整体吞吐量。您可以尝试使用 Flink 的 Async I/O 功能,将 Lookup 操作改为异步模式。

    3. 调整资源配置:根据任务的需求和资源情况,调整 Flink 集群的资源分配,包括内存、CPU 等。确保足够的资源分配给 CDC 作业,以避免过度竞争和资源瓶颈导致的性能问题。

    4. 数据预处理:考虑在 Flink SQL 之前进行数据预处理,例如过滤或聚合规则维表的数据,以减少 Lookup Join 操作的数据量。这样可以降低计算和网络开销,并提高查询性能。

    问题2:如果您使用 Flink Table 的 JDBC 读取,并希望对维表应用 LRU(最近最少使用)算法,可以使用自定义的 Source 实现此功能。您可以实现一个自定义的 TableSource,使用 LRU 算法来缓存维表数据并提供读取功能。

    在自定义的 TableSource 中,您可以通过重写 getLookupTable 方法来实现缓存逻辑,该方法返回一个 Table 对象,表示维表数据。在实现中,您可以使用 LRU 算法来管理缓存大小和淘汰机制,并根据需求进行数据的加载和更新。

    以下是伪代码示例:

    public class CustomTableSource implements LookupTableSource {
        private Map<Key, Value> cache; // LRU cache for table data
    
        public Table getLookupTable(TableEnvironment tEnv) {
            // Check cache and load data if not present or expired
            if (cache == null || isExpired()) {
                loadData();
            }
    
            // Construct a Table object from cache
            TableSchema schema = ...;
            List<Row> rows = ...; // Convert cache map to List<Row>
            return tEnv.fromValues(rows, schema);
        }
    
        // Other methods for LRU cache management and data loading/updating
    }
    

    请注意,这只是一个示例,实际实现需要根据您的具体需求和环境进行调整。

    2023-07-31 23:11:06
    赞同 展开评论
  • 北京阿里云ACE会长

    如果您在使用 Flink CDC 进行数据同步时遇到性能问题,可以考虑以下几种优化方法:
    提高并发度:并发度是指 Flink CDC 在同步数据时同时处理的数据量。通过增加并发度,可以提高 Flink CDC 的数据处理能力,从而提高数据同步的速度。您可以在 Flink CDC 的配置文件中增加并发度参数,例如:
    Copy
    --parallelism 10
    这将使 Flink CDC 使用 10 个并发线程同时处理数据。
    增加缓冲区大小:在 Flink CDC 中,缓冲区用于存储待处理的数据,可以通过增加缓冲区大小来提高数据处理能力。您可以在 Flink CDC 的配置文件中增加缓冲区大小参数,例如:
    Copy
    --buffer-size 10000
    这将使 Flink CDC 使用大小为 10000 的缓冲区来存储待处理的数据。
    使用分区键:在进行数据同步时,使用分区键可以将数据按照指定的键值进行分区,从而提高数据处理效率。您可以在 Flink CDC 的配置文件中设置分区键,例如:
    Copy
    --partition.column name
    这将使 Flink CDC 使用名为 "name" 的列作为分区键。
    优化目标源:如果您的目标源是另一个数据库,可以考虑优化目标源的性能,例如增加目标数据库的缓存大小、增加目标数据库的处理能力等。

    2023-07-29 16:36:09
    赞同 展开评论
  • 意中人就是我呀!

    "回答1: lookup.cache.max-rows'='10000'。直接不要缓存
    'lookup.cache.ttl'='60s'
    把维表的ttl干掉,你加到内存里,60s内数据变了,缓存里有,你join后的数据也不对。如果还是想要缓存减少数据库的压力,可以把减少拉取数据的条数,100?200?300? 用LRU算法淘汰数据。
    回答2:Jdbc不需要自定义,但是需要传一个目标数据库jdbc驱动包。此回答整理至钉群“Flink CDC 社区”。"

    2023-07-26 12:03:59
    赞同 展开评论

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理