分布式事务之Spring/JPA/JMS事务(二)

简介: 分布式事务之Spring/JPA/JMS事务

著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

Spring事务

Spring事务机制主要包括声明式事务和编程式事务,声明式事务让我们从复杂的事务处理中得到解脱,编程式事务在实际开发中得不到广泛使用,仅供学习参考。

事务抽象

spring的事务管理提供了统一的API接口支持不同的资源,提供声明式事务方便与Spring框架集成。spring的事务管理器使用抽象的设计方式实现,以下为spring事务管理器中的逻辑实现代码 (精简了一部分,突出核心逻辑)

## 事务状态
public interface TransactionStatus extends SavepointManager{
    boolean isNewTransaction();
    boolean hasSavepoint();
    void setRollbackOnly();
    boolean isRollbackOnly();
    boolean isCompleted();
}
## 事务定义
public interface TransactionDefinition{
    int getPropagationBehavior();  #传播行为
    int getIsolationLevel();   #隔离级别
    String getName();   #事务名
    int getTimeout();    #超时时间
    boolean isReadOnly();   #是否只读
}
## 事务管理器
public interface PlatformTransactionManager{
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    void commit(TransactionStatus status) throws TransactionException;
    void rollback(TransactionStatus status) throws TransactionException;
}

事务的传播行为

Spring在 TransactionDefinition 接口中规定了 7 种类型的事务传播行为,它们规定了事务方法和事务方法发生嵌套调用时,事务是如何进行传播的

事务传播的7种行为:

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

当使用 PROPAGATION_NESTED时,底层的数据源必须基于JDBC 3.0,并且实现者需要支持保存点事务机制。

事务隔离级别

spring如果没有指定事务隔离级别的话,则spring的事务隔离级别跟数据库的隔离级别走,数据库是什么隔离级别,spring就是什么隔离级别。

Spring事务的隔离级别:

  1. ISOLATION_DEFAULT(默认): 这是一个PlatfromTransactionManager默认的隔离级别,使用数据库默认的事务隔离级别. 另外四个与JDBC的隔离级别相对应
  2. ISOLATION_READ_UNCOMMITTED(未提交读取): 这是事务最低的隔离级别,它会让另外一个事务可以看到这个事务未提交的数据。 这种隔离级别会产生脏读,不可重复读和幻像读。
  3. ISOLATION_READ_COMMITTED(已提交读取): 保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据
  4. ISOLATION_REPEATABLE_READ(可重复读取): 这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。 它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。
  5. ISOLATION_SERIALIZABLE(序列化): 这是花费最高代价但是最可靠的事务隔离级别。事务被处理为顺序执行。 除了防止脏读,不可重复读外,还避免了幻像读。

接下来看一下代码方式与标签方式的事务实现:

## 代码方式实现
public OrderService{
    @Autowire 
    PlatformTransactionManager txManager;
    void buyTicket(BuyTicketDTO dto){
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        // 设置事务传播行为
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = txManager.getTransaction(def);
        try{
            ## 执行业务代码
            txManager.commit(status);
        }catch(Exception e){
            txManager.rollback(status);
        }
    }
}
## Transaction标签实现方式
public OrderService{
    @Transactonal
    void buyTick(BuyTicketDTO dto){
        // get and begin transaction manager from context
        try{
            /**业务代码*/
            // auto commit transaction
        }catch(Exception e){
            // auto rollback transaction
        }
    }
}


4b4fb5a98731d225c9ea55f3032ba7c.png

策略接口PlatformTransactionManager事务管理器的常见实现有:

DataSourceTransactionManagerJpTransactionManagerJmsTransactionManagerJtaTransactionManager

JPA简介及事务实现

JPA是Java的一个规范(Java持久性API)。 它用于在Java对象和关系数据库之间保存数据。 JPA充当面向对象的领域模型和关系数据库系统之间的桥梁。 由于JPA只是一个规范,它本身不执行任何操作。 它需要一个实现。 因此,像Hibernate,TopLink和iBatis这样的ORM工具实现了JPA数据持久性规范。

关于JPA事务实例的代码:

domian实体对象

@Entity(name = "customer")
@Data
public class Customer {
    ## id 自增长
    @Id
    @GeneratedValue
    private Long id;
    ## 唯一索引
    @Column(name = "user_name", unique = true)
    private String username;
    private String password;
    private String role;
}

dao 接口

