问题1:Flink CDC中我这边规则维表有3万多记录,业务数据kafka 读取,lookup join 该规则维表,但关联字段不是规则维表的主键,关联后可能会输出多条记录
现在偶尔出现 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer 的报错
请问该如何优化?
规则维表用jdbc 读取,设置了参数
lookup.cache.max-rows'='10000'
'lookup.cache.ttl'='60s'
3万多记录会变,譬如新增规则,修改规则,就会变更,频率不高。
问题2:我用flink table 的jdbc 读取,joins时对维表 FOR SYSTEM_TIME AS OF t1.proctime
如果要LRU算法,是自定义source 吗?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
问题1:如果您的规则维表有3万多记录,并且关联字段不是主键,可能导致 Lookup Join 后输出多条记录。同时,偶尔出现 TIMED_WAITING
错误。针对这种情况,以下是几点优化建议:
调整缓存参数:适当调整 Lookup 的缓存参数可以改善性能。根据您的描述,设置了 'lookup.cache.max-rows'='10000'
和 'lookup.cache.ttl'='60s'
,可以尝试增加 lookup.cache.max-rows
的值来提高缓存大小,以适应更大的规则维表。同时,根据业务数据更新频率,合理调整 lookup.cache.ttl
的值。
考虑使用异步 I/O:Flink 1.14 引入了异步 I/O 功能,可以进一步提高 Lookup Join 的性能。通过异步请求数据库,可以在等待时间内并行地进行其他计算,从而提高整体吞吐量。您可以尝试使用 Flink 的 Async I/O 功能,将 Lookup 操作改为异步模式。
调整资源配置:根据任务的需求和资源情况,调整 Flink 集群的资源分配,包括内存、CPU 等。确保足够的资源分配给 CDC 作业,以避免过度竞争和资源瓶颈导致的性能问题。
数据预处理:考虑在 Flink SQL 之前进行数据预处理,例如过滤或聚合规则维表的数据,以减少 Lookup Join 操作的数据量。这样可以降低计算和网络开销,并提高查询性能。
问题2:如果您使用 Flink Table 的 JDBC 读取,并希望对维表应用 LRU(最近最少使用)算法,可以使用自定义的 Source 实现此功能。您可以实现一个自定义的 TableSource,使用 LRU 算法来缓存维表数据并提供读取功能。
在自定义的 TableSource 中,您可以通过重写 getLookupTable
方法来实现缓存逻辑,该方法返回一个 Table
对象,表示维表数据。在实现中,您可以使用 LRU 算法来管理缓存大小和淘汰机制,并根据需求进行数据的加载和更新。
以下是伪代码示例:
public class CustomTableSource implements LookupTableSource {
private Map<Key, Value> cache; // LRU cache for table data
public Table getLookupTable(TableEnvironment tEnv) {
// Check cache and load data if not present or expired
if (cache == null || isExpired()) {
loadData();
}
// Construct a Table object from cache
TableSchema schema = ...;
List<Row> rows = ...; // Convert cache map to List<Row>
return tEnv.fromValues(rows, schema);
}
// Other methods for LRU cache management and data loading/updating
}
请注意,这只是一个示例,实际实现需要根据您的具体需求和环境进行调整。
如果您在使用 Flink CDC 进行数据同步时遇到性能问题,可以考虑以下几种优化方法:
提高并发度:并发度是指 Flink CDC 在同步数据时同时处理的数据量。通过增加并发度,可以提高 Flink CDC 的数据处理能力,从而提高数据同步的速度。您可以在 Flink CDC 的配置文件中增加并发度参数,例如:
Copy
--parallelism 10
这将使 Flink CDC 使用 10 个并发线程同时处理数据。
增加缓冲区大小:在 Flink CDC 中,缓冲区用于存储待处理的数据,可以通过增加缓冲区大小来提高数据处理能力。您可以在 Flink CDC 的配置文件中增加缓冲区大小参数,例如:
Copy
--buffer-size 10000
这将使 Flink CDC 使用大小为 10000 的缓冲区来存储待处理的数据。
使用分区键:在进行数据同步时,使用分区键可以将数据按照指定的键值进行分区,从而提高数据处理效率。您可以在 Flink CDC 的配置文件中设置分区键,例如:
Copy
--partition.column name
这将使 Flink CDC 使用名为 "name" 的列作为分区键。
优化目标源:如果您的目标源是另一个数据库,可以考虑优化目标源的性能,例如增加目标数据库的缓存大小、增加目标数据库的处理能力等。
"回答1: lookup.cache.max-rows'='10000'。直接不要缓存
'lookup.cache.ttl'='60s'
把维表的ttl干掉,你加到内存里,60s内数据变了,缓存里有,你join后的数据也不对。如果还是想要缓存减少数据库的压力,可以把减少拉取数据的条数,100?200?300? 用LRU算法淘汰数据。
回答2:Jdbc不需要自定义,但是需要传一个目标数据库jdbc驱动包。此回答整理至钉群“Flink CDC 社区”。"
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。