分布式事务之Spring/JPA/JMS事务(二)

简介: 分布式事务之Spring/JPA/JMS事务

著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

Spring事务

Spring事务机制主要包括声明式事务和编程式事务,声明式事务让我们从复杂的事务处理中得到解脱,编程式事务在实际开发中得不到广泛使用,仅供学习参考。

事务抽象

spring的事务管理提供了统一的API接口支持不同的资源,提供声明式事务方便与Spring框架集成。spring的事务管理器使用抽象的设计方式实现,以下为spring事务管理器中的逻辑实现代码 (精简了一部分,突出核心逻辑)

## 事务状态
public interface TransactionStatus extends SavepointManager{
    boolean isNewTransaction();
    boolean hasSavepoint();
    void setRollbackOnly();
    boolean isRollbackOnly();
    boolean isCompleted();
}
## 事务定义
public interface TransactionDefinition{
    int getPropagationBehavior();  #传播行为
    int getIsolationLevel();   #隔离级别
    String getName();   #事务名
    int getTimeout();    #超时时间
    boolean isReadOnly();   #是否只读
}
## 事务管理器
public interface PlatformTransactionManager{
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    void commit(TransactionStatus status) throws TransactionException;
    void rollback(TransactionStatus status) throws TransactionException;
}

事务的传播行为

Spring在 TransactionDefinition 接口中规定了 7 种类型的事务传播行为,它们规定了事务方法和事务方法发生嵌套调用时,事务是如何进行传播的

事务传播的7种行为:

事务传播行为类型 说明
PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。
PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

当使用 PROPAGATION_NESTED时,底层的数据源必须基于JDBC 3.0,并且实现者需要支持保存点事务机制。

事务隔离级别

spring如果没有指定事务隔离级别的话,则spring的事务隔离级别跟数据库的隔离级别走,数据库是什么隔离级别,spring就是什么隔离级别。

Spring事务的隔离级别:

  1. ISOLATION_DEFAULT(默认): 这是一个PlatfromTransactionManager默认的隔离级别,使用数据库默认的事务隔离级别. 另外四个与JDBC的隔离级别相对应
  2. ISOLATION_READ_UNCOMMITTED(未提交读取): 这是事务最低的隔离级别,它会让另外一个事务可以看到这个事务未提交的数据。 这种隔离级别会产生脏读,不可重复读和幻像读。
  3. ISOLATION_READ_COMMITTED(已提交读取): 保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据
  4. ISOLATION_REPEATABLE_READ(可重复读取): 这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。 它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。
  5. ISOLATION_SERIALIZABLE(序列化): 这是花费最高代价但是最可靠的事务隔离级别。事务被处理为顺序执行。 除了防止脏读,不可重复读外,还避免了幻像读。

接下来看一下代码方式与标签方式的事务实现:

## 代码方式实现
public OrderService{
    @Autowire 
    PlatformTransactionManager txManager;
    void buyTicket(BuyTicketDTO dto){
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        // 设置事务传播行为
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = txManager.getTransaction(def);
        try{
            ## 执行业务代码
            txManager.commit(status);
        }catch(Exception e){
            txManager.rollback(status);
        }
    }
}
## Transaction标签实现方式
public OrderService{
    @Transactonal
    void buyTick(BuyTicketDTO dto){
        // get and begin transaction manager from context
        try{
            /**业务代码*/
            // auto commit transaction
        }catch(Exception e){
            // auto rollback transaction
        }
    }
}


4b4fb5a98731d225c9ea55f3032ba7c.png

策略接口PlatformTransactionManager事务管理器的常见实现有:

DataSourceTransactionManagerJpTransactionManagerJmsTransactionManagerJtaTransactionManager

JPA简介及事务实现

JPA是Java的一个规范(Java持久性API)。 它用于在Java对象和关系数据库之间保存数据。 JPA充当面向对象的领域模型和关系数据库系统之间的桥梁。 由于JPA只是一个规范,它本身不执行任何操作。 它需要一个实现。 因此,像Hibernate,TopLink和iBatis这样的ORM工具实现了JPA数据持久性规范。

关于JPA事务实例的代码:

domian实体对象

@Entity(name = "customer")
@Data
public class Customer {
    ## id 自增长
    @Id
    @GeneratedValue
    private Long id;
    ## 唯一索引
    @Column(name = "user_name", unique = true)
    private String username;
    private String password;
    private String role;
}

dao 接口