// 继成JpaRepository中的方法,其中已经包含基本的CRUD
public interface CustomerRepository extends JpaRepository<Customer, Long> {
    Customer findOneByUsername(String username);
}

service 业务操作

以下2种事务实现的效果是一样的,意在告诉大家如何使用代码的方式实现与注解声明事务相同的效果。

@Service
public class CustomerServiceTxInAnnotation {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
    @Autowired
    private CustomerRepository customerRepository;
    ## 使用注解的方式声明事务存在
    @Transactional
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        customer.setUsername("Annotation:" + customer.getUsername());
        return customerRepository.save(customer);
    }
}
@Service
public class CustomerServiceTxInCode {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);
    @Autowired
    private CustomerRepository customerRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        ## 使用代码的方式来声明事务存在
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        ## 当使用ISOLATION_SERIALIZABLE级别时,如果外部没事务存在,则本身创建事务,,所以submitError方法抛出异常可以回滚
        //def.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
        ## 当使用PROPAGATION_REQUIRED级别时,如果外部没事务存在,则本身也不存在事务,,所以submitError方法抛出异常依然可以保存数据成功 
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            customer.setUsername("Code:" + customer.getUsername());
            customerRepository.save(customer);
            submitError();
            transactionManager.commit(status);
            return customer;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void submitError(){
        throw new RuntimeException("some Data error ")
    }
}

controller 控制层

@RestController
@RequestMapping("/api/customer")
public class CustomerResource {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);
    @Autowired
    private CustomerServiceTxInAnnotation customerService;
    @Autowired
    private CustomerServiceTxInCode customerServiceInCode;
    @Autowired
    private CustomerRepository customerRepository;
    @PostMapping("/annotation")
    public Customer createInAnnotation(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
        return customerService.create(customer);
    }
    @PostMapping("/code")
    public Customer createInCode(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
        return customerServiceInCode.create(customer);
    }
    @GetMapping("")
    public List<Customer> getAll() {
        return customerRepository.findAll();
    }
}

接下来看一下程序的执行结果及JPA事务的管理过程:

Loca lhost :~ mav larns 
 Locathost :~ mavlarn $ curl http://localhost:8080/api/customer[] localhost :~ mavlarns 
 localhost :~ mavlarn $ curl - X POST - d '{" username ":" imoocz ", pplication / json 'http://localhost:8080/api/customer/ code 
(" id ":1," username ":"imooc2"," password ":"111111"," role ":" USER "} localhost :~ mavlarns 
 Locathost :~ mavlarns 
 Locaihost :~ mavtar 
 curl http://localhost:8e80/api/customer
[" id ":1," username :
'imooc2," password ":"111111"," role ":" USER "}] localhost :~ mavlarn $
 localhost :~ mavlarn $ curl - X I 
 PosT - d '{" username ":"imooc1",
" pasSword :"1l11l1",
 pplication / json 'http://localhost:8080/api/ customer / annotation 
注解方式插入一条据
[“ id ":2,“ username ”:“imooc1"," password ":"1lll1"," role ":" USER } localhost :~ mavlarn $ locathost :~ mavlarn $
 localhost :~ mavlarn $ curl http://localthost:8080/api/customer
查询出2条数据
[" id ":1," username ":"imooc2"," password ":"111111"," role ":" USER "},(" id ":2,"
" USER "}] localhost :~ mavlarn $
 Localhost :~ mavlarn $
 localhost :~ mavlarns $ curl - X POST - d '" username ":"imooc1",
 pplication / json 'http://localthost:8080/api/customer/annotat1on再次插入相同数据包吃,数据插入失败” timestamp ”:1516894291878," status ":500," error ":" Internal Server Error "
 exception ”:” org . springf 
 ityViolationException "," message ": could not execute statement ; SQL [ n / a ]; constraint [\" UK _QFRM6 DEX _5 ON PUBLIC . CUSTOMER ( USER _ NAME ) VALUES ('imooc1',2
 Statement :\ ninsert into customer 
 ser _ name ) values ( null ,7,7,7)[23505-196]]; nested exception is org . hibernate . exception . Const : could not execute statement "," path ":"/ api / customer / annotation "} localhost :~ mavlarn $
没有数据
" password “:“111111”," role ":" USER 代码方式插入一条数据
" ro Le ":" USER 
" username :"inooc1”,“ pas 
" password :"111111”," role ”:" USER 
2) N "; SQL

在整个事务管理过程中使用的是Spring事务控制,并且由相关ORM框架实现JPA规范

