Flink这个问题怎么解决?现在我有个这样的操作,就是原先有个接口是修改数据的接口,因为业务复杂,他是先删除后增加,然后删除失败回滚事务,添加失败也回滚事务,但是是手动的,现在有个批量更新的需求,需要我更新批量更新的时候,如果其中一个子事务失败了子事务回滚不影响外部,就是跳过这个错误的数据 继续更新后面的
经过测试发现使用 事务传播机制 无论是那种事务传播机制 都不行,如果都成功还好,有一个失败后面的都会失败
REQUIRES_NEW 新建事物,如果当前已经存在事物,则挂起当前事物
NESTED 如果当前存在事物,则在嵌套事物内执行;如果当前没有事物,则与PROPAGATION_REQUIRED传播特性相同9月22日 14:29张骞
Flink支持基于两阶段提交的分布式事务。对于你的场景,可以在Flink作业中使用两阶段提交来确保操作的一致性。
参考示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 为了简化,这里设置并行度为1
// 假设这是从Kafka读取的数据流
DataStream<MyData> inputStream = env.addSource(new FlinkKafkaConsumer<>(...));
// 使用两阶段提交来确保事务性
inputStream
.map(data -> {
// 这里是业务逻辑,比如删除和添加操作
// 如果需要手动控制事务,可以在这里实现
// ...
return data;
})
.addSink(new TwoPhaseCommitSinkFunction<MyData>() {
// 实现beginTransaction、preCommit、commit、abort等方法来控制事务
// ...
});
env.execute();
——参考链接。
针对你描述的问题,我可以提供一些建议供参考:
看起来你是在使用Spring AOP进行分布式事务控制,在这种情况下,当出现某种特定条件时,你需要重置当前线程的事务上下文。你可以这样做:
在每个嵌套的事务开始的地方调用clearCurrentTransaction()。
当外层事务提交或回滚时,再调用一次clearCurrentTransaction()。
注意,这种方法并不总是保证会话级别的隔离级别,因此它不能替代传统的ACID事务模型。此外,请记住,一旦你清除了当前线程的事务上下文,后续的事务将会被视为新的独立事务。
下面是如何应用这两个步骤的一个示例:
@Transactional(propagation=Propagation.REQUIRES_NEW, isolationLevel=Isolation.READ_COMMITTED)
void outerMethod() {
clearCurrentTransaction();
innerMethod();
if (outerMethodWasSuccessful) {
currentTransaction.commit();
} else {
currentTransaction.rollback();
}
}
private void innerMethod() {
clearCurrentTransaction();
doSomethingExpensiveAndPotentiallyThrowingAnError();
restorePreviousContext();
}
protected void restorePreviousContext() {
TransactionAwarePlatformTransactionManager platformTransactionManager = getPlatformTransactionManager();
PlatformTransactionQuery query = new PlatformTransactionQuery(platformTransactionManager);
QueryResult<Transaction> result = query.findFirstByPrimaryKeys(Transaction.class, getCurrentTransactionId());
if(result != null && !result.isEmpty()){
Transaction transaction = result.getSingleResult();
setCurrentTransaction(transaction);
}
}
这段代码展示了如何在外部方法中清除当前线程的事务上下文,然后在内部方法中再次创建一个新的事务。最后,我们清理掉之前的事务上下文,以便在外部方法返回时可以正确地提交或回滚整个事务。
针对你的问题,我建议你尝试使用 Flink 的 try-catch 机制来捕获子事务中的异常,并判断是否需要回滚整个事务。这里是一个简单的例子:
@Transactional(rollbackFor = Exception.class)
public String saveData() {
boolean flag = deleteData();
if (flag) {
try {
// 做一些业务
this.nestTrans1();
} catch (Exception e) {
// 如果子事务中出现异常,则回滚整个事务
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return "异常";
}
return "成功";
} else {
TransactionAspectSupport.currentTransactionStatus().setRolbackOnly();
return "异常";
}
}
@Transactional(rollbackFor = Exception.class)
public void nestTrans1() {
for (int i = 0; i < 10; i++) {
log.info("准备执行任务");
this.saveData();
}
}
在这个例子中,saveData 方法中使用了 @Transactional(rollbackFor = Exception.class),这样在方法中抛出的异常将会触发事务回滚。在 saveData 方法中,我们尝试调用 nestTrans1 方法,如果在子事务中出现异常,我们将捕获异常并调用 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly() 来回滚整个事务。如果子事务中没有异常,我们将继续执行后续操作。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。