// 继成JpaRepository中的方法,其中已经包含基本的CRUD
public interface CustomerRepository extends JpaRepository<Customer, Long> {
    Customer findOneByUsername(String username);
}

service 业务操作

以下2种事务实现的效果是一样的,意在告诉大家如何使用代码的方式实现与注解声明事务相同的效果。

@Service
public class CustomerServiceTxInAnnotation {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInAnnotation.class);
    @Autowired
    private CustomerRepository customerRepository;
    ## 使用注解的方式声明事务存在
    @Transactional
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Annotation create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        customer.setUsername("Annotation:" + customer.getUsername());
        return customerRepository.save(customer);
    }
}
@Service
public class CustomerServiceTxInCode {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerServiceTxInCode.class);
    @Autowired
    private CustomerRepository customerRepository;
    @Autowired
    private PlatformTransactionManager transactionManager;
    public Customer create(Customer customer) {
        LOG.info("CustomerService In Code create customer:{}", customer.getUsername());
        if (customer.getId() != null) {
            throw new RuntimeException("用户已经存在");
        }
        ## 使用代码的方式来声明事务存在
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setIsolationLevel(TransactionDefinition.ISOLATION_SERIALIZABLE);
        ## 当使用ISOLATION_SERIALIZABLE级别时,如果外部没事务存在,则本身创建事务,,所以submitError方法抛出异常可以回滚
        //def.setPropagationBehavior(TransactionDefinition.PROPAGATION_NOT_SUPPORTED);
        ## 当使用PROPAGATION_REQUIRED级别时,如果外部没事务存在,则本身也不存在事务,,所以submitError方法抛出异常依然可以保存数据成功 
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            customer.setUsername("Code:" + customer.getUsername());
            customerRepository.save(customer);
            submitError();
            transactionManager.commit(status);
            return customer;
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void submitError(){
        throw new RuntimeException("some Data error ")
    }
}

controller 控制层

@RestController
@RequestMapping("/api/customer")
public class CustomerResource {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerResource.class);
    @Autowired
    private CustomerServiceTxInAnnotation customerService;
    @Autowired
    private CustomerServiceTxInCode customerServiceInCode;
    @Autowired
    private CustomerRepository customerRepository;
    @PostMapping("/annotation")
    public Customer createInAnnotation(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in annotation create customer:{}", customer.getUsername());
        return customerService.create(customer);
    }
    @PostMapping("/code")
    public Customer createInCode(@RequestBody Customer customer) {
        LOG.info("CustomerResource create in code create customer:{}", customer.getUsername());
        return customerServiceInCode.create(customer);
    }
    @GetMapping("")
    public List<Customer> getAll() {
        return customerRepository.findAll();
    }
}

接下来看一下程序的执行结果及JPA事务的管理过程:

Loca lhost :~ mav larns 
 Locathost :~ mavlarn $ curl http://localhost:8080/api/customer[] localhost :~ mavlarns 
 localhost :~ mavlarn $ curl - X POST - d '{" username ":" imoocz ", pplication / json 'http://localhost:8080/api/customer/ code 
(" id ":1," username ":"imooc2"," password ":"111111"," role ":" USER "} localhost :~ mavlarns 
 Locathost :~ mavlarns 
 Locaihost :~ mavtar 
 curl http://localhost:8e80/api/customer
[" id ":1," username :
'imooc2," password ":"111111"," role ":" USER "}] localhost :~ mavlarn $
 localhost :~ mavlarn $ curl - X I 
 PosT - d '{" username ":"imooc1",
" pasSword :"1l11l1",
 pplication / json 'http://localhost:8080/api/ customer / annotation 
注解方式插入一条据
[“ id ":2,“ username ”:“imooc1"," password ":"1lll1"," role ":" USER } localhost :~ mavlarn $ locathost :~ mavlarn $
 localhost :~ mavlarn $ curl http://localthost:8080/api/customer
查询出2条数据
[" id ":1," username ":"imooc2"," password ":"111111"," role ":" USER "},(" id ":2,"
" USER "}] localhost :~ mavlarn $
 Localhost :~ mavlarn $
 localhost :~ mavlarns $ curl - X POST - d '" username ":"imooc1",
 pplication / json 'http://localthost:8080/api/customer/annotat1on再次插入相同数据包吃,数据插入失败” timestamp ”:1516894291878," status ":500," error ":" Internal Server Error "
 exception ”:” org . springf 
 ityViolationException "," message ": could not execute statement ; SQL [ n / a ]; constraint [\" UK _QFRM6 DEX _5 ON PUBLIC . CUSTOMER ( USER _ NAME ) VALUES ('imooc1',2
 Statement :\ ninsert into customer 
 ser _ name ) values ( null ,7,7,7)[23505-196]]; nested exception is org . hibernate . exception . Const : could not execute statement "," path ":"/ api / customer / annotation "} localhost :~ mavlarn $