JMS事务原理

Spring JMS Session

  • 通过Session进行事务管理操作
  • Session 是一个thread-bound(线程范围内)
  • 事务上下文:一个线程一个Session

2种Spring JMS事务类型

  • Session管理的事务:原生事务
  • 外部管理的事务:JmsTransactionManager、JTA

Srping JMS事务机制过程

Session原生事务:

1c098d3ff8d89475e9af2dc30f7c776.png

JmsTransactionManager 外部管理的事务:

1c384dbda5d0a67cb0629e759f491cd.png

##注解方式注入Spring Bean
@EnableJms
@Configuration
public class JmsConfig {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
    @Bean
    public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
        LOG.debug("init jms template with converter.");
        JmsTemplate template = new JmsTemplate();
        ## JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。
        template.setConnectionFactory(connectionFactory); 
        return template;
    }
    ## 这个用于设置 @JmsListener使用的containerFactory
    @Bean
    public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                     PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setTransactionManager(transactionManager);
        factory.setCacheLevelName("CACHE_CONNECTION");
        factory.setReceiveTimeout(10000L);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}
@Service
public class CustomerService {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
    @Autowired
    JmsTemplate jmsTemplate;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @PostConstruct
    public void init() {
        jmsTemplate.setReceiveTimeout(3000);
    }
    ## 原生事务
    @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
    public void handle(String msg) {
        LOG.debug("Get JMS message to from customer:{}", msg);
        String reply = "Replied - " + msg;
        jmsTemplate.convertAndSend("customer:msg:reply", reply);
        if (msg.contains("error")) {
            simulateError();
        }
    }
    ## JmsTransactionManager事务
    @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
    public void handle2(String msg) {
        LOG.debug("Get JMS message2 to from customer:{}", msg);
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            String reply = "Replied-2 - " + msg;
            jmsTemplate.convertAndSend("customer:msg:reply", reply);
            if (!msg.contains("error")) {
                transactionManager.commit(status);
            } else {
                transactionManager.rollback(status);
            }
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void simulateError() {
        throw new RuntimeException("some Data error.");
    }
}

Spring 本地事务

Spring 本地事务紧密依赖于底层资源管理器(例如数据库连接 ),事务处理局限在当前事务资源内。此种事务处理方式不存在对应用服务器的依赖,因而部署灵活却无法支持多数据源的分布式事务。

  • Spring容器管理事务的生命周期
  • 通过Spring事务接口调用
  • 业务代码与具体事务的实现无关

在数据库连接中使用本地事务Demo:

public void transferAccount() { 
       Connection conn = null; 
       Statement stmt = null; 
       try{ 
           conn = getDataSource().getConnection(); 
           ## 将自动提交设置为 false,
           ## 若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
           conn.setAutoCommit(false);
           stmt = conn.createStatement(); 
           ## 将 A 账户中的金额减少 500 
           stmt.execute("\
           update t_account set amount = amount - 500 where account_id = 'A'");
           ## 将 B 账户中的金额增加 500 
           stmt.execute("\
           update t_account set amount = amount + 500 where account_id = 'B'");
           ## 提交事务
           conn.commit();
           ## 事务提交:转账的两步操作同时成功
       } catch(SQLException sqle){            
           try{ 
               ## 发生异常,回滚在本事务中的操做
              conn.rollback();
               ## 事务回滚:转账的两步操作完全撤销
               stmt.close(); 
               conn.close(); 
           }catch(Exception ignore){ } 
           sqle.printStackTrace(); 
       } 
   }

本地事务机制过程图:

6cc889fc653c3db804a8cb0e1d1e1a7.png

Spring 外部(全局)事务

