使用同一段代码,只是表名不同,目的是同步同一个库中的多个表。运行在yarn上的。当启动一个表的任务时,基本没什么延迟,并且一切正常,但是当再启动另一个表的同步任务时,就会特别慢,延迟四五分钟这样,哪怕新增一笔数据也是延迟这么久。有时候还会出现数据库连接失败的报错。具体代码如下:public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing();
env.getCheckpointConfig().setCheckpointInterval(30000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setParallelism(1);
EnvironmentSettings Settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
"ACCT_ID DECIMAL,\n" +
" SEQ DECIMAL,\n" +
" CC_TYPE DECIMAL,\n" +
" CREDIT_LIMIT DECIMAL,\n" +
" EFF_DATE TIMESTAMP(3),\n" +
" EXP_DATE TIMESTAMP(3),\n" +
" SP_ID DECIMAL,\n" +
" primary key (ACCT_ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'oracle-cdc',\n" +
" 'hostname' = '******',\n" +
" 'port' = '******',\n" +
" 'username' = '******',\n" +
" 'password' = '******',\n" +
" 'database-name' = '******',\n" +
" 'schema-name' = '******'," +
" 'table-name' = '这里表名不同',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")";
String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
"ACCT_ID DECIMAL,\n" +
" SEQ DECIMAL,\n" +
" CC_TYPE DECIMAL,\n" +
" CREDIT_LIMIT DECIMAL,\n" +
" EFF_DATE TIMESTAMP(3),\n" +
" EXP_DATE TIMESTAMP(3),\n" +
" SP_ID DECIMAL,\n" +
" primary key (ACCT_ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'url' = '******',\n" +
" 'username' = '****',\n" +
" 'password' = '****',\n" +
" 'table-name' = '****'\n" +
")";
String transformDmlSQL = "insert into TS_XX_sink select * from TS_XX_source";
tableEnv.executeSql(sourceDDL).print();
tableEnv.executeSql(sinkDDL).print();
tableEnv.executeSql(transformDmlSQL).print();
env.execute("sync-cdc");
}
根据您提供的代码段,以下是一些可能导致同步任务运行缓慢和数据库连接失败的常见问题和建议解决方法:
并行度设置:在代码中,您将并行度设置为了1 (env.setParallelism(1)
),这意味着只有一个并行任务来处理数据。如果您同时启动了多个任务,但并行度仍然为1,可能会导致任务之间的竞争和资源争夺,从而导致某些任务运行缓慢。建议适当调整并行度,以充分利用集群资源,并确保每个任务都可以获得足够的计算能力。
数据源配置:检查您的数据源配置是否正确,并且与每个表的任务相关。确保使用了正确的连接参数、用户名、密码等信息。还要确保数据库服务器具有足够的性能和资源,以支持同时处理多个任务。
资源调整:在执行多个任务时,尤其是在相同的数据库上执行时,需要评估和调整任务所需的资源。包括内存、CPU、网络等方面的资源。确保分配给每个任务的资源足够,并避免资源争夺和瓶颈。
数据库连接池:考虑使用连接池管理数据库连接,以减少连接的创建和销毁开销。连接池可以帮助提供可重用的连接,并对连接进行管理,以支持并发访问和提高性能。
日志和错误排查:检查任务运行期间的日志文件,查看是否存在其他错误或异常信息,这可能有助于确定导致任务慢和数据库连接失败的具体原因。注意关注资源不足、网络问题、数据库负载过高等可能的问题。
调整参数:根据具体情况,可以尝试调整 Flink 的相关配置参数,如网络缓冲区大小、堆内存大小等,以优化任务的性能和资源利用。
从您提供的信息来看,这个问题可能与以下因素有关:
数据库连接资源限制:
如果您的目标数据库的连接数有限制,那么当并发执行多个同步任务时,可能会因为无法获取足够的数据库连接而导致延迟或失败。您可以检查目标数据库的连接池设置,并根据需要进行调整。
网络带宽和性能问题:
当同时运行多个表的同步任务时,可能会占用大量的网络带宽和计算资源,导致每个任务的处理速度变慢。如果您使用的机器资源有限,建议考虑使用更强大的硬件或者优化您的代码以减少资源消耗。
Flink并行度设置:
您在代码中将Flink的并行度设置为1,这意味着所有的任务都会在一个线程中串行执行。如果您的表数据量较大,这可能会导致任务执行非常慢。您可以尝试提高并行度来加速任务执行,但要注意不要超过您的硬件资源限制。
YARN资源分配:
在YARN集群上运行多个任务时,YARN会动态地分配资源给每个任务。如果资源不足,可能会导致任务延迟或失败。您可以查看YARN的日志和监控信息,了解资源分配情况,并根据需要调整YARN的配置。
CDC日志读取速度:
如果您的Oracle CDC源表中的数据变化非常频繁,那么读取CDC日志的速度可能会成为瓶颈。在这种情况下,您可能需要考虑优化您的Oracle CDC设置,例如调整scan.startup.mode
参数、增加日志读取线程等。
代码逻辑问题:
请确保您的代码没有明显的逻辑错误,比如资源泄露、死锁等问题。可以使用一些工具(如JProfiler、VisualVM等)来分析您的程序性能,找出潜在的问题。
为了更好地解决这个问题,建议您提供更多的上下文信息,例如:
这个问题可能是由于多个表的同步任务同时运行导致的。你可以尝试以下方法来解决这个问题:
增加并行度:将env.setParallelism(1);
修改为env.setParallelism(2);
或更高,以便同时运行多个表的同步任务。
调整检查点间隔和超时时间:根据实际需求调整检查点间隔和超时时间,以减少检查点之间的延迟。
优化数据库连接:检查数据库连接配置,确保所有表的同步任务都能正常连接到数据库。
使用批处理:在同步数据之前,将数据分批处理,以减少每次同步的数据量。
监控资源使用情况:使用YARN的资源管理器(ResourceManager)来监控集群中各个节点的资源使用情况,以便发现潜在的性能瓶颈。
考虑使用其他同步工具:如果上述方法都无法解决问题,可以考虑使用其他同步工具,如Apache Kafka、Apache Flink等,来实现跨表的实时数据同步。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。