没有数据
" password “:“111111”," role ":" USER 代码方式插入一条数据
" ro Le ":" USER 
" username :"inooc1”,“ pas 
" password :"111111”," role ”:" USER 
2) N "; SQL

在整个事务管理过程中使用的是Spring事务控制,并且由相关ORM框架实现JPA规范

JMS事务原理

Spring JMS Session

  • 通过Session进行事务管理操作
  • Session 是一个thread-bound(线程范围内)
  • 事务上下文:一个线程一个Session

2种Spring JMS事务类型

  • Session管理的事务:原生事务
  • 外部管理的事务:JmsTransactionManager、JTA

Srping JMS事务机制过程

Session原生事务:

1c098d3ff8d89475e9af2dc30f7c776.png

JmsTransactionManager 外部管理的事务:

1c384dbda5d0a67cb0629e759f491cd.png

##注解方式注入Spring Bean
@EnableJms
@Configuration
public class JmsConfig {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
    @Bean
    public JmsTemplate initJmsTemplate(ConnectionFactory connectionFactory) {
        LOG.debug("init jms template with converter.");
        JmsTemplate template = new JmsTemplate();
        ## JmsTemplate使用的connectionFactory跟JmsTransactionManager使用的必须是同一个,不能在这里封装成caching之类的。
        template.setConnectionFactory(connectionFactory); 
        return template;
    }
    ## 这个用于设置 @JmsListener使用的containerFactory
    @Bean
    public JmsListenerContainerFactory<?> msgFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                     PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setTransactionManager(transactionManager);
        factory.setCacheLevelName("CACHE_CONNECTION");
        factory.setReceiveTimeout(10000L);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}
@Service
public class CustomerService {
    private static final Logger LOG = LoggerFactory.getLogger(CustomerService.class);
    @Autowired
    JmsTemplate jmsTemplate;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @PostConstruct
    public void init() {
        jmsTemplate.setReceiveTimeout(3000);
    }
    ## 原生事务
    @JmsListener(destination = "customer:msg:new", containerFactory = "msgFactory")
    public void handle(String msg) {
        LOG.debug("Get JMS message to from customer:{}", msg);
        String reply = "Replied - " + msg;
        jmsTemplate.convertAndSend("customer:msg:reply", reply);
        if (msg.contains("error")) {
            simulateError();
        }
    }
    ## JmsTransactionManager事务
    @JmsListener(destination = "customer:msg2:new", containerFactory = "msgFactory")
    public void handle2(String msg) {
        LOG.debug("Get JMS message2 to from customer:{}", msg);
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            String reply = "Replied-2 - " + msg;
            jmsTemplate.convertAndSend("customer:msg:reply", reply);
            if (!msg.contains("error")) {
                transactionManager.commit(status);
            } else {
                transactionManager.rollback(status);
            }
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
    private void simulateError() {
        throw new RuntimeException("some Data error.");
    }
}

Spring 本地事务

Spring 本地事务紧密依赖于底层资源管理器(例如数据库连接 ),事务处理局限在当前事务资源内。此种事务处理方式不存在对应用服务器的依赖,因而部署灵活却无法支持多数据源的分布式事务。

  • Spring容器管理事务的生命周期
  • 通过Spring事务接口调用
  • 业务代码与具体事务的实现无关

在数据库连接中使用本地事务Demo:

public void transferAccount() { 
       Connection conn = null; 
       Statement stmt = null; 
       try{ 
           conn = getDataSource().getConnection(); 
           ## 将自动提交设置为 false,
           ## 若设置为 true 则数据库将会把每一次数据更新认定为一个事务并自动提交
           conn.setAutoCommit(false);
           stmt = conn.createStatement(); 
           ## 将 A 账户中的金额减少 500 
           stmt.execute("\
           update t_account set amount = amount - 500 where account_id = 'A'");
           ## 将 B 账户中的金额增加 500 
           stmt.execute("\
           update t_account set amount = amount + 500 where account_id = 'B'");
           ## 提交事务
           conn.commit();
           ## 事务提交:转账的两步操作同时成功
       } catch(SQLException sqle){            
           try{ 
               ## 发生异常,回滚在本事务中的操做
              conn.rollback();
               ## 事务回滚:转账的两步操作完全撤销
               stmt.close(); 
               conn.close(); 
           }catch(Exception ignore){ } 
           sqle.printStackTrace(); 
       } 
   }

本地事务机制过程图:

6cc889fc653c3db804a8cb0e1d1e1a7.png

Spring 外部(全局)事务

