开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC只跑一个就同步挺快的,是不是哪里没配置好?自定义sink

Flink CDC只跑一个就同步挺快的,是不是哪里没配置好?自定义sink private Connection connection;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Class.forName("oracle.jdbc.driver.OracleDriver");
    connection = DriverManager.getConnection("jdbc:oracle:thin:@127.0.0.1:1521/orcl", "scott", "123456");
}

@Override
public void invoke(DataSyncBean value, Context context) {
    System.out.println("CustomSink:" + value);
    PreparedStatement statement = null;
    StringBuilder sql = new StringBuilder();
    //数据处理
    try {
        switch (value.getOperation()) {
            case "1":
                sql.append("insert into ");
                sql.append(value.getTable()).append("(").append(StringUtils.join(value.getFields(), ",")).append(") VALUES(");
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    sql.append("?");
                    if (i < value.getValues().size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");

                statement = connection.prepareStatement(sql.toString());
                for (int i = 0; i < value.getValues().size(); i++) {
                    statement.setObject(i + 1, value.getValues().get(i));
                }
                break;
            case "2":
                sql.append("update ");
                sql.append(value.getTable()).append(" set ");
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    if (!value.getFields().get(i).equals(value.getPk())) {
                        sql.append(value.getFields().get(i));
                        sql.append("=?");
                    }
                    if (!value.getFields().get(i).equals(value.getPk()) && i < value.getValues().size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                sql.append(value.getPk()).append("=?");

                statement = connection.prepareStatement(sql.toString());
                int index = 1;
                for (int i = 0; i < value.getValues().size(); i = i + 1) {
                    if (value.getFields().get(i).equals(value.getPk())) {
                        statement.setObject(value.getValues().size(), value.getPkValue());
                    } else {
                        statement.setObject(index, value.getValues().get(i));
                        index = index + 1;
                    }
                }

                break;

            case "3":
                sql.append("delete from ");
                sql.append(value.getTable()).append(" where ");
                sql.append(value.getPk()).append("=?");

                statement = connection.prepareStatement(sql.toString());
                statement.setObject(1, value.getPkValue());

                break;
        }

        if (statement != null) {
            statement.execute();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        System.out.println(sql);
    }
}

@Override
public void close() throws Exception {
    super.close();
    if (connection != null) {
        connection.close();
    }
}

展开
收起
真的很搞笑 2023-07-01 19:31:14 71 0
3 条回答
写回答
取消 提交回答
  • 如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是由于以下原因之一:

    - 资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。 - 网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。 - 自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。

    关于您提供的自定义 Sink 代码,可以尝试打印每一步操作的时间消耗,以确定哪个步骤导致同步速度变慢。您可以在代码中添加打印语句,并观察打印结果,以便进行进一步的优化和排查。

    需要注意的是,Flink CDC 的性能受到多个因素的影响,包括数据源、网络、任务并发度、数据处理逻辑等。如果您对 Flink CDC 的性能有更高的要求,可以考虑进一步优化和调整相关配置。例如,调整任务的并发度、使用更高性能的硬件、优化代码逻辑等都可能会对同步速度产生影响。

    总结来说,如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可以考虑资源限制、网络带宽限制和自定义 Sink 实现等方面的问题,并尝试优化和调整相关配置来提高同步速度。

    2023-07-30 13:36:28
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    如果在 Flink CDC 中只有一个任务运行时同步速度很快,但是在多个任务同时运行时同步速度变慢,可能是因为以下原因之一:
    资源限制:多个任务同时运行时,可能会造成系统资源的竞争,从而导致同步速度变慢。您可以尝试增加系统资源,例如增加 CPU、内存等资源,以提高同步速度。
    网络带宽限制:如果多个任务同时从数据库中读取数据并同步到外部系统中,可能会占用大量的网络带宽,从而导致同步速度变慢。您可以尝试优化网络带宽的使用,例如使用压缩算法减少数据传输量,或者使用异步方式进行数据同步等。
    自定义 Sink 的实现:如果您使用了自定义 Sink 实现,可能会影响同步速度。请确保自定义 Sink 的实现逻辑正确,并且能够高效地处理数据。另外,如果您在自定义 Sink 实现中使用了连接池等资源管理技术,请确保资源的正确释放,以避免资源泄漏和性能问题。
    关于您提到的 private Connection connection;,这可能是您在自定义 Sink 中使用的数据库连接对象。如果您在自定义 Sink 中使用了数据库连接对

    2023-07-30 11:21:25
    赞同 展开评论 打赏
  • 采集数据一般都不慢的,先试试看print结果,把每一步的操作时间打印出来,看看时间消耗在哪个步骤上了,此回答整理自钉群“Flink CDC 社区”

    2023-07-01 19:35:57
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载