开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink这个问题怎么解决?

Flink这个问题怎么解决?现在我有个这样的操作,就是原先有个接口是修改数据的接口,因为业务复杂,他是先删除后增加,然后删除失败回滚事务,添加失败也回滚事务,但是是手动的,现在有个批量更新的需求,需要我更新批量更新的时候,如果其中一个子事务失败了子事务回滚不影响外部,就是跳过这个错误的数据 继续更新后面的image.png
经过测试发现使用 事务传播机制 无论是那种事务传播机制 都不行,如果都成功还好,有一个失败后面的都会失败

REQUIRES_NEW 新建事物,如果当前已经存在事物,则挂起当前事物
NESTED 如果当前存在事物,则在嵌套事物内执行;如果当前没有事物,则与PROPAGATION_REQUIRED传播特性相同9月22日 14:29张骞

展开
收起
三分钟热度的鱼 2023-09-27 19:26:51 154 0
4 条回答
写回答
取消 提交回答
  • Flink支持基于两阶段提交的分布式事务。对于你的场景,可以在Flink作业中使用两阶段提交来确保操作的一致性。
    image.png

    参考示例:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1); // 为了简化,这里设置并行度为1
    
    // 假设这是从Kafka读取的数据流
    DataStream<MyData> inputStream = env.addSource(new FlinkKafkaConsumer<>(...));
    
    // 使用两阶段提交来确保事务性
    inputStream
        .map(data -> {
            // 这里是业务逻辑,比如删除和添加操作
            // 如果需要手动控制事务,可以在这里实现
            // ...
            return data;
        })
        .addSink(new TwoPhaseCommitSinkFunction<MyData>() {
            // 实现beginTransaction、preCommit、commit、abort等方法来控制事务
            // ...
        });
    
    env.execute();
    

    ——参考链接

    2024-01-26 20:44:33
    赞同 展开评论 打赏
  • 针对你描述的问题,我可以提供一些建议供参考:

    1. 优化逻辑设计: 考虑采用更合理的逻辑结构去组织你的代码。比如,你可以选择在单独的事务里只负责一部分任务,而不是一次性包含所有的任务。这样做可以帮助减少并发带来的影响,提高系统的稳定性和可靠性。
    2. 补偿性事务: 对于某些无法保证原子性的操作,可以采取补偿性事务的设计思路。也就是说,一旦发生错误,就立即启动另一个事务来恢复状态。这种方法通常适用于那些不能容忍任何丢失的信息的应用场景。
    3. 分布式锁: 如果你想保持一致性,那么一种可行的办法就是在服务端维护一个分布式的锁。只有获得锁的服务才会允许自己继续后续的任务。然而需要注意的是,分布式锁本身也会带来一定的开销,所以应该谨慎评估它的适用范围。
    4. 异步化: 异步编程是一种有效的技术手段,它可以让你在不牺牲响应时间的情况下处理大量的请求。你可以利用消息队列或者其他形式的异步通信来降低同步调用的压力。
    5. 负载均衡: 如果你的服务器集群足够大,也可以考虑实施负载均衡策略。这意味着你可以将工作分散到更多的机器上去,从而减轻每台机器上的压力。
    2024-01-15 11:12:34
    赞同 展开评论 打赏
  • 某政企事业单位安全运维工程师,主要从事系统运维及网络安全工作,多次获得阿里云、华为云、腾讯云征文比赛一二等奖;CTF选手,白帽,全国交通行业网络安全大赛二等奖,全国数信杯数据安全大赛银奖,手握多张EDU、CNVD、CNNVD证书,欧盟网络安全名人堂提名,联合国网络安全名人堂提名

    看起来你是在使用Spring AOP进行分布式事务控制,在这种情况下,当出现某种特定条件时,你需要重置当前线程的事务上下文。你可以这样做:

    • 在每个嵌套的事务开始的地方调用clearCurrentTransaction()。

    • 当外层事务提交或回滚时,再调用一次clearCurrentTransaction()。

    注意,这种方法并不总是保证会话级别的隔离级别,因此它不能替代传统的ACID事务模型。此外,请记住,一旦你清除了当前线程的事务上下文,后续的事务将会被视为新的独立事务。

    下面是如何应用这两个步骤的一个示例:

    @Transactional(propagation=Propagation.REQUIRES_NEW, isolationLevel=Isolation.READ_COMMITTED)
    void outerMethod() {
        clearCurrentTransaction();
    
        innerMethod();
    
        if (outerMethodWasSuccessful) {
            currentTransaction.commit();
        } else {
            currentTransaction.rollback();
        }
    }
    
    private void innerMethod() {
        clearCurrentTransaction();
    
        doSomethingExpensiveAndPotentiallyThrowingAnError();
    
        restorePreviousContext();
    }
    
    protected void restorePreviousContext() {
        TransactionAwarePlatformTransactionManager platformTransactionManager = getPlatformTransactionManager();
        PlatformTransactionQuery query = new PlatformTransactionQuery(platformTransactionManager);
        QueryResult<Transaction> result = query.findFirstByPrimaryKeys(Transaction.class, getCurrentTransactionId());
        if(result != null && !result.isEmpty()){
            Transaction transaction = result.getSingleResult();
            setCurrentTransaction(transaction);
        }
    }
    

    这段代码展示了如何在外部方法中清除当前线程的事务上下文,然后在内部方法中再次创建一个新的事务。最后,我们清理掉之前的事务上下文,以便在外部方法返回时可以正确地提交或回滚整个事务。

    2024-01-14 19:13:44
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    针对你的问题,我建议你尝试使用 Flink 的 try-catch 机制来捕获子事务中的异常,并判断是否需要回滚整个事务。这里是一个简单的例子:

    @Transactional(rollbackFor = Exception.class)
    public String saveData() {
    boolean flag = deleteData();
    if (flag) {
    try {
    // 做一些业务
    this.nestTrans1();
    } catch (Exception e) {
    // 如果子事务中出现异常,则回滚整个事务
    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
    return "异常";
    }
    return "成功";
    } else {
    TransactionAspectSupport.currentTransactionStatus().setRolbackOnly();
    return "异常";
    }
    }
    @Transactional(rollbackFor = Exception.class)
    public void nestTrans1() {
    for (int i = 0; i < 10; i++) {
    log.info("准备执行任务");
    this.saveData();
    }
    }

    在这个例子中,saveData 方法中使用了 @Transactional(rollbackFor = Exception.class),这样在方法中抛出的异常将会触发事务回滚。在 saveData 方法中,我们尝试调用 nestTrans1 方法,如果在子事务中出现异常,我们将捕获异常并调用 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly() 来回滚整个事务。如果子事务中没有异常,我们将继续执行后续操作。

    2024-01-12 21:58:35
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载