  • 外部事务管理器提供事务管理
  • 通过Spring事务接口,调用外部管理器
  • 使用JNDI等方式获取外部事务管理器的实例
  • 外部事务管理器一般由应用服务器提供、如JBoss等

什么是JNDI?

参考==>JNDI到底是什么?


目录
相关文章
|
3月前
|
SQL 关系型数据库 MySQL
乐观锁在分布式数据库中如何与事务隔离级别结合使用
乐观锁在分布式数据库中如何与事务隔离级别结合使用
|
10天前
|
SQL Java 关系型数据库
【SpringFramework】Spring事务
本文简述Spring中数据库及事务相关衍伸知识点。
37 9
|
17天前
|
Java 开发者 Spring
理解和解决Spring框架中的事务自调用问题
事务自调用问题是由于 Spring AOP 代理机制引起的,当方法在同一个类内部自调用时,事务注解将失效。通过使用代理对象调用、将事务逻辑分离到不同类中或使用 AspectJ 模式,可以有效解决这一问题。理解和解决这一问题,对于保证 Spring 应用中的事务管理正确性至关重要。掌握这些技巧,可以提高开发效率和代码的健壮性。
51 13
|
1月前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
1月前
|
缓存 安全 Java
Spring高手之路26——全方位掌握事务监听器
本文深入探讨了Spring事务监听器的设计与实现,包括通过TransactionSynchronization接口和@TransactionalEventListener注解实现事务监听器的方法,并通过实例详细展示了如何在事务生命周期的不同阶段执行自定义逻辑,提供了实际应用场景中的最佳实践。
52 2
Spring高手之路26——全方位掌握事务监听器
|
1月前
|
Java 关系型数据库 数据库
京东面试:聊聊Spring事务?Spring事务的10种失效场景?加入型传播和嵌套型传播有什么区别?
45岁老架构师尼恩分享了Spring事务的核心知识点,包括事务的两种管理方式(编程式和声明式)、@Transactional注解的五大属性(transactionManager、propagation、isolation、timeout、readOnly、rollbackFor)、事务的七种传播行为、事务隔离级别及其与数据库隔离级别的关系,以及Spring事务的10种失效场景。尼恩还强调了面试中如何给出高质量答案,推荐阅读《尼恩Java面试宝典PDF》以提升面试表现。更多技术资料可在公众号【技术自由圈】获取。
|
2月前
|
Java 开发者 Spring
Spring高手之路24——事务类型及传播行为实战指南
本篇文章深入探讨了Spring中的事务管理,特别是事务传播行为(如REQUIRES_NEW和NESTED)的应用与区别。通过详实的示例和优化的时序图,全面解析如何在实际项目中使用这些高级事务控制技巧,以提升开发者的Spring事务管理能力。
65 1
Spring高手之路24——事务类型及传播行为实战指南
|
2月前
|
存储 Java 关系型数据库
在Spring Boot中整合Seata框架实现分布式事务
可以在 Spring Boot 中成功整合 Seata 框架,实现分布式事务的管理和处理。在实际应用中,还需要根据具体的业务需求和技术架构进行进一步的优化和调整。同时,要注意处理各种可能出现的问题,以保障分布式事务的顺利执行。
116 6
|
2月前
|
JavaScript Java 关系型数据库
Spring事务失效的8种场景
本文总结了使用 @Transactional 注解时事务可能失效的几种情况,包括数据库引擎不支持事务、类未被 Spring 管理、方法非 public、自身调用、未配置事务管理器、设置为不支持事务、异常未抛出及异常类型不匹配等。针对这些情况,文章提供了相应的解决建议,帮助开发者排查和解决事务不生效的问题。
|
2月前
|
XML Java 数据库连接
Spring中的事务是如何实现的
Spring中的事务管理机制通过一系列强大的功能和灵活的配置选项,为开发者提供了高效且可靠的事务处理手段。无论是通过注解还是AOP配置,Spring都能轻松实现复杂的事务管理需求。掌握这些工具和最佳实践,能
83 3