剑指JUC原理-20.并发编程实践(中)

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: 剑指JUC原理-20.并发编程实践

剑指JUC原理-20.并发编程实践(上):https://developer.aliyun.com/article/1413696


以阿里云开发社区举例:


应急定位场景下,A系统调用B系统获取诊断结论,TR超时时间是500ms,对于一个异常ID事件,需要执行多个诊断项服务,并记录诊断流水;每个诊断的耗时大概在100ms以内,随着业务的增长,超过5个诊断项,计算耗时累加到500ms+,这时候服务会出现高峰期短暂不可用。

将这段代码改成异步执行,这样执行诊断的时间是耗时最大的诊断服务

// 提交future任务并发执行
futures = executor.invokeAll(tasks, timeout, timeUnit);
// 遍历读取结果
for (Future<Res> future : futures) {
    try {
        // 获取结果
        Res singleResult = future.get();
        if (singleResult != null) {
            result.add(singleResult);
        }
    } catch (Exception e) {
        LogUtil.error(e, logger, "并发执行发生异常!,poolName={0}.", threadPoolName);
    }
}

通过上面的两个场景举例,可以看出,实际上针对一些耗时较长的任务运行,适当地利用,可以达到加速的效果。但是凡事都是双刃剑,有利有弊。


线上对响应时间要求较高的场合,尽量少用多线程,尤其是服务线程需要等待任务线程的场合(很多重大事故就是和这个息息相关),如果一定要用,可以对服务线程设置一个最大等待时间。


这句话的核心是在线上高响应时间的场景下,需要谨慎使用多线程,特别是当服务线程需要等待任务线程时。因为在多线程环境中,线程间的调度和同步可能会引入额外的等待时间,这可能导致响应时间增加,影响系统的性能。


举个例子来说,假设我们有一个在线服务,它需要处理大量的用户请求。每个用户请求都会被分配到一个服务线程去处理。为了提高处理速度,每个服务线程可能会启动多个任务线程去并行执行一些计算密集型的任务。这种情况下,服务线程就需要等待所有的任务线程完成才能继续执行。


然而,由于操作系统的线程调度策略,任务线程可能并不会立即执行。此外,如果任务线程的数量超过了CPU的核心数,那么这些线程就需要在CPU核心之间进行切换,这也会引入额外的等待时间。这些都可能导致服务线程需要花费更多的时间等待任务线程,从而导致响应时间增加。


因此,这句话的建议是,在这种场景下,最好尽量少用多线程,或者至少要对服务线程设置一个最大等待时间,以防止服务线程无限期地等待任务线程。这样可以避免因为线程同步和调度问题导致的性能下降,保证在线服务的响应时间。


当然,这并不是说多线程就一定会导致性能下降。如果使用得当,多线程还是可以大大提高系统的性能的。但是在高响应时间的场景下,我们需要更加谨慎地使用多线程,以防止潜在的性能问题。


以Redis举例:


Redis 6.0 之后的版本开始选择性使用多线程模型。


Redis 选择使用单线程模型处理客户端的请求主要还是因为 CPU 不是 Redis 服务器的瓶颈,使用多线程模型带来的性能提升并不能抵消它带来的开发成本和维护成本,系统的性能瓶颈也主要在网络 I/O 操作上;


而 Redis 引入多线程操作也是出于性能上的考虑,对于一些大键值对的删除操作,通过多线程非阻塞地释放内存空间也能减少对 Redis 主线程阻塞的时间,提高执行的效率。


凡事不能有绝对,寻找到适中的平衡点最重要!


ThreadLocal


Mysql应用


场景构建


这里我们先构建一个简单的转账场景: 有一个数据表account,里面有两个用户Jack和Rose,用户Jack 给用户Rose 转账。


案例的实现主要用mysql数据库,JDBC 和 C3P0 框架。

dao层代码 : AccountDao

