开发者社区 > 云原生 > 正文

请问 mysqlcdc sink批量提交到下游批次设置为10000 最后一批数据达不到,是为什么?

请教个简单问题 mysqlcdc sink批量提交到下游时 批次设置为10000 最后一批数据达不到10000一直不commit 用的ps.execbatch 有什么解决方案吗?

展开
收起
哒哒哒哒哒~ 2023-03-07 11:21:23 230 0
1 条回答
写回答
取消 提交回答
  • 随心分享,欢迎友善交流讨论:)

    如果使用mysqlcdc sink批量提交到下游时,最后一批数据达不到10000一直不commit,可以考虑以下两种解决方案:

    1、调整批处理的大小 你可以将批处理的大小从10000调整为一个更小的数字,例如5000,这样就可以确保最后一批数据不会过多。通过调整批处理的大小,可以平衡性能和数据一致性之间的权衡。

    2、手动提交最后一批数据 如果数据量较小,你可以在代码中检测到最后一批数据,然后手动提交这批数据。例如,可以在SinkFunction中重写close()方法,在最后一批数据时手动调用commit()方法。如果你使用的是JDBC批量提交功能,可以使用ps.executeBatch()方法提交剩余的数据。

    下面是一个伪代码示例,展示了如何在SinkFunction中手动提交最后一批数据: public class MySinkFunction extends RichSinkFunction {

    private PreparedStatement ps;
    private int batchSize = 10000;
    private List<MyData> buffer = new ArrayList<>();
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // 初始化 PreparedStatement
    }
    
    @Override
    public void invoke(MyData value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }
    
    @Override
    public void close() throws Exception {
        // 如果还有剩余的数据,手动提交
        if (buffer.size() > 0) {
            ps.executeBatch();
        }
        ps.close();
    }
    
    private void flush() throws Exception {
        ps.getConnection().setAutoCommit(false);
        for (MyData data : buffer) {
            // 将数据添加到 PreparedStatement 中
            ps.addBatch();
        }
        ps.executeBatch();
        ps.getConnection().commit();
        buffer.clear();
    }
    

    } 在这个示例中,当达到批处理大小时,会调用flush()方法将数据批量提交到下游。在close()方法中,如果还有剩余的数据,会手动提交。这样可以确保即使最后一批数据不足10000,也能被成功提交。

    2023-03-07 11:23:20
    赞同 展开评论 打赏
问答分类:
问答地址:

阿里云拥有国内全面的云原生产品技术以及大规模的云原生应用实践,通过全面容器化、核心技术互联网化、应用 Serverless 化三大范式,助力制造业企业高效上云,实现系统稳定、应用敏捷智能。拥抱云原生,让创新无处不在。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载