Flink CDC只跑一个就同步挺快的,是不是哪里没配置好?自定义sink private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("oracle.jdbc.driver.OracleDriver");
connection = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521/orcl", "scott", "123456");
}
@Override
public void invoke(DataSyncBean value, Context context) {
System.out.println("CustomSink:" + value);
PreparedStatement statement = null;
StringBuilder sql = new StringBuilder();
//数据处理
try {
switch (value.getOperation()) {
case "1":
sql.append("insert into ");
sql.append(value.getTable()).append("(").append(StringUtils.join(value.getFields(), ",")).append(") VALUES(");
for (int i = 0; i < value.getValues().size(); i = i + 1) {
sql.append("?");
if (i < value.getValues().size() - 1) {
sql.append(",");
}
}
sql.append(")");
statement = connection.prepareStatement(sql.toString());
for (int i = 0; i < value.getValues().size(); i++) {
statement.setObject(i + 1, value.getValues().get(i));
}
break;
case "2":
sql.append("update ");
sql.append(value.getTable()).append(" set ");
for (int i = 0; i < value.getValues().size(); i = i + 1) {
if (!value.getFields().get(i).equals(value.getPk())) {
sql.append(value.getFields().get(i));
sql.append("=?");
}
if (!value.getFields().get(i).equals(value.getPk()) && i < value.getValues().size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
sql.append(value.getPk()).append("=?");
statement = connection.prepareStatement(sql.toString());
int index = 1;
for (int i = 0; i < value.getValues().size(); i = i + 1) {
if (value.getFields().get(i).equals(value.getPk())) {
statement.setObject(value.getValues().size(), value.getPkValue());
} else {
statement.setObject(index, value.getValues().get(i));
index = index + 1;
}
}
break;
case "3":
sql.append("delete from ");
sql.append(value.getTable()).append(" where ");
sql.append(value.getPk()).append("=?");
statement = connection.prepareStatement(sql.toString());
statement.setObject(1, value.getPkValue());
break;
}
if (statement != null) {
statement.execute();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(sql);
}
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
}
如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是由于以下原因之一:
- 资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。 - 网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。 - 自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。
关于您提供的自定义 Sink 代码,可以尝试打印每一步操作的时间消耗,以确定哪个步骤导致同步速度变慢。您可以在代码中添加打印语句,并观察打印结果,以便进行进一步的优化和排查。
需要注意的是,Flink CDC 的性能受到多个因素的影响,包括数据源、网络、任务并发度、数据处理逻辑等。如果您对 Flink CDC 的性能有更高的要求,可以考虑进一步优化和调整相关配置。例如,调整任务的并发度、使用更高性能的硬件、优化代码逻辑等都可能会对同步速度产生影响。
总结来说,如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可以考虑资源限制、网络带宽限制和自定义 Sink 实现等方面的问题,并尝试优化和调整相关配置来提高同步速度。
如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是因为以下原因之一:
资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。
网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。
自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。
关于您提到的 private Connection connection;,这可能是您在自定义 Sink 中使用的数据库连接对象。如果您在自定义 Sink 中使用了数据库连接对
采集数据一般都不慢的,先试试看print结果,把每一步的操作时间打印出来,看看时间消耗在哪个步骤上了,此回答整理自钉群“Flink CDC 社区”
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。