public class AccountDao {
public void out(String outUser, int money) throws SQLException {
    String sql = "update account set money = money - ? where name = ?";
    Connection conn = JdbcUtils.getConnection();
    PreparedStatement pstm = conn.prepareStatement(sql);
    pstm.setInt(1,money);
    pstm.setString(2,outUser);
    pstm.executeUpdate();
    JdbcUtils.release(pstm,conn);
}
public void in(String inUser, int money) throws SQLException {
    String sql = "update account set money = money + ? where name = ?";
    Connection conn = JdbcUtils.getConnection();
    PreparedStatement pstm = conn.prepareStatement(sql);
    pstm.setInt(1,money);
    pstm.setString(2,inUser);
    pstm.executeUpdate();
    JdbcUtils.release(pstm,conn);
}
} 

service层代码 : AccountService

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        try {
            // 转出
            ad.out(outUser, money);
            // 转入
            ad.in(inUser, money);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

工具类 : JdbcUtils

public class JdbcUtils { 
public static void commitAndClose(Connection conn) {
    try {
        if(conn != null){
            //提交事务
            conn.commit();
            //释放连接
            conn.close();
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
public static void rollbackAndClose(Connection conn) {
    try {
        if(conn != null){
            //回滚事务
            conn.rollback();
            //释放连接
            conn.close();
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
} 

引入事务


案例中的转账涉及两个DML操作: 一个转出,一个转入。这些操作是需要具备原子性的,不可分割。不然就有可能出现数据修改异常情况。

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        try {
            // 转出
            ad.out(outUser, money);
            // 模拟转账过程中的异常
            int i = 1/0;
            // 转入
            ad.in(inUser, money);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

所以这里就需要操作事务,来保证转出和转入操作具备原子性,要么同时成功,要么同时失败。


JDBC中关于事务的操作的api

Connection接口的方法 作用
void setAutoCommit(false) 禁用事务自动提交(改为手动)
void commit(); 提交事务
void rollback(); 回滚事务

开启事务的注意点:


  • 为了保证所有的操作在一个事务中,案例中使用的连接必须是同一个: service层开启事务的connection需要跟dao层访问数据库的connection保持一致
  • 线程并发情况下, 每个线程只能操作各自的 connection


常规解决方案


常规方案的实现


基于上面给出的前提, 大家通常想到的解决方案是 :


  • 传参: 从service层将connection对象向dao层传递
  • 加锁


以下是代码实现修改的部分:


AccountService 类

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        //线程并发情况下,为了保证每个线程使用各自的connection,故加锁
        synchronized (AccountService.class) {
            Connection conn = null;
            try {
                conn = JdbcUtils.getConnection();
                //开启事务
                conn.setAutoCommit(false);
                // 转出
                ad.out(conn, outUser, money);
                // 模拟转账过程中的异常
//            int i = 1/0;
                // 转入
                ad.in(conn, inUser, money);
                //事务提交
                JdbcUtils.commitAndClose(conn);
            } catch (Exception e) {
                e.printStackTrace();
                //事务回滚
                JdbcUtils.rollbackAndClose(conn);
                return false;
            }
            return true;
        }
    }
}

AccountDao 类 (这里需要注意的是: connection不能在dao层释放,要在service层,不然在dao层释放,service层就无法使用了)

public class AccountDao {
    public void out(Connection conn, String outUser, int money) throws SQLException{
        String sql = "update account set money = money - ? where name = ?";
        //注释从连接池获取连接的代码,使用从service中传递过来的connection
//        Connection conn = JdbcUtils.getConnection();
        PreparedStatement pstm = conn.prepareStatement(sql);
        pstm.setInt(1,money);
        pstm.setString(2,outUser);
        pstm.executeUpdate();
        //连接不能在这里释放,service层中还需要使用
//        JdbcUtils.release(pstm,conn);
        JdbcUtils.release(pstm);
    }
    public void in(Connection conn, String inUser, int money) throws SQLException {
        String sql = "update account set money = money + ? where name = ?";
//        Connection conn = JdbcUtils.getConnection();
        PreparedStatement pstm = conn.prepareStatement(sql);
        pstm.setInt(1,money);
        pstm.setString(2,inUser);
        pstm.executeUpdate();
//        JdbcUtils.release(pstm,conn);
        JdbcUtils.release(pstm);
    }
}

常规方案的弊端


上述方式我们看到的确按要求解决了问题,但是仔细观察,会发现这样实现的弊端:


  • 直接从service层传递connection到dao层, 造成代码耦合度提高
  • 加锁会造成线程失去并发性,程序性能降低


ThreadLocal方案的实现


像这种需要在项目中进行数据传递线程隔离的场景,我们不妨用ThreadLocal来解决:


工具类的修改: 加入ThreadLocal

public class JdbcUtils {
    //ThreadLocal对象 : 将connection绑定在当前线程中
    private static final ThreadLocal<Connection> tl = new ThreadLocal();
    // c3p0 数据库连接池对象属性
    private static final ComboPooledDataSource ds = new ComboPooledDataSource();
    // 获取连接
    public static Connection getConnection() throws SQLException {
        //取出当前线程绑定的connection对象
        Connection conn = tl.get();
        if (conn == null) {
            //如果没有,则从连接池中取出
            conn = ds.getConnection();
            //再将connection对象绑定到当前线程中
            tl.set(conn);
        }
        return conn;
    }
    //释放资源
    public static void release(AutoCloseable... ios) {
        for (AutoCloseable io : ios) {
            if (io != null) {
                try {
                    io.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void commitAndClose() {
        try {
            Connection conn = getConnection();
            //提交事务
            conn.commit();
            //解除绑定 及时释放
            tl.remove();
            //释放连接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    public static void rollbackAndClose() {
        try {
            Connection conn = getConnection();
            //回滚事务
            conn.rollback();
            //解除绑定 及时释放
            tl.remove();
            //释放连接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

AccountService类的修改:不需要传递connection对象

public class AccountService {
    public boolean transfer(String outUser, String inUser, int money) {
        AccountDao ad = new AccountDao();
        try {
            Connection conn = JdbcUtils.getConnection();
            //开启事务
            conn.setAutoCommit(false);
            // 转出 : 这里不需要传参了 !
            ad.out(outUser, money);
            // 模拟转账过程中的异常
//            int i = 1 / 0;
            // 转入
            ad.in(inUser, money);
            //事务提交
            JdbcUtils.commitAndClose();
        } catch (Exception e) {
            e.printStackTrace();
            //事务回滚
           JdbcUtils.rollbackAndClose();
            return false;
        }
        return true;
    }
}

AccountDao类的修改:照常使用


TraceId日志 - 传递参数


使用 MDC 传递参数!


MDC是什么?


MDC是org.slf4j包下的一个类,它的全称是Mapped Diagnostic Context,我们可以认为它是一个线程安全的存放诊断日志的容器。


MDC的底层是用了ThreadLocal来保存数据的。


例如现在有这样一种场景:我们使用RestTemplate调用远程接口时,有时需要在header中传递信息,比如:traceId,source等,便于在查询日志时能够串联一次完整的请求链路,快速定位问题。


这种业务场景就能通过ClientHttpRequestInterceptor接口实现,具体做法如下:


第一步,定义一个LogFilter拦截所有接口请求,在MDC中设置traceId:

public class LogFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        MdcUtil.add(UUID.randomUUID().toString());
        System.out.println("记录请求日志");
        chain.doFilter(request, response);
        System.out.println("记录响应日志");
    }
    @Override
    public void destroy() {
    }
}

第二步,实现ClientHttpRequestInterceptor接口,MDC中获取当前请求的traceId,然后设置到header中:


实现ClientHttpRequestInterceptor接口是指创建一个类,并让该类实现org.springframework.http.client.ClientHttpRequestInterceptor接口。这是Spring框架中用于拦截客户端发起的HTTP请求的接口。


通过实现ClientHttpRequestInterceptor接口,你可以在发起HTTP请求之前或之后对请求进行拦截和处理。这样的拦截器通常被用于添加、修改或者记录HTTP请求的头部信息、请求体内容等。在实际应用中,这个功能可以被用来实现诸如鉴权、日志记录、统一添加请求头、异常处理等功能。

public class RestTemplateInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
        request.getHeaders().set("traceId", MdcUtil.get());
        return execution.execute(request, body);
    }
}

第三步,定义配置类,配置上面定义的RestTemplateInterceptor类:

@Configuration
public class RestTemplateConfiguration {
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(restTemplateInterceptor()));
        return restTemplate;
    }
    @Bean
    public RestTemplateInterceptor restTemplateInterceptor() {
        return new RestTemplateInterceptor();
    }
}

其中MdcUtil其实是利用MDC工具在ThreadLocal中存储和获取traceId

public class MdcUtil {
    private static final String TRACE_ID = "TRACE_ID";
    public static String get() {
        return MDC.get(TRACE_ID);
    }
    public static void add(String value) {
        MDC.put(TRACE_ID, value);
    }
}

当然,这个例子中没有演示MdcUtil类的add方法具体调的地方,我们可以在filter中执行接口方法之前,生成traceId,调用MdcUtil类的add方法添加到MDC中,然后在同一个请求的其他地方就能通过MdcUtil类的get方法获取到该traceId。


能使用MDC保存traceId等参数的根本原因是,用户请求到应用服务器,Tomcat会从线程池中分配一个线程去处理该请求。


那么该请求的整个过程中,保存到MDC的ThreadLocal中的参数,也是该线程独享的,所以不会有线程安全问题。


过滤器 和 拦截器详解


过滤器与拦截器相同点


  • 拦截器与过滤器都是体现了AOP的思想,对方法实现增强,都可以拦截请求方法。
  • 拦截器和过滤器都可以通过Order注解设定执行顺序


过滤器与拦截器区别


在Java Web开发中,过滤器(Filter)和拦截器(Interceptor)都是常见的用于在请求和响应之间进行处理的组件。它们的主要区别如下:


  • 运行位置不同:过滤器是运行在Web服务器和Servlet容器之间的组件,可以拦截所有进出该容器的请求和响应(包括静态资源);而拦截器则是针对具体的控制器方法进行拦截处理的,只在控制器内部执行。
  • 执行顺序不同:过滤器的执行顺序是由其在web.xml文件中声明的顺序决定的,按照声明的顺序依次执行;而拦截器的执行顺序是根据其在配置文件中的声明顺序决定的。
  • 功能不同:过滤器主要用于对请求进行预处理和过滤,例如设置字符集、登录验证、日志记录等操作;而拦截器则主要用于对请求进行流程控制,例如权限验证、参数注入、异常处理等操作。
  • 依赖框架不同:过滤器是基于Servlet规范实现的,不依赖任何特定的框架;而拦截器则通常是针对特定的框架或库实现的,例如Spring MVC框架中的拦截器。


综上所述,过滤器和拦截器在实现方式、功能和使用场景等方面都有一定的差异,开发者可以根据具体需求选择适合的组件。


过滤器 与 拦截器经典问题?


1.过滤器和拦截器的区别是什么?


过滤器(Filter)是在Servlet容器中用于对请求进行预处理和过滤的组件,可以实现过滤、验证、压缩等功能。而拦截器(Interceptor)是在Spring MVC框架中用于对请求进行拦截和处理的组件,可以实现权限验证、日志记录、异常处理等功能。过滤器是在Servlet容器中执行的,而拦截器是在Spring MVC框架中执行的。


2.过滤器和拦截器的执行顺序是怎样的?


在Java Web应用程序中,过滤器和拦截器的执行顺序都是由它们在配置文件中的声明顺序决定的。一般来说,先声明的过滤器或拦截器会先执行,后声明的过滤器或拦截器会后执行。


3.过滤器和拦截器的作用有哪些?


过滤器和拦截器都可以对请求进行处理和控制,实现一系列的功能,例如请求过滤、身份验证、数据加密、日志记录等。过滤器主要用于对请求进行预处理和过滤操作,而拦截器主要用于对请求进行拦截处理,在控制器方法执行之前或之后进行拦截和处理。


4.过滤器和拦截器的使用场景有哪些?


过滤器和拦截器都可以用于实现一系列的控制和管理功能。例如,过滤器可以用于身份验证、数据加密和解密、请求过滤和压缩等场景;而拦截器可以用于权限验证、日志记录、异常处理等场景。


5.如何在Java Web应用程序中使用过滤器和拦截器?


在Java Web应用程序中,要使用过滤器和拦截器,需要在配置文件中进行声明和注册。对于过滤器,可以通过在web.xml文件中添加和标签来完成;对于拦截器,可以通过在Spring MVC配置文件中添加mvc:interceptors标签来完成。同时,在声明和注册过滤器和拦截器时,还需要指定其执行顺序以及拦截路径等相关信息。


TransmittableThreadLocal


观察下面的代码请你判断代码的输出:

public class TestCase1 {
    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
    public static void main(String[] args) {
        case1();
    }
    public static void case1() {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        threadLocal.set("Hello");
        Runnable task1 = new Runnable() {
            @Override
            public void run() {
                System.out.println("ThreadLocal value in task1: " + threadLocal.get());
                threadLocal.set("Task1");
            }
        };
        Runnable task2 = new Runnable() {
            @Override
            public void run() {
                System.out.println("ThreadLocal value in task2: " + threadLocal.get());
                threadLocal.set("Task2");
            }
        };
        executorService.submit(task1);
        sleep(100);
        executorService.submit(task2);
        sleep(100);
        System.out.println("ThreadLocal value in mainThread: " + threadLocal.get());
        executorService.shutdown();
    }
    public static void sleep(int val){
        try {
            Thread.sleep(val);
        } catch (InterruptedException ignored) {
        }
    }
}

分析这段代码的输出并不难,实际输出如下:

ThreadLocal value in task1: null
ThreadLocal value in task2: Task1
ThreadLocal value in mainThread: Hello

因为我们线程池中只有一个线程,当第一个任务执行完成之后,这个线程池的线程的ThreadLocal便设置上了Task1,之后第二个任务执行时获取到的ThreadLocal中的值便是Task1,但是主线程和子线程是不同的线程,所以无论子线程如何修改ThreadLocal的内容对主线程都是无影响的。


线程池场景下ThreadLocal的值传递问题


有这么一个需求,用户登录之后,在所有的请求接口中,通过某个公共方法,就能获取到当前登录用户的信息?


获取的用户上下文,我们以CurrentUser为例。


CurrentUser内部包含了一个ThreadLocal对象,它负责保存当前线程的用户上下文信息。当然为了保证在线程池中,也能从用户上下文中获取到正确的用户信息,这里用了阿里的TransmittableThreadLocal。伪代码如下:

@Data
public class CurrentUser {
    private static final TransmittableThreadLocal<CurrentUser> THREA_LOCAL = new TransmittableThreadLocal<>();
    private String id;
    private String userName;
    private String password;
    private String phone;
    ...
    public statis void set(CurrentUser user) {
      THREA_LOCAL.set(user);
    }
    public static void getCurrent() {
      return THREA_LOCAL.get();
    }
}

这里为什么用了阿里的TransmittableThreadLocal,而不是普通的ThreadLocal呢?在线程池中,由于线程会被多次复用,导致从普通的ThreadLocal中无法获取正确的用户信息。父线程中的参数,没法传递给子线程,而TransmittableThreadLocal很好解决了这个问题。


剑指JUC原理-20.并发编程实践(下):https://developer.aliyun.com/article/1413702


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
5天前
|
消息中间件 存储 Java
剑指JUC原理-20.并发编程实践(下)
剑指JUC原理-20.并发编程实践
40 0
|
5天前
|
存储 算法 安全
剑指JUC原理-5.synchronized底层原理(上)
剑指JUC原理-5.synchronized底层原理
38 0
|
5天前
|
Java
剑指JUC原理-14.ReentrantLock原理(下)
剑指JUC原理-14.ReentrantLock原理
26 1
|
5天前
|
安全 Java 程序员
剑指JUC原理-14.ReentrantLock原理(上)
剑指JUC原理-14.ReentrantLock原理
30 0
|
5天前
|
存储 Java 编译器
剑指JUC原理-5.synchronized底层原理(下)
剑指JUC原理-5.synchronized底层原理
33 0
|
5天前
|
Java 编译器 测试技术
剑指JUC原理-8.Java内存模型(中)
剑指JUC原理-8.Java内存模型
37 0
|
5天前
|
SQL 安全 Java
剑指JUC原理-8.Java内存模型(下)
剑指JUC原理-8.Java内存模型
38 0
|
5天前
|
缓存 安全 前端开发
剑指JUC原理-8.Java内存模型(上)
剑指JUC原理-8.Java内存模型
42 0
|
5天前
|
消息中间件 canal Java
剑指JUC原理-20.并发编程实践(上)
剑指JUC原理-20.并发编程实践
37 0
|
5天前
|
存储 缓存 安全
剑指JUC原理-11.不可变设计
剑指JUC原理-11.不可变设计
19 0