  • 外部事务管理器提供事务管理
  • 通过Spring事务接口,调用外部管理器
  • 使用JNDI等方式获取外部事务管理器的实例
  • 外部事务管理器一般由应用服务器提供、如JBoss等

什么是JNDI?

参考==>JNDI到底是什么?


目录
相关文章
|
1月前
|
SQL Java 关系型数据库
Spring事务传播机制:7种姿势教你玩转"事务接力赛"
事务传播机制是Spring框架中用于管理事务行为的重要概念,它决定了在方法调用时事务如何传递与执行。通过7种传播行为,开发者可以灵活控制事务边界,适应不同业务场景。例如:REQUIRED默认加入或新建事务,REQUIRES_NEW独立开启新事务,NESTED支持嵌套回滚等。合理使用传播机制不仅能保障数据一致性,还能提升系统性能与健壮性。掌握这“七种人格”,才能在复杂业务中游刃有余。
|
1月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
113 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
2月前
|
Java 关系型数据库 数据库
深度剖析【Spring】事务:万字详解,彻底掌握传播机制与事务原理
在Java开发中,Spring框架通过事务管理机制,帮我们轻松实现了这种“承诺”。它不仅封装了底层复杂的事务控制逻辑(比如手动开启、提交、回滚事务),还提供了灵活的配置方式,让开发者能专注于业务逻辑,而不用纠结于事务细节。
|
7月前
|
Java Spring
Spring中事务失效的场景
因为Spring事务是基于代理来实现的,所以某个加了@Transactional的⽅法只有是被代理对象调⽤时, 那么这个注解才会⽣效 , 如果使用的是被代理对象调用, 那么@Transactional会失效 同时如果某个⽅法是private的,那么@Transactional也会失效,因为底层cglib是基于⽗⼦类来实现 的,⼦类是不能重载⽗类的private⽅法的,所以⽆法很好的利⽤代理,也会导致@Transactianal失效 如果在业务中对异常进行了捕获处理 , 出现异常后Spring框架无法感知到异常, @Transactional也会失效
|
7月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——常见问题总结
本文总结了Spring Boot中使用事务的常见问题,虽然通过`@Transactional`注解可以轻松实现事务管理,但在实际项目中仍有许多潜在坑点。文章详细分析了三个典型问题:1) 异常未被捕获导致事务未回滚,需明确指定`rollbackFor`属性;2) 异常被try-catch“吃掉”,应避免在事务方法中直接处理异常;3) 事务范围与锁范围不一致引发并发问题,建议调整锁策略以覆盖事务范围。这些问题看似简单,但一旦发生,排查难度较大,因此开发时需格外留意。最后,文章提供了课程源代码下载地址,供读者实践参考。
156 0
|
7月前
|
Java 关系型数据库 数据库
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——Spring Boot 事务配置
本文介绍了 Spring Boot 中的事务配置与使用方法。首先需要导入 MySQL 依赖,Spring Boot 会自动注入 `DataSourceTransactionManager`,无需额外配置即可通过 `@Transactional` 注解实现事务管理。接着通过创建一个用户插入功能的示例,展示了如何在 Service 层手动抛出异常以测试事务回滚机制。测试结果表明,数据库中未新增记录,证明事务已成功回滚。此过程简单高效,适合日常开发需求。
994 0
|
7月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot事务配置管理——事务相关
本文介绍Spring Boot事务配置管理,阐述事务在企业应用开发中的重要性。事务确保数据操作可靠,任一异常均可回滚至初始状态,如转账、购票等场景需全流程执行成功才算完成。同时,事务管理在Spring Boot的service层广泛应用,但根据实际需求也可能存在无需事务的情况,例如独立数据插入操作。
176 0
|
4月前
|
Java API 数据库
JPA简介:Spring Boot环境下的实践指南
上述内容仅是JPA在Spring Boot环境下使用的冰山一角,实际的实践中你会发现更深更广的应用。总而言之,只要掌握了JPA的规则,你就可以借助Spring Boot无比丰富的功能,娴熟地驾驶这台高性能的跑车,在属于你的程序世界里驰骋。
172 15
|
5月前
|
人工智能 Java 数据库连接
Spring事务失效场景
本文深入探讨了Spring框架中事务管理可能失效的几种常见场景及解决方案,包括事务方法访问级别不当、方法内部自调用、错误的异常处理、事务管理器或数据源配置错误、数据库不支持事务以及不合理的事务传播行为或隔离级别。通过合理配置和正确使用`@Transactional`注解,开发者可以有效避免这些问题,确保应用的数据一致性和完整性。
257 10
|
4月前
|
Java 关系型数据库 MySQL
【Spring】【事务】初学者直呼学会了的Spring事务入门
本文深入解析了Spring事务的核心概念与使用方法。Spring事务是一种数据库事务管理机制,通过确保操作的原子性、一致性、隔离性和持久性(ACID),维护数据完整性。文章详细讲解了声明式事务(@Transactional注解)和编程式事务(TransactionTemplate、PlatformTransactionManager)的区别与用法,并探讨了事务传播行为(如REQUIRED、REQUIRES_NEW等)及隔离级别(如READ_COMMITTED、REPEATABLE_READ)。
344 1