Flink CDC只跑一个就同步挺快的,是不是哪里没配置好?自定义sink private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Class.forName("oracle.jdbc.driver.OracleDriver");
    connection = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521/orcl", "scott", "123456");
}
@Override
public void invoke(DataSyncBean value, Context context) {
    System.out.println("CustomSink:" + value);
    PreparedStatement statement = null;
    StringBuilder sql = new StringBuilder();
    //数据处理
    try {
        switch (value.getOperation()) {
            case "1":
                sql.append("insert into ");
                sql.append(value.getTable()).append("(").append(StringUtils.join(value.getFields(), ",")).append(") VALUES(");
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    sql.append("?");
                    if (i < value.getValues().size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                statement = connection.prepareStatement(sql.toString());
                for (int i = 0; i < value.getValues().size(); i++) {
                    statement.setObject(i + 1, value.getValues().get(i));
                }
                break;
            case "2":
                sql.append("update ");
                sql.append(value.getTable()).append(" set ");
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    if (!value.getFields().get(i).equals(value.getPk())) {
                        sql.append(value.getFields().get(i));
                        sql.append("=?");
                    }
                    if (!value.getFields().get(i).equals(value.getPk()) && i < value.getValues().size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                sql.append(value.getPk()).append("=?");
                statement = connection.prepareStatement(sql.toString());
                int index = 1;
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    if (value.getFields().get(i).equals(value.getPk())) {
                        statement.setObject(value.getValues().size(), value.getPkValue());
                    } else {
                        statement.setObject(index, value.getValues().get(i));
                        index = index + 1;
                    }
                }
                break;
            case "3":
                sql.append("delete from ");
                sql.append(value.getTable()).append(" where ");
                sql.append(value.getPk()).append("=?");
                statement = connection.prepareStatement(sql.toString());
                statement.setObject(1, value.getPkValue());
                break;
        }
        if (statement != null) {
            statement.execute();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println(sql);
    }
}
@Override
public void close() throws Exception {
    super.close();
    if (connection != null) {
        connection.close();
    }
}
                    版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是由于以下原因之一:
- 资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。 - 网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。 - 自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。
关于您提供的自定义 Sink 代码,可以尝试打印每一步操作的时间消耗,以确定哪个步骤导致同步速度变慢。您可以在代码中添加打印语句,并观察打印结果,以便进行进一步的优化和排查。
需要注意的是,Flink CDC 的性能受到多个因素的影响,包括数据源、网络、任务并发度、数据处理逻辑等。如果您对 Flink CDC 的性能有更高的要求,可以考虑进一步优化和调整相关配置。例如,调整任务的并发度、使用更高性能的硬件、优化代码逻辑等都可能会对同步速度产生影响。
总结来说,如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可以考虑资源限制、网络带宽限制和自定义 Sink 实现等方面的问题,并尝试优化和调整相关配置来提高同步速度。
如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是因为以下原因之一:
资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。
网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。
自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。
关于您提到的 private Connection connection;,这可能是您在自定义 Sink 中使用的数据库连接对象。如果您在自定义 Sink 中使用了数据库连接对
采集数据一般都不慢的,先试试看print结果,把每一步的操作时间打印出来,看看时间消耗在哪个步骤上了,此回答整理自钉群“Flink CDC 社